99
1010import json
1111import logging
12+ import re
1213from datetime import datetime , timedelta
1314from functools import partial
1415from itertools import groupby
1516from multiprocessing .pool import ThreadPool
17+ from typing import List
1618
1719import click
1820from google .api_core .exceptions import BadRequest
1921from google .cloud import bigquery
2022
2123from bigquery_etl .cli .utils import table_matches_patterns
24+ from bigquery_etl .config import ConfigLoader
2225from bigquery_etl .util .bigquery_id import sql_table_id
2326from bigquery_etl .util .client_queue import ClientQueue
2427from bigquery_etl .util .common import TempDatasetReference
7376 --
7477 -- Retain only one document for each ID.
7578SELECT
76- * EXCEPT(_n)
79+ * EXCEPT(_n){replace_geo}
7780FROM
7881 numbered_duplicates
7982WHERE
8083 _n = 1
8184"""
8285
8386
87+ def _has_field_path (schema : List [bigquery .SchemaField ], path : List [str ]) -> bool :
88+ """Return True if nested field path (e.g., ['metadata','geo','city']) exists."""
89+ for name in path :
90+ f = next ((field for field in schema if field .name == name ), None )
91+ if not f :
92+ return False
93+ schema = getattr (f , "fields" , []) or []
94+ return True
95+
96+
97+ def _select_geo (live_table : str , client : bigquery .Client ) -> str :
98+ """Build a SELECT REPLACE clause that NULLs metadata.geo.* if applicable."""
99+ _ , dataset_id , table_id = live_table .split ("." )
100+
101+ excluded_tables = set (
102+ ConfigLoader .get ("geo_deprecation" , "skip_tables" , fallback = [])
103+ )
104+ if re .sub (r"_v\d+$" , "" , table_id ) in excluded_tables :
105+ return ""
106+
107+ app_id = dataset_id .removesuffix ("_live" )
108+ included_apps = set (
109+ ConfigLoader .get ("geo_deprecation" , "include_app_ids" , fallback = [])
110+ )
111+ if app_id not in included_apps :
112+ return ""
113+
114+ table = client .get_table (live_table )
115+
116+ include_client_id = table .labels .get ("include_client_id" ) == "true"
117+ if not include_client_id :
118+ return ""
119+
120+ # Check schema to ensure geo fields exists
121+ schema = table .schema
122+ required_fields = ("city" , "subdivision1" , "subdivision2" )
123+ has_required_fields = all (
124+ _has_field_path (schema , ["metadata" , "geo" , field ]) for field in required_fields
125+ )
126+ if not has_required_fields :
127+ return ""
128+
129+ return """
130+ REPLACE (
131+ (SELECT AS STRUCT
132+ metadata.* REPLACE (
133+ (SELECT AS STRUCT
134+ metadata.geo.* REPLACE (
135+ CAST(NULL AS STRING) AS city,
136+ CAST(NULL AS STRING) AS subdivision1,
137+ CAST(NULL AS STRING) AS subdivision2
138+ )
139+ ) AS geo
140+ )
141+ ) AS metadata)
142+ """
143+
144+
84145def _get_query_job_configs (
85146 client ,
86147 live_table ,
@@ -92,7 +153,8 @@ def _get_query_job_configs(
92153 num_retries ,
93154 temp_dataset ,
94155):
95- sql = QUERY_TEMPLATE .format (live_table = live_table )
156+ replace_geo = _select_geo (live_table , client )
157+ sql = QUERY_TEMPLATE .format (live_table = live_table , replace_geo = replace_geo )
96158 stable_table = f"{ live_table .replace ('_live.' , '_stable.' , 1 )} ${ date :%Y%m%d} "
97159 kwargs = dict (use_legacy_sql = False , dry_run = dry_run , priority = priority )
98160 start_time = datetime (* date .timetuple ()[:6 ])
0 commit comments