Commit cbd4139

[DENG-9397] Publish UDFs in parallel using ParallelTopologicalSorter (#8112)
1 parent 29f3e27 commit cbd4139


bigquery_etl/routine/publish_routines.py

Lines changed: 69 additions & 15 deletions
@@ -6,6 +6,7 @@
 import os
 import re
 from argparse import ArgumentParser
+from functools import partial

 from google.cloud import storage  # type: ignore
 from google.cloud import bigquery
@@ -14,6 +15,7 @@
 from bigquery_etl.routine.parse_routine import accumulate_dependencies, read_routine_dir
 from bigquery_etl.util import standard_args
 from bigquery_etl.util.common import project_dirs
+from bigquery_etl.util.parallel_topological_sorter import ParallelTopologicalSorter

 OPTIONS_LIB_RE = re.compile(r'library = "gs://[^"]+/([^"]+)"')
 OPTIONS_RE = re.compile(r"OPTIONS(\n|\s)*\(")
@@ -57,6 +59,13 @@
         default=False,
         help="The published UDFs should be publicly accessible.",
     )
+    parser.add_argument(
+        "--parallelism",
+        "-p",
+        type=int,
+        default=8,
+        help="Number of parallel processes to use for publishing routines.",
+    )
     standard_args.add_log_level(parser)
     parser.add_argument(
         "pattern",
@@ -86,6 +95,7 @@ def main():
         args.gcs_path,
         args.public,
         pattern=args.pattern,
+        parallelism=args.parallelism,
     )


@@ -101,6 +111,30 @@ def skipped_routines():
     }


+def _publish_udf_worker(
+    udf_name,
+    followup_queue,
+    raw_routines,
+    project_id,
+    gcs_bucket,
+    gcs_path,
+    public,
+    dry_run,
+):
+    """Worker function for publishing a single UDF."""
+    client = bigquery.Client(project_id)
+    publish_routine(
+        raw_routines[udf_name],
+        client,
+        project_id,
+        gcs_bucket,
+        gcs_path,
+        list(raw_routines.keys()),
+        public,
+        dry_run=dry_run,
+    )
+
+
 def publish(
     target,
     project_id,
@@ -110,18 +144,17 @@ def publish(
     public,
     pattern=None,
     dry_run=False,
+    parallelism=8,
 ):
     """Publish routines in the provided directory."""
-    client = bigquery.Client(project_id)
-
     if dependency_dir and os.path.exists(dependency_dir):
         push_dependencies_to_gcs(
             gcs_bucket, gcs_path, dependency_dir, os.path.basename(target)
         )

     raw_routines = read_routine_dir(target)

-    published_routines = []
+    all_udfs_to_publish = set()

     for raw_routine in (
         raw_routines if pattern is None else fnmatch.filter(raw_routines, pattern)
@@ -134,20 +167,41 @@

         for dep in udfs_to_publish:
             if (
-                dep not in published_routines
+                dep not in all_udfs_to_publish
                 and raw_routines[dep].filepath not in skipped_routines()
             ):
-                publish_routine(
-                    raw_routines[dep],
-                    client,
-                    project_id,
-                    gcs_bucket,
-                    gcs_path,
-                    raw_routines.keys(),
-                    public,
-                    dry_run=dry_run,
-                )
-                published_routines.append(dep)
+                all_udfs_to_publish.add(dep)
+
+    unique_udfs_to_publish = list(all_udfs_to_publish)
+    dependencies = {}
+    all_udfs = set(unique_udfs_to_publish)
+
+    for udf in unique_udfs_to_publish:
+        udf_deps = accumulate_dependencies([], raw_routines, udf)
+        dependencies[udf] = set(
+            dep for dep in udf_deps if dep in all_udfs and dep != udf
+        )
+
+    publish_udf = partial(
+        _publish_udf_worker,
+        raw_routines=raw_routines,
+        project_id=project_id,
+        gcs_bucket=gcs_bucket,
+        gcs_path=gcs_path,
+        public=public,
+        dry_run=dry_run,
+    )
+
+    # use a topological sorter so UDFs are published in dependency order;
+    # in theory all UDFs could be published in parallel at once, but if a
+    # deploy fails that can leave UDFs in a broken state (e.g. referencing
+    # a UDF that failed to publish)
+    if parallelism > 1 and unique_udfs_to_publish:
+        sorter = ParallelTopologicalSorter(dependencies, parallelism=parallelism)
+        sorter.map(publish_udf)
+    else:
+        # sequential publishing fallback
+        for udf_name in unique_udfs_to_publish:
+            publish_udf(udf_name, None)


 def publish_routine(
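
The ParallelTopologicalSorter class itself is not part of this diff. Judging only from the call sites above — a constructor taking a mapping from each UDF to the set of UDFs it depends on plus a parallelism keyword, and a map(callback) method that invokes the callback as callback(udf_name, followup_queue) — a minimal sketch could look like the following. The internals here are assumptions for illustration, not the actual bigquery_etl implementation; in particular this sketch uses threads and always passes None for the followup queue, mirroring the sequential fallback.

# Sketch only: one plausible ParallelTopologicalSorter, reconstructed from
# the call sites in publish(). `graph` maps each node to the set of nodes
# that must finish before it may start.
import graphlib
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


class ParallelTopologicalSorter:
    def __init__(self, graph, parallelism=8):
        self.graph = graph
        self.parallelism = parallelism

    def map(self, fn):
        sorter = graphlib.TopologicalSorter(self.graph)
        sorter.prepare()  # raises CycleError if the dependency graph has a cycle
        with ThreadPoolExecutor(max_workers=self.parallelism) as pool:
            in_flight = {}
            while sorter.is_active():
                # schedule every node whose dependencies have all completed
                for node in sorter.get_ready():
                    # the real sorter presumably supplies a followup queue;
                    # None matches what the sequential fallback passes
                    in_flight[pool.submit(fn, node, None)] = node
                done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
                for future in done:
                    future.result()  # re-raise any worker exception
                    sorter.done(in_flight.pop(future))

The --parallelism help text says processes rather than threads; with processes, each worker building its own bigquery.Client (as _publish_udf_worker does) is the right move, since client objects hold connections and credentials that do not transfer cleanly across process boundaries.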

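To make the ordering concrete, here is a hypothetical publish set run through the sketch above (the UDF names are invented for illustration):

# Hypothetical dependency map of the shape publish() builds: each UDF maps
# to the set of other UDFs in the publish set that it references.
dependencies = {
    "udf.base": set(),                        # no prerequisites
    "udf.mean": {"udf.base"},                 # udf.mean calls udf.base
    "udf.merge": {"udf.base"},                # udf.merge calls udf.base
    "udf.report": {"udf.mean", "udf.merge"},  # waits on both branches
}

# udf.base publishes first; udf.mean and udf.merge may then run
# concurrently; udf.report starts only after both succeed.
ParallelTopologicalSorter(dependencies, parallelism=2).map(
    lambda udf_name, followup_queue: print("publishing", udf_name)
)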