Skip to content

Commit af4028d

Browse files
authored
Merge branch 'main' into query-schema-generation-dryrun-caching
2 parents 53c0a8e + 0634ed8 commit af4028d

File tree

131 files changed

+3269
-486
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

131 files changed

+3269
-486
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ workflows:
5959
sql/mozfun/.* validate-routines true
6060
sql/moz-fx-data-shared-prod/udf/.* validate-routines true
6161
sql/moz-fx-data-shared-prod/udf_js/.* validate-routines true
62+
bqetl_project.yaml trigger-sql-generation true
6263
bqetl_project.yaml validate-sql true
6364
bqetl_project.yaml validate-routines true
6465
dags.yaml validate-sql true

.circleci/workflows.yml

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,15 @@ jobs:
352352
- *restore_venv_cache
353353
- *build
354354
- *attach_generated_sql
355+
- &restore_schema_cache
356+
run:
357+
name: Restore schema cache from generate-sql job
358+
command: |
359+
# Restore stable table schema cache from workspace
360+
if [ -d /tmp/workspace/schema-cache/bigquery_etl_schemas ]; then
361+
cp -r /tmp/workspace/schema-cache/bigquery_etl_schemas /tmp/
362+
echo "Restored schema cache from generate-sql job"
363+
fi
355364
- *authenticate
356365
- &add_private_bigquery_etl_ssh_keys
357366
add_ssh_keys:
@@ -407,6 +416,11 @@ jobs:
407416
command: |
408417
rm ~/telemetry-airflow/dags/* -f || true
409418
cp -a /tmp/workspace/generated-sql/dags/. ~/telemetry-airflow/dags/
419+
- restore_cache:
420+
keys:
421+
- telemetry-airflow-deps-v1-{{ checksum "~/telemetry-airflow/requirements.txt" }}-{{ checksum "~/telemetry-airflow/requirements-dev.txt" }}-{{ checksum "~/telemetry-airflow/requirements-override.txt" }}
422+
- telemetry-airflow-deps-v1-{{ checksum "~/telemetry-airflow/requirements.txt" }}-{{ checksum "~/telemetry-airflow/requirements-dev.txt" }}-
423+
- telemetry-airflow-deps-v1-
410424
- run:
411425
name: Install telemetry-airflow dependencies
412426
command: |
@@ -416,6 +430,10 @@ jobs:
416430
pip install -r requirements.txt
417431
pip install -r requirements-dev.txt
418432
pip install -r requirements-override.txt
433+
- save_cache:
434+
paths:
435+
- ~/telemetry-airflow/.venv
436+
key: telemetry-airflow-deps-v1-{{ checksum "~/telemetry-airflow/requirements.txt" }}-{{ checksum "~/telemetry-airflow/requirements-dev.txt" }}-{{ checksum "~/telemetry-airflow/requirements-override.txt" }}
419437
- run:
420438
name: 🧪 Test valid DAGs
421439
command: |
@@ -610,6 +628,7 @@ jobs:
610628
--ignore derived_view_schemas \
611629
--output-dir /tmp/workspace/generated-sql/sql/ \
612630
--target-project moz-fx-data-shared-prod
631+
PATH="venv/bin:$PATH" script/bqetl format /tmp/workspace/generated-sql/sql/
613632
else
614633
echo "Changes made don't affect generated SQL. Use content from generated-sql"
615634
@@ -618,18 +637,26 @@ jobs:
618637
cp -a sql/. /tmp/workspace/generated-sql/sql
619638
fi
620639
621-
PATH="venv/bin:$PATH" script/bqetl format /tmp/workspace/generated-sql/sql/
622640
PATH="venv/bin:$PATH" script/bqetl dependency record \
623641
--skip-existing \
624642
"/tmp/workspace/generated-sql/sql/"
625643
PATH="venv/bin:$PATH" script/bqetl metadata update \
626644
--sql-dir /tmp/workspace/generated-sql/sql/ \
627645
/tmp/workspace/generated-sql/sql/
628646
PATH="venv/bin:$PATH" script/bqetl monitoring update /tmp/workspace/generated-sql/sql/
647+
- run:
648+
name: Copy schema cache for reuse in other jobs
649+
command: |
650+
# Copy stable table schema cache to workspace for reuse in generate-dags
651+
mkdir -p /tmp/workspace/schema-cache
652+
if [ -d /tmp/bigquery_etl_schemas ]; then
653+
cp -r /tmp/bigquery_etl_schemas /tmp/workspace/schema-cache/
654+
fi
629655
- persist_to_workspace:
630656
root: /tmp/workspace
631657
paths:
632658
- generated-sql
659+
- schema-cache
633660
- unless:
634661
condition: *validate-sql-or-routines
635662
steps:
@@ -646,6 +673,7 @@ jobs:
646673
- *restore_venv_cache
647674
- *build
648675
- *attach_generated_sql
676+
- *restore_schema_cache
649677
- *copy_generated_sql
650678
- add_ssh_keys:
651679
fingerprints:
@@ -1106,8 +1134,8 @@ jobs:
11061134
name: Manually trigger integration tests for fork
11071135
# yamllint disable rule:line-length
11081136
command: |
1109-
apt update
1110-
apt install jq -y
1137+
sudo apt update
1138+
sudo apt install jq -y
11111139
11121140
CIRCLE_PR_BRANCH=`curl -s https://api.github.com/repos/${CIRCLE_PROJECT_USERNAME}/${CIRCLE_PROJECT_REPONAME}/pulls/${CIRCLE_PR_NUMBER} | jq -r '.head.label'`
11131141

bigquery_etl/glam/templates/clients_daily_histogram_aggregates_v1.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ WITH extracted AS (
1717
WHERE
1818
DATE(submission_timestamp) = {{ submission_date }}
1919
AND client_info.client_id IS NOT NULL
20+
AND LOWER(IFNULL(metadata.isp.name, "")) <> "browserstack" -- Removes bots data.
2021
),
2122
histograms AS (
2223
SELECT

bigquery_etl/glam/templates/clients_daily_scalar_aggregates_v1.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ WITH extracted AS (
1717
WHERE
1818
DATE(submission_timestamp) = {{ submission_date }}
1919
AND client_info.client_id IS NOT NULL
20+
AND LOWER(IFNULL(metadata.isp.name, "")) <> "browserstack" -- Removes bots data.
2021
),
2122
unlabeled_metrics AS (
2223
SELECT

bigquery_etl/query_scheduling/dag.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class Dag:
122122
repo: str = attr.ib("bigquery-etl")
123123
tags: List[str] = attr.ib([])
124124
catchup: bool = attr.ib(False)
125+
max_active_runs: Optional[int] = attr.ib(None)
125126

126127
@name.validator
127128
def validate_dag_name(self, attribute, value):

bigquery_etl/query_scheduling/dag_collection.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,31 @@
1010
import yaml
1111
from black import FileMode, format_file_contents
1212

13+
from bigquery_etl.dependency import extract_table_references_without_views
1314
from bigquery_etl.query_scheduling.dag import Dag, InvalidDag, PublicDataJsonDag
1415
from bigquery_etl.query_scheduling.utils import negate_timedelta_string
16+
from bigquery_etl.schema.stable_table_schema import get_stable_table_schemas
17+
18+
19+
def _precompute_task_references(task):
20+
"""Pre-compute expensive table references parsing and return result."""
21+
if (
22+
task.is_python_script
23+
or task.is_bigeye_check
24+
or task.referenced_tables is not None
25+
):
26+
return (task.task_key, task.referenced_tables or [])
27+
28+
query_files = [Path(task.query_file)]
29+
if task.multipart:
30+
query_files = list(Path(task.query_file_path).glob("*.sql"))
31+
32+
table_names = {
33+
tuple(table.split("."))
34+
for query_file in query_files
35+
for table in extract_table_references_without_views(query_file)
36+
}
37+
return (task.task_key, sorted(table_names))
1538

1639

1740
class DagCollection:
@@ -192,10 +215,32 @@ def to_airflow_dags(self, output_dir, dag_to_generate=None):
192215
except Exception:
193216
pass
194217

218+
# Pre-load stable table schemas before spawning workers to avoid loading multiple times
219+
# This downloads and caches schemas once in the main process
220+
try:
221+
get_stable_table_schemas()
222+
except Exception:
223+
# If schema loading fails, continue anyway (some tasks may not need them)
224+
pass
225+
226+
# Pre-compute referenced tables for all tasks in parallel
227+
# This is the expensive I/O-heavy part (SQL parsing via extract_table_references_without_views)
228+
all_tasks = [task for dag in self.dags for task in dag.tasks]
229+
230+
with get_context("spawn").Pool(8) as p:
231+
task_references = p.map(_precompute_task_references, all_tasks)
232+
233+
# Update tasks with precomputed references
234+
task_map = {task.task_key: task for task in all_tasks}
235+
for task_key, referenced_tables in task_references:
236+
task_map[task_key].referenced_tables = referenced_tables
237+
238+
# Resolve dependencies sequentially
195239
for dag in self.dags:
196240
dag.with_upstream_dependencies(self)
197241
dag.with_downstream_dependencies(self)
198242

243+
# Finally, parallelize DAG-to-Airflow conversion
199244
to_airflow_dag = partial(self.dag_to_airflow, output_dir)
200245
with get_context("spawn").Pool(8) as p:
201246
p.map(to_airflow_dag, self.dags)

bigquery_etl/query_scheduling/templates/airflow_dag.j2

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,19 @@ default_args = {{
8282

8383
tags = {{ tags }}
8484

85-
with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None -%}, schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }}{%+ endif -%}, doc_md=docs, tags=tags, catchup={{ catchup }}) as dag:
85+
with DAG(
86+
'{{ name }}',
87+
default_args=default_args,
88+
{% if schedule_interval != None -%}
89+
schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }},
90+
{%- endif %}
91+
{% if max_active_runs -%}
92+
max_active_runs={{ max_active_runs }},
93+
{%- endif %}
94+
doc_md=docs,
95+
tags=tags,
96+
catchup={{ catchup }},
97+
) as dag:
8698
{% for task_group in task_groups | sort %}
8799
task_group_{{ task_group }} = TaskGroup('{{ task_group }}')
88100
{% endfor %}

bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,18 @@ default_args = {{
3838

3939
tags = {{ tags }}
4040

41-
with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None -%}, schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }}{%+ endif -%}, doc_md = docs, tags = tags) as dag:
41+
with DAG(
42+
'{{ name }}',
43+
default_args=default_args,
44+
{% if schedule_interval != None -%}
45+
schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }},
46+
{%- endif %}
47+
{% if max_active_runs -%}
48+
max_active_runs={{ max_active_runs }},
49+
{%- endif %}
50+
doc_md=docs,
51+
tags=tags,
52+
) as dag:
4253
docker_image = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest"
4354
{% for task_group in task_groups | sort %}
4455
task_group_{{ task_group }} = TaskGroup('{{ task_group }}')

bigquery_etl/util/common.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import string
99
import tempfile
1010
import warnings
11+
from functools import cache
1112
from pathlib import Path
1213
from typing import List, Optional, Set, Tuple
1314
from uuid import uuid4
@@ -17,7 +18,7 @@
1718
from google.cloud import bigquery
1819
from jinja2 import Environment, FileSystemLoader
1920

20-
from bigquery_etl.config import ConfigLoader
21+
from bigquery_etl.config import BQETL_PROJECT_CONFIG, ConfigLoader
2122
from bigquery_etl.format_sql.formatter import reformat
2223
from bigquery_etl.metrics import MetricHub
2324

@@ -70,6 +71,18 @@ def random_str(length: int = 12) -> str:
7071
return "".join(random.choice(string.ascii_lowercase) for i in range(length))
7172

7273

74+
@cache
75+
def get_bqetl_project_root() -> Path | None:
76+
"""Return the root path of the bqetl project the user is currently in."""
77+
cwd = Path.cwd()
78+
search_paths = [cwd]
79+
search_paths.extend(cwd.parents)
80+
for possible_project_root in search_paths:
81+
if (possible_project_root / BQETL_PROJECT_CONFIG).exists():
82+
return possible_project_root
83+
return None
84+
85+
7386
def render(
7487
sql_filename,
7588
template_folder=".",
@@ -78,7 +91,8 @@ def render(
7891
**kwargs,
7992
) -> str:
8093
"""Render a given template query using Jinja."""
81-
path = Path(template_folder) / sql_filename
94+
template_folder_path = Path(template_folder)
95+
path = template_folder_path / sql_filename
8296
skip = {
8397
file
8498
for skip in ConfigLoader.get("render", "skip", fallback=[])
@@ -122,14 +136,20 @@ def render(
122136
checks_template.write_text(
123137
macro_imports
124138
+ "\n"
125-
+ (Path(template_folder) / sql_filename).read_text()
139+
+ (template_folder_path / sql_filename).read_text()
126140
)
127141

128142
file_loader = FileSystemLoader(f"{str(checks_template.parent)}")
129143
env = Environment(loader=file_loader)
130144
main_sql = env.get_template(checks_template.name)
131145
else:
132-
file_loader = FileSystemLoader(f"{template_folder}")
146+
# Add the bigquery-etl project root to the search path to support Jinja imports/includes.
147+
file_loader_search_paths = [template_folder_path, ROOT]
148+
# Also dynamically detect the project root so imports/includes in the private-bigquery-etl repo work.
149+
if bqetl_project_root := get_bqetl_project_root():
150+
if bqetl_project_root not in file_loader_search_paths:
151+
file_loader_search_paths.append(bqetl_project_root)
152+
file_loader = FileSystemLoader(file_loader_search_paths)
133153
env = Environment(loader=file_loader)
134154
main_sql = env.get_template(sql_filename)
135155

bqetl_project.yaml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ generate:
374374
- firefox_reality_pc
375375
- lockwise_android
376376
- lockwise_ios
377+
- mozilla_vpn # VPN events are in `main` pings rather than `events` pings (DENG-8596).
377378
- tiktokreporter_ios
378379
- tiktokreporter_android
379380
metrics_as_struct_apps:
@@ -530,19 +531,21 @@ generate:
530531
fenix:
531532
templates:
532533
- terms_of_use_status_v1
534+
- terms_of_use_events_v1
533535
bigeye:
534536
collection: Operational Checks
535537
notification_channel: '#de-bigeye-triage'
536538
firefox_ios:
537539
templates:
538540
- terms_of_use_status_v1
541+
- terms_of_use_events_v1
539542
bigeye:
540543
collection: Operational Checks
541544
notification_channel: '#de-bigeye-triage'
542545
firefox_desktop:
543546
templates:
544547
- terms_of_use_status_v1
545-
- terms_of_use_events_v1
548+
- terms_of_use_messages_v1
546549
bigeye:
547550
collection: Operational Checks
548551
notification_channel: '#de-bigeye-triage'
@@ -562,6 +565,23 @@ generate:
562565
geo_deprecation:
563566
include_app_ids:
564567
- org_mozilla_ios_klar
568+
- org_mozilla_klar
569+
- org_mozilla_ios_focus
570+
- org_mozilla_focus
571+
- org_mozilla_focus_beta
572+
- org_mozilla_focus_nightly
573+
- org_mozilla_ios_firefox
574+
- org_mozilla_ios_firefoxbeta
575+
- org_mozilla_ios_fennec
576+
- org_mozilla_firefox
577+
- org_mozilla_firefox_beta
578+
- org_mozilla_fenix
579+
- org_mozilla_fenix_nightly
580+
- org_mozilla_fennec_aurora
581+
- firefox_desktop
582+
- firefox_desktop_background_update
583+
- firefox_desktop_background_defaultagent
584+
- firefox_desktop_background_tasks
565585
skip_tables:
566586
- newtab
567587

@@ -570,6 +590,7 @@ retention_exclusion_list:
570590
- sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_v3
571591
- sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_28_days_later_v1
572592
- sql/moz-fx-data-shared-prod/telemetry_derived/clients_first_seen_28_days_later_v3
593+
- sql/moz-fx-data-shared-prod/firefox_desktop_derived/clients_first_seen_28_days_later_v1
573594
- sql/moz-fx-data-shared-prod/org_mozilla_firefox_derived/baseline_clients_first_seen_v1
574595
- sql/moz-fx-data-shared-prod/org_mozilla_fenix_derived/baseline_clients_first_seen_v1
575596
- sql/moz-fx-data-shared-prod/fenix_derived/ltv_states_v1
@@ -653,3 +674,4 @@ retention_exclusion_list:
653674
- sql/moz-fx-data-shared-prod/firefox_desktop_background_update/baseline_clients_city_seen_v1
654675
- sql/moz-fx-data-shared-prod/firefox_desktop_background_defaultagent/baseline_clients_city_seen_v1
655676
- sql/moz-fx-data-shared-prod/firefox_desktop_background_tasks/baseline_clients_city_seen_v1
677+
- sql/moz-fx-data-shared-prod/firefox_desktop_derived/metrics_clients_first_seen_v1

0 commit comments

Comments
 (0)