Skip to content

Commit 12c31f3

Browse files
authored
fix(SubPlat): Limit the bqetl_subplat_hourly DAG to one active run at a time (DENG-10213). (#8489)
1 parent 6dda57a commit 12c31f3

File tree

4 files changed

+28
-2
lines changed

4 files changed

+28
-2
lines changed

bigquery_etl/query_scheduling/dag.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class Dag:
122122
repo: str = attr.ib("bigquery-etl")
123123
tags: List[str] = attr.ib([])
124124
catchup: bool = attr.ib(False)
125+
max_active_runs: Optional[int] = attr.ib(None)
125126

126127
@name.validator
127128
def validate_dag_name(self, attribute, value):

bigquery_etl/query_scheduling/templates/airflow_dag.j2

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,19 @@ default_args = {{
8282

8383
tags = {{ tags }}
8484

85-
with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None -%}, schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }}{%+ endif -%}, doc_md=docs, tags=tags, catchup={{ catchup }}) as dag:
85+
with DAG(
86+
'{{ name }}',
87+
default_args=default_args,
88+
{% if schedule_interval != None -%}
89+
schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }},
90+
{%- endif %}
91+
{% if max_active_runs -%}
92+
max_active_runs={{ max_active_runs }},
93+
{%- endif %}
94+
doc_md=docs,
95+
tags=tags,
96+
catchup={{ catchup }},
97+
) as dag:
8698
{% for task_group in task_groups | sort %}
8799
task_group_{{ task_group }} = TaskGroup('{{ task_group }}')
88100
{% endfor %}

bigquery_etl/query_scheduling/templates/public_data_json_airflow_dag.j2

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,18 @@ default_args = {{
3838

3939
tags = {{ tags }}
4040

41-
with DAG('{{ name }}', default_args=default_args{%+ if schedule_interval != None -%}, schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }}{%+ endif -%}, doc_md = docs, tags = tags) as dag:
41+
with DAG(
42+
'{{ name }}',
43+
default_args=default_args,
44+
{% if schedule_interval != None -%}
45+
schedule_interval={{ schedule_interval | format_timedelta | format_schedule_interval }},
46+
{%- endif %}
47+
{% if max_active_runs -%}
48+
max_active_runs={{ max_active_runs }},
49+
{%- endif %}
50+
doc_md=docs,
51+
tags=tags,
52+
) as dag:
4253
docker_image = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest"
4354
{% for task_group in task_groups | sort %}
4455
task_group_{{ task_group }} = TaskGroup('{{ task_group }}')

dags.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ bqetl_subplat:
184184

185185
bqetl_subplat_hourly:
186186
schedule_interval: 30 * * * *
187+
# Limit concurrency to avoid any ETLs running while the Fivetran Stripe sync may be in progress.
188+
max_active_runs: 1
187189
catchup: true
188190
description: |
189191
Hourly imports for Subscription Platform data from Stripe and Firestore,

0 commit comments

Comments
 (0)