Skip to content

Commit 28dcbcb

Browse files
sean-roseBenWu
andauthored
feat(events_stream): Expose event extras as individual fields in events_stream views (DENG-9546) (#8361)
* feat(events_stream): Expose event extras as individual fields in `events_stream` views (DENG-9546). * fix(events_stream): Use `UNION ALL BY NAME` because the columns may not always be in the same order. * Correct typo in `sql_generators/glean_usage/events_stream.py`. Co-authored-by: Ben Wu <[email protected]> --------- Co-authored-by: Ben Wu <[email protected]>
1 parent de5921b commit 28dcbcb

File tree

6 files changed

+165
-27
lines changed

6 files changed

+165
-27
lines changed

sql_generators/glean_usage/common.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas
1818
from bigquery_etl.util.common import get_table_dir, render, write_sql
1919

20-
APP_LISTINGS_URL = "https://probeinfo.telemetry.mozilla.org/v2/glean/app-listings"
20+
PROBEINFO_URL = "https://probeinfo.telemetry.mozilla.org"
21+
APP_LISTINGS_URL = f"{PROBEINFO_URL}/v2/glean/app-listings"
2122
PATH = Path(os.path.dirname(__file__))
2223

2324
# added as the result of baseline checks being added to the template,
@@ -174,6 +175,28 @@ def get_app_info() -> dict[str, list[dict]]:
174175
return app_info
175176

176177

178+
@cache
179+
def get_glean_repositories() -> list[dict]:
180+
"""Return a list of the Glean repositories."""
181+
resp = requests.get(f"{PROBEINFO_URL}/glean/repositories")
182+
resp.raise_for_status()
183+
return resp.json()
184+
185+
186+
def get_glean_app_pings(v1_name: str) -> dict[str, dict]:
187+
"""Return a dictionary of the Glean app's pings."""
188+
resp = requests.get(f"{PROBEINFO_URL}/glean/{v1_name}/pings")
189+
resp.raise_for_status()
190+
return resp.json()
191+
192+
193+
def get_glean_app_metrics(v1_name: str) -> dict[str, dict]:
194+
"""Return a dictionary of the Glean app's metrics."""
195+
resp = requests.get(f"{PROBEINFO_URL}/glean/{v1_name}/metrics")
196+
resp.raise_for_status()
197+
return resp.json()
198+
199+
177200
class GleanTable:
178201
"""Represents a generated Glean table."""
179202

sql_generators/glean_usage/event_monitoring_live.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
from pathlib import Path
77
from typing import List, Set
88

9-
import requests
10-
119
from bigquery_etl.config import ConfigLoader
1210
from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas
1311
from sql_generators.glean_usage.common import (
1412
GleanTable,
13+
get_glean_app_metrics,
14+
get_glean_app_pings,
1515
get_table_dir,
1616
render,
1717
table_names_from_baseline,
@@ -22,9 +22,6 @@
2222
TARGET_DATASET_CROSS_APP = "monitoring"
2323
PREFIX = "event_monitoring"
2424
PATH = Path(os.path.dirname(__file__))
25-
GLEAN_APP_BASE_URL = "https://probeinfo.telemetry.mozilla.org/glean/{app_name}"
26-
METRICS_INFO_URL = f"{GLEAN_APP_BASE_URL}/metrics"
27-
PING_INFO_URL = f"{GLEAN_APP_BASE_URL}/pings"
2825

2926

3027
class EventMonitoringLive(GleanTable):
@@ -54,15 +51,11 @@ def _get_tables_with_events(
5451
) -> Set[str]:
5552
"""Get tables for the given app that receive event type metrics."""
5653
pings = set()
57-
metrics_resp = requests.get(METRICS_INFO_URL.format(app_name=v1_name))
58-
metrics_resp.raise_for_status()
59-
metrics_json = metrics_resp.json()
54+
metrics_json = get_glean_app_metrics(v1_name)
6055

6156
min_pings = set()
6257
if skip_min_ping:
63-
ping_resp = requests.get(PING_INFO_URL.format(app_name=v1_name))
64-
ping_resp.raise_for_status()
65-
ping_json = ping_resp.json()
58+
ping_json = get_glean_app_pings(v1_name)
6659
min_pings = {
6760
name
6861
for name, info in ping_json.items()

sql_generators/glean_usage/events_stream.py

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
"""Generate events stream queries for Glean apps."""
22

33
import re
4+
from collections import defaultdict
5+
from datetime import date, datetime, timedelta
6+
from functools import cache
7+
from typing import Optional
48

59
from bigquery_etl.config import ConfigLoader
6-
from sql_generators.glean_usage.common import GleanTable, ping_has_metrics
10+
from sql_generators.glean_usage.common import (
11+
GleanTable,
12+
get_glean_app_metrics,
13+
get_glean_repositories,
14+
ping_has_metrics,
15+
)
716

817
TARGET_TABLE_ID = "events_stream_v1"
918
PREFIX = "events_stream"
@@ -85,6 +94,9 @@ def generate_per_app_id(
8594
app_id_info["bq_dataset_family"], unversioned_table_name
8695
),
8796
"slice_by_sample_id": slice_by_sample_id,
97+
"extras_by_type": get_glean_app_event_extras_by_type(
98+
app_id_info["v1_name"]
99+
),
88100
}
89101

90102
super().generate_per_app_id(
@@ -117,6 +129,16 @@ def generate_per_app(
117129
):
118130
return
119131

132+
extras_by_type = defaultdict(set)
133+
for app_id_info in app_ids_info:
134+
app_id_extras_by_type = get_glean_app_event_extras_by_type(
135+
app_id_info["v1_name"]
136+
)
137+
for app_id_extra_type, app_id_extras in app_id_extras_by_type.items():
138+
extras_by_type[app_id_extra_type].update(app_id_extras)
139+
140+
custom_render_kwargs = {"extras_by_type": extras_by_type}
141+
120142
super().generate_per_app(
121143
project_id,
122144
app_name,
@@ -126,4 +148,72 @@ def generate_per_app(
126148
parallelism=parallelism,
127149
id_token=id_token,
128150
all_base_tables_exist=all_base_tables_exist,
151+
custom_render_kwargs=custom_render_kwargs,
129152
)
153+
154+
155+
@cache
156+
def get_glean_app_repository(v1_name: str) -> dict:
157+
"""Return the Glean app's repository."""
158+
for repository in get_glean_repositories():
159+
if repository["name"] == v1_name:
160+
return repository
161+
raise Exception(f"No Glean repository found for app `{v1_name}`.")
162+
163+
164+
@cache
165+
def get_glean_dependency_v1_name(dependency: str) -> str:
166+
"""Return the v1 name of the Glean dependency, which may be a library."""
167+
for repository in get_glean_repositories():
168+
if "library_names" in repository and dependency in repository["library_names"]:
169+
return repository["name"]
170+
return dependency
171+
172+
173+
@cache
174+
def get_glean_app_event_extras_by_type(
175+
v1_name: str, ping: str = "events", cutoff_date: Optional[date] = None
176+
) -> dict[str, set[str]]:
177+
"""Return the Glean app's event extra keys for the specified ping, grouped by type."""
178+
extras_by_type = defaultdict(set)
179+
repository = get_glean_app_repository(v1_name)
180+
181+
if not cutoff_date:
182+
ping_delete_after_days = (
183+
(
184+
repository.get("moz_pipeline_metadata", {})
185+
.get(ping, {})
186+
.get("expiration_policy", {})
187+
.get("delete_after_days")
188+
)
189+
or (
190+
repository.get("moz_pipeline_metadata_defaults", {})
191+
.get("expiration_policy", {})
192+
.get("delete_after_days")
193+
)
194+
or 775
195+
)
196+
cutoff_date = date.today() - timedelta(days=(ping_delete_after_days - 1))
197+
198+
metrics = list(get_glean_app_metrics(v1_name).values())
199+
for metric in metrics:
200+
if metric["type"] == "event":
201+
for history in metric["history"]:
202+
last_date = datetime.fromisoformat(history["dates"]["last"]).date()
203+
if ping in history["send_in_pings"] and last_date >= cutoff_date:
204+
for extra_key, extra_key_info in history["extra_keys"].items():
205+
extra_type = extra_key_info.get("type", "string")
206+
extras_by_type[extra_type].add(extra_key)
207+
208+
for dependency in repository["dependencies"]:
209+
dependency_v1_name = get_glean_dependency_v1_name(dependency)
210+
dependency_extras_by_type = get_glean_app_event_extras_by_type(
211+
dependency_v1_name, ping, cutoff_date
212+
)
213+
for (
214+
dependency_extra_type,
215+
dependency_extras,
216+
) in dependency_extras_by_type.items():
217+
extras_by_type[dependency_extra_type].update(dependency_extras)
218+
219+
return extras_by_type
Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,27 @@
1+
{% from 'macros.sql' import event_extras_by_type_struct -%}
12
-- Generated via ./bqetl generate glean_usage
23
CREATE OR REPLACE VIEW
34
`{{ project_id }}.{{ target_view }}`
45
AS
5-
{% for (dataset, channel) in datasets %}
6-
{% if not loop.first -%}
7-
UNION ALL
8-
{% endif -%}
6+
WITH events_stream_union AS (
7+
{% for (dataset, channel) in datasets %}
8+
{% if not loop.first -%}
9+
UNION ALL BY NAME
10+
{% endif -%}
11+
SELECT
12+
"{{ dataset }}" AS normalized_app_id,
13+
e.*
14+
{% if app_name == "fenix" -%}
15+
REPLACE(mozfun.norm.fenix_app_info("{{ dataset }}", client_info.app_build).channel AS normalized_channel),
16+
{% elif datasets|length > 1 -%}
17+
REPLACE("{{ channel }}" AS normalized_channel),
18+
{% endif -%}
19+
FROM `{{ project_id }}.{{ dataset }}_derived.events_stream_v1` AS e
20+
{% endfor %}
21+
)
922
SELECT
10-
"{{ dataset }}" AS normalized_app_id,
11-
e.*
12-
{% if app_name == "fenix" -%}
13-
REPLACE(mozfun.norm.fenix_app_info("{{ dataset }}", client_info.app_build).channel AS normalized_channel),
14-
{% elif datasets|length > 1 -%}
15-
REPLACE("{{ channel }}" AS normalized_channel),
16-
{% endif -%}
17-
FROM `{{ project_id }}.{{ dataset }}.events_stream` AS e
18-
{% endfor %}
23+
*,
24+
{% if extras_by_type %}
25+
{{ event_extras_by_type_struct(extras_by_type) }} AS extras
26+
{% endif %}
27+
FROM events_stream_union
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
{% from 'macros.sql' import event_extras_by_type_struct -%}
12
{{ header }}
23

34
CREATE OR REPLACE VIEW
45
`{{ project_id }}.{{ events_stream_view }}`
56
AS
67
SELECT
7-
*
8+
*,
9+
{% if extras_by_type %}
10+
{{ event_extras_by_type_struct(extras_by_type) }} AS extras
11+
{% endif %}
812
FROM
913
`{{ project_id }}.{{ events_stream_table }}`

sql_generators/glean_usage/templates/macros.sql

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,22 @@ _core_clients_first_seen AS (
3030
_fennec_id_lookup.client_id
3131
)
3232
{% endmacro %}
33+
34+
35+
{% macro event_extras_by_type_struct(extras_by_type) %}
36+
STRUCT(
37+
{% for extra_type in extras_by_type | sort %}
38+
STRUCT(
39+
{% for extra in extras_by_type[extra_type] | sort %}
40+
{% if extra_type == 'boolean' %}
41+
LAX_BOOL(event_extra.{{ extra }}) AS `{{ extra }}`{{ ',' if not loop.last else '' }}
42+
{% elif extra_type == 'quantity' %}
43+
LAX_INT64(event_extra.{{ extra }}) AS `{{ extra }}`{{ ',' if not loop.last else '' }}
44+
{% else %}
45+
JSON_VALUE(event_extra.{{ extra }}) AS `{{ extra }}`{{ ',' if not loop.last else '' }}
46+
{% endif %}
47+
{% endfor %}
48+
) AS `{{ extra_type }}`{{ ',' if not loop.last else '' }}
49+
{% endfor %}
50+
)
51+
{% endmacro %}

0 commit comments

Comments
 (0)