|
19 | 19 | from pathos.multiprocessing import ProcessingPool |
20 | 20 |
|
21 | 21 | from bigquery_etl.cli.utils import use_cloud_function_option |
22 | | -from bigquery_etl.schema.stable_table_schema import SchemaFile, get_stable_table_schemas |
23 | | -from bigquery_etl.dryrun import get_id_token |
24 | 22 | from bigquery_etl.config import ConfigLoader |
| 23 | +from bigquery_etl.dryrun import get_id_token |
| 24 | +from bigquery_etl.schema.stable_table_schema import SchemaFile, get_stable_table_schemas |
25 | 25 |
|
26 | | -BOT_GENERATED = 'LOWER(IFNULL(metadata.isp.name, "")) = "browserstack" AS is_bot_generated' |
| 26 | +BOT_GENERATED = ( |
| 27 | + 'LOWER(IFNULL(metadata.isp.name, "")) = "browserstack" AS is_bot_generated' |
| 28 | +) |
27 | 29 |
|
28 | 30 | VIEW_QUERY_TEMPLATE = """\ |
29 | 31 | -- Generated via ./bqetl generate stable_views |
|
63 | 65 | * REPLACE( |
64 | 66 | {replacements}), |
65 | 67 | `moz-fx-data-shared-prod`.udf.funnel_derived_installs( |
66 | | - silent, |
67 | | - submission_timestamp, |
68 | | - build_id, |
69 | | - attribution, |
| 68 | + silent, |
| 69 | + submission_timestamp, |
| 70 | + build_id, |
| 71 | + attribution, |
70 | 72 | distribution_id |
71 | 73 | ) AS funnel_derived, |
72 | 74 | `moz-fx-data-shared-prod`.udf.distribution_model_installs(distribution_id) AS distribution_model, |
@@ -145,7 +147,7 @@ def write_dataset_metadata_if_not_exists( |
145 | 147 |
|
146 | 148 |
|
147 | 149 | def write_view_if_not_exists( |
148 | | - target_project: str, sql_dir: Path, id_token=None, schema: SchemaFile = None |
| 150 | + target_project: str, sql_dir: Path, schema: SchemaFile, id_token=None |
149 | 151 | ): |
150 | 152 | """If a view.sql does not already exist, write one to the target directory.""" |
151 | 153 | # add imports here to run in multiple processes via pathos |
@@ -224,7 +226,7 @@ def write_view_if_not_exists( |
224 | 226 | for metrics_datetime_field in metrics_field["fields"] |
225 | 227 | ]: |
226 | 228 | datetime_replacements_clause = ( |
227 | | - f"REPLACE (STRUCT(" |
| 229 | + "REPLACE (STRUCT(" |
228 | 230 | + ", ".join( |
229 | 231 | field_select |
230 | 232 | for field in metrics_datetime_fields |
@@ -287,7 +289,7 @@ def write_view_if_not_exists( |
287 | 289 | ) |
288 | 290 |
|
289 | 291 | replacements += [ |
290 | | - f"(SELECT AS STRUCT " |
| 292 | + "(SELECT AS STRUCT " |
291 | 293 | + ", ".join([metrics_select] + metrics_2_aliases) |
292 | 294 | + ") AS metrics" |
293 | 295 | ] |
@@ -457,26 +459,29 @@ def generate(target_project, output_dir, log_level, parallelism, use_cloud_funct |
457 | 459 | skipped_tables_config = ConfigLoader.get( |
458 | 460 | "generate", "stable_views", "skip_tables", fallback={} |
459 | 461 | ) |
| 462 | + skipped_datasets_config = ConfigLoader.get( |
| 463 | + "generate", "stable_views", "skip_datasets", fallback=[] |
| 464 | + ) |
460 | 465 | schemas = [ |
461 | | - schema for schema in |
462 | | - get_stable_table_schemas() |
463 | | - if schema.bq_table_unversioned not in skipped_tables_config.get(schema.bq_dataset_family, []) |
| 466 | + schema |
| 467 | + for schema in get_stable_table_schemas() |
| 468 | + if schema.bq_table_unversioned |
| 469 | + not in skipped_tables_config.get(schema.bq_dataset_family, []) |
| 470 | + and schema.bq_dataset_family not in skipped_datasets_config |
464 | 471 | ] |
465 | 472 | one_schema_per_dataset = [ |
466 | | - last |
467 | | - for k, (*_, last) in groupby(schemas, lambda t: t.bq_dataset_family) |
468 | | - if k |
469 | | - not in ConfigLoader.get( |
470 | | - "generate", "stable_views", "skip_datasets", fallback=[] |
471 | | - ) |
| 473 | + last for k, (*_, last) in groupby(schemas, lambda t: t.bq_dataset_family) |
472 | 474 | ] |
473 | 475 |
|
474 | 476 | id_token = get_id_token() |
475 | 477 |
|
476 | 478 | with ProcessingPool(parallelism) as pool: |
477 | 479 | pool.map( |
478 | 480 | partial( |
479 | | - write_view_if_not_exists, target_project, Path(output_dir), id_token |
| 481 | + write_view_if_not_exists, |
| 482 | + target_project, |
| 483 | + Path(output_dir), |
| 484 | + id_token=id_token, |
480 | 485 | ), |
481 | 486 | schemas, |
482 | 487 | ) |
|
0 commit comments