1 change: 1 addition & 0 deletions .circleci/workflows.yml
@@ -1246,6 +1246,7 @@ workflows:
- validate-views
- validate-metadata
- dry-run-sql
- test-routines
- test-routines:
requires:
- deploy-changes-to-stage
3 changes: 3 additions & 0 deletions bigquery_etl/cli/query.py
@@ -2300,6 +2300,9 @@ def _update_query_schema(
query_schema = Schema.from_query_file(
query_file_path,
content=sql_content,
project=project_name,
dataset=dataset_name,
table=table_name,
Comment on lines +2303 to +2305 (Contributor):
It turns out this particular change is causing problems with deploying ETLs to stage:

use_cloud_function=use_cloud_function,
respect_skip=respect_dryrun_skip,
sql_dir=sql_dir,
169 changes: 149 additions & 20 deletions bigquery_etl/dryrun.py
@@ -12,10 +12,15 @@
"""

import glob
import hashlib
import json
import os
import pickle
import random
import re
import shutil
import sys
import tempfile
import time
from enum import Enum
from os.path import basename, dirname, exists
@@ -106,10 +111,12 @@ def __init__(
dataset=None,
table=None,
billing_project=None,
use_cache=True,
):
"""Instantiate DryRun class."""
self.sqlfile = sqlfile
self.content = content
self.use_cache = use_cache
self.query_parameters = query_parameters
self.strip_dml = strip_dml
self.use_cloud_function = use_cloud_function
@@ -192,6 +199,17 @@ def skipped_files(sql_dir=ConfigLoader.get("default", "sql_dir")) -> Set[str]:

return skip_files

@staticmethod
def clear_cache():
"""Clear dry run cache directory."""
cache_dir = Path(tempfile.gettempdir()) / "bigquery_etl_dryrun_cache"
if cache_dir.exists():
try:
shutil.rmtree(cache_dir)
print(f"Cleared dry run cache at {cache_dir}")
except OSError as e:
print(f"Warning: Failed to clear dry run cache: {e}")

def skip(self):
"""Determine if dry run should be skipped."""
return self.respect_skip and self.sqlfile in self.skipped_files(
@@ -225,6 +243,108 @@ def get_sql(self):

return sql

def _get_cache_key(self, sql):
"""Generate cache key based on SQL content and other parameters."""
cache_input = f"{sql}|{self.project}|{self.dataset}|{self.table}"
return hashlib.sha256(cache_input.encode()).hexdigest()

@staticmethod
def _get_cache_dir():
"""Get the cache directory path."""
cache_dir = Path(tempfile.gettempdir()) / "bigquery_etl_dryrun_cache"
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir

def _read_cache_file(self, cache_file, ttl_seconds):
"""Read and return cached data from a pickle file with TTL check."""
try:
if not cache_file.exists():
return None

# check if cache is expired
file_age = time.time() - cache_file.stat().st_mtime
if file_age > ttl_seconds:
try:
cache_file.unlink()
except OSError:
pass
return None

cached_data = pickle.loads(cache_file.read_bytes())
return cached_data
except (pickle.PickleError, EOFError, OSError, FileNotFoundError) as e:
print(f"[CACHE] Failed to load {cache_file}: {e}")
try:
if cache_file.exists():
cache_file.unlink()
except OSError:
pass
return None

@staticmethod
def _write_cache_file(cache_file, data):
"""Write data to a cache file using atomic write."""
try:
# write to temporary file first, then atomically rename
# this prevents race conditions where readers get partial files
# include random bytes to handle thread pool scenarios where threads share the same PID
temp_file = Path(
str(cache_file) + f".tmp.{os.getpid()}.{os.urandom(4).hex()}"
)
with open(temp_file, "wb") as f:
pickle.dump(data, f)
f.flush()
os.fsync(f.fileno()) # Ensure data is written to disk

temp_file.replace(cache_file)
except (pickle.PickleError, OSError) as e:
print(f"[CACHE] Failed to save {cache_file}: {e}")
try:
if "temp_file" in locals() and temp_file.exists():
temp_file.unlink()
except (OSError, NameError):
pass

def _get_cached_result(self, cache_key, ttl_seconds=None):
"""Load cached dry run result from disk."""
if ttl_seconds is None:
ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)

cache_file = self._get_cache_dir() / f"dryrun_{cache_key}.pkl"
return self._read_cache_file(cache_file, ttl_seconds)

def _save_cached_result(self, cache_key, result):
"""Save dry run result to disk cache using atomic write."""
cache_file = self._get_cache_dir() / f"dryrun_{cache_key}.pkl"
self._write_cache_file(cache_file, result)

# save table metadata separately if present
if (
result
and "tableMetadata" in result
and self.project
and self.dataset
and self.table
):
table_identifier = f"{self.project}.{self.dataset}.{self.table}"
self._save_cached_table_metadata(table_identifier, result["tableMetadata"])

def _get_cached_table_metadata(self, table_identifier, ttl_seconds=None):
"""Load cached table metadata from disk based on table identifier."""
if ttl_seconds is None:
ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)

# table identifier as cache key
table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
cache_file = self._get_cache_dir() / f"table_metadata_{table_cache_key}.pkl"
return self._read_cache_file(cache_file, ttl_seconds)

def _save_cached_table_metadata(self, table_identifier, metadata):
"""Save table metadata to disk cache using atomic write."""
table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
cache_file = self._get_cache_dir() / f"table_metadata_{table_cache_key}.pkl"
self._write_cache_file(cache_file, metadata)

@cached_property
def dry_run_result(self):
"""Dry run the provided SQL file."""
@@ -233,6 +353,14 @@ def dry_run_result(self):
else:
sql = self.get_sql()

# check cache first (if caching is enabled)
if sql is not None and self.use_cache:
cache_key = self._get_cache_key(sql)
cached_result = self._get_cached_result(cache_key)
if cached_result is not None:
self.dry_run_duration = 0 # Cached result, no actual dry run
return cached_result

query_parameters = []
if self.query_parameters:
for parameter_name, parameter_type in self.query_parameters.items():
@@ -351,6 +479,12 @@ def dry_run_result(self):
}

self.dry_run_duration = time.time() - start_time

# Save to cache (if caching is enabled and result is valid)
# Don't cache errors to allow retries
if self.use_cache and result.get("valid"):
self._save_cached_result(cache_key, result)

return result

except Exception as e:
@@ -476,6 +610,13 @@ def get_table_schema(self):
):
return self.dry_run_result["tableMetadata"]["schema"]

# Check if table metadata is cached (if caching is enabled)
if self.use_cache and self.project and self.dataset and self.table:
table_identifier = f"{self.project}.{self.dataset}.{self.table}"
cached_metadata = self._get_cached_table_metadata(table_identifier)
if cached_metadata:
return cached_metadata["schema"]

return []

def get_dataset_labels(self):
@@ -565,6 +706,13 @@ def validate_schema(self):
return True

query_file_path = Path(self.sqlfile)
table_name = query_file_path.parent.name
dataset_name = query_file_path.parent.parent.name
project_name = query_file_path.parent.parent.parent.name
self.project = project_name
self.dataset = dataset_name
self.table = table_name

query_schema = Schema.from_json(self.get_schema())
if self.errors():
# ignore file when there are errors that self.get_schema() did not raise
@@ -576,26 +724,7 @@
click.echo(f"No schema file defined for {query_file_path}", err=True)
return True

table_name = query_file_path.parent.name
dataset_name = query_file_path.parent.parent.name
project_name = query_file_path.parent.parent.parent.name

partitioned_by = None
if (
self.metadata
and self.metadata.bigquery
and self.metadata.bigquery.time_partitioning
):
partitioned_by = self.metadata.bigquery.time_partitioning.field

table_schema = Schema.for_table(
project_name,
dataset_name,
table_name,
client=self.client,
id_token=self.id_token,
partitioned_by=partitioned_by,
)
table_schema = Schema.from_json(self.get_table_schema())

# This check relies on the new schema being deployed to prod
if not query_schema.compatible(table_schema):
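For context, a minimal sketch of how the new disk cache behaves from a caller's perspective, assuming a local checkout with dry-run credentials; the query path below is a placeholder, not a real query in this repository:

```python
from bigquery_etl.dryrun import DryRun

# Placeholder query path; any file under the configured sql_dir behaves the same way.
sql_file = "sql/moz-fx-data-shared-prod/telemetry_derived/example_v1/query.sql"

# First access performs a real dry run and pickles the result under the system
# temp directory (e.g. /tmp/bigquery_etl_dryrun_cache/dryrun_<sha256>.pkl).
first_result = DryRun(sql_file).dry_run_result

# A second DryRun over the same SQL and table coordinates hashes to the same
# cache key, so within the configured TTL the pickled result is returned
# without another BigQuery call and dry_run_duration is reported as 0.
second_result = DryRun(sql_file).dry_run_result

# Callers that must always hit BigQuery opt out per instance ...
fresh_result = DryRun(sql_file, use_cache=False).dry_run_result

# ... or drop the cache directory entirely (also done automatically before new
# stable-table schemas are downloaded, since those can change dry run results).
DryRun.clear_cache()
```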
17 changes: 14 additions & 3 deletions bigquery_etl/schema/__init__.py
@@ -13,6 +13,7 @@
from google.cloud.bigquery import SchemaField

from .. import dryrun
from ..config import ConfigLoader

SCHEMA_FILE = "schema.yaml"

@@ -58,24 +59,34 @@ def from_json(cls, json_schema):
return cls(json_schema)

@classmethod
def for_table(cls, project, dataset, table, partitioned_by=None, *args, **kwargs):
def for_table(
cls,
project,
dataset,
table,
partitioned_by=None,
filename="query.sql",
*args,
**kwargs,
):
"""Get the schema for a BigQuery table."""
query = f"SELECT * FROM `{project}.{dataset}.{table}`"

if partitioned_by:
query += f" WHERE DATE(`{partitioned_by}`) = DATE('2020-01-01')"

try:
sql_dir = ConfigLoader.get("default", "sql_dir")
return cls(
dryrun.DryRun(
os.path.join(project, dataset, table, "query.sql"),
os.path.join(sql_dir, project, dataset, table, filename),
query,
project=project,
dataset=dataset,
table=table,
*args,
**kwargs,
).get_schema()
).get_table_schema()
)
except Exception as e:
print(f"Cannot get schema for {project}.{dataset}.{table}: {e}")
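As a hedged usage sketch of the updated `Schema.for_table` call (the table coordinates below are placeholders; the call still requires dry-run access and the configured `sql_dir`):

```python
from bigquery_etl.schema import Schema

# Placeholder table coordinates for illustration.
table_schema = Schema.for_table(
    project="moz-fx-data-shared-prod",
    dataset="telemetry_derived",
    table="example_v1",
    partitioned_by="submission_date",
    # New here: the query filename is configurable, the path is rooted at the
    # configured sql_dir, and the schema now comes from the dry run's table
    # metadata (get_table_schema) rather than the SELECT * result schema.
    filename="query.sql",
)
```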
9 changes: 8 additions & 1 deletion bigquery_etl/schema/stable_table_schema.py
@@ -59,7 +59,9 @@ def prod_schemas_uri():
with the most recent production schemas deploy.
"""
dryrun = DryRun(
"moz-fx-data-shared-prod/telemetry_derived/foo/query.sql", content="SELECT 1"
"moz-fx-data-shared-prod/telemetry_derived/foo/query.sql",
content="SELECT 1",
use_cache=False,
)
build_id = dryrun.get_dataset_labels()["schemas_build_id"]
commit_hash = build_id.split("_")[-1]
@@ -88,6 +90,11 @@ def get_stable_table_schemas() -> List[SchemaFile]:
print(f"Failed to load cached schemas: {e}, re-downloading...")

print(f"Downloading schemas from {schemas_uri}")

# Clear dry run cache when downloading new schemas
# Schema changes could affect dry run results
DryRun.clear_cache()

with urllib.request.urlopen(schemas_uri) as f:
tarbytes = BytesIO(f.read())

1 change: 1 addition & 0 deletions bqetl_project.yaml
@@ -32,6 +32,7 @@ dry_run:
function_accounts:
- bigquery-etl-dryrun@moz-fx-data-shared-prod.iam.gserviceaccount.com
- bigquery-etl-dryrun@moz-fx-data-shar-nonprod-efed.iam.gserviceaccount.com
cache_ttl_seconds: 900 # Cache dry run results for 15 minutes (900 seconds)
skip:
## skip all data-observability-dev queries due to CI lacking permissions in that project.
# TODO: once data observability platform assessment concludes this should be removed.
1 change: 1 addition & 0 deletions sql_generators/README.md
@@ -9,3 +9,4 @@ The directories in `sql_generators/` represent the generated queries and will co
Each `__init__.py` file needs to implement a `generate()` method that is configured as a [click command](https://click.palletsprojects.com/en/8.0.x/). The `bqetl` CLI will automatically add these commands to the `./bqetl query generate` command group.

After changes to a schema or after adding new tables, the schema is automatically derived from the query and deployed the next day by the [bqetl_artifact_deployment](https://workflow.telemetry.mozilla.org/dags/bqetl_artifact_deployment/grid) DAG. Alternatively, the schema can be generated and deployed manually using `./bqetl generate all` and `./bqetl query schema deploy`.
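For example, a minimal generator `__init__.py` might look like the following sketch; the generator name, options, and output path are illustrative rather than an existing generator in this repository:

```python
"""Example SQL generator (illustrative only)."""
from pathlib import Path

import click


@click.command()
@click.option("--output-dir", default="sql/", help="Directory to write generated queries to.")
@click.option("--target-project", default="moz-fx-data-shared-prod", help="Project to generate queries for.")
def generate(output_dir, target_project):
    """Generate example queries; picked up automatically by `./bqetl query generate`."""
    destination = Path(output_dir) / target_project / "example_derived" / "example_v1"
    destination.mkdir(parents=True, exist_ok=True)
    (destination / "query.sql").write_text("SELECT 1 AS example_column\n")
```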
