diff --git a/jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py b/jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py index 9e04ebb6..3c9a3f87 100644 --- a/jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py +++ b/jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py @@ -63,14 +63,23 @@ def ensure_table(self, table_type: str) -> Table: schema_cls = get_record_cls(table_type) schema = generate_schema(schema_cls) - partition = TimePartitioning( - type_=TimePartitioningType.DAY, - field="submission_date", - require_partition_filter=True, - ) - table = Table(self.table_name, schema=schema) - table.time_partitioning = partition - self.client.create_table(table, exists_ok=True) + try: + table = self.client.get_table(self.table_name) + + # Table exists, validate the schema + if schema != table.schema: + raise Exception(f"Schema mismatch detected for {self.table_name}!") + + except NotFound: + # Table doesn't exist, create it + table = Table(self.table_name, schema=schema) + table.time_partitioning = TimePartitioning( + type_=TimePartitioningType.DAY, + field="submission_date", + require_partition_filter=True, + ) + self.client.create_table(table) + return table def replace(self, submission_date: str, records: list[Record] | Record): diff --git a/jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py b/jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py index 87c73141..1ed91952 100644 --- a/jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py +++ b/jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py @@ -93,6 +93,9 @@ def __init__(self, config: Config, **kwargs: Any): self.task_records: list[Record] = [] self.run_records: list[Record] = [] + self.task_loader = BigQueryLoader(self.config, "tasks") + self.run_loader = BigQueryLoader(self.config, "runs") + self._convert_camel_case_re = re.compile(r"(?