
Commit 63b77e5: Address feedback

1 parent 851c6f5

File tree: 2 files changed (+47, -83 lines)


.circleci/workflows.yml

Lines changed: 1 addition & 0 deletions

@@ -1246,6 +1246,7 @@ workflows:
           - validate-views
           - validate-metadata
           - dry-run-sql
+          - test-routines
       - test-routines:
           requires:
             - deploy-changes-to-stage

bigquery_etl/dryrun.py

Lines changed: 46 additions & 83 deletions
@@ -248,15 +248,15 @@ def _get_cache_key(self, sql):
         cache_input = f"{sql}|{self.project}|{self.dataset}|{self.table}"
         return hashlib.sha256(cache_input.encode()).hexdigest()

-    def _get_cached_result(self, cache_key, ttl_seconds=None):
-        """Load cached dry run result from disk."""
-        if ttl_seconds is None:
-            ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)
-
+    @staticmethod
+    def _get_cache_dir():
+        """Get the cache directory path."""
         cache_dir = Path(tempfile.gettempdir()) / "bigquery_etl_dryrun_cache"
         cache_dir.mkdir(parents=True, exist_ok=True)
-        cache_file = cache_dir / f"dryrun_{cache_key}.pkl"
+        return cache_dir

+    def _read_cache_file(self, cache_file, ttl_seconds):
+        """Read and return cached data from a pickle file with TTL check."""
         try:
             if not cache_file.exists():
                 return None
@@ -271,116 +271,79 @@ def _get_cached_result(self, cache_key, ttl_seconds=None):
                 return None

             cached_data = pickle.loads(cache_file.read_bytes())
-            cache_age = time.time() - cache_file.stat().st_mtime
-            print(f"[DRYRUN CACHE HIT] {self.sqlfile} (age: {cache_age:.0f}s)")
             return cached_data
         except (pickle.PickleError, EOFError, OSError, FileNotFoundError) as e:
-            print(f"[DRYRUN CACHE] Failed to load cache: {e}")
+            print(f"[CACHE] Failed to load {cache_file}: {e}")
             try:
                 if cache_file.exists():
                     cache_file.unlink()
             except OSError:
                 pass
             return None

-    def _save_cached_result(self, cache_key, result):
-        """Save dry run result to disk cache using atomic write."""
-        cache_dir = Path(tempfile.gettempdir()) / "bigquery_etl_dryrun_cache"
-        cache_dir.mkdir(parents=True, exist_ok=True)
-        cache_file = cache_dir / f"dryrun_{cache_key}.pkl"
-
+    @staticmethod
+    def _write_cache_file(cache_file, data):
+        """Write data to a cache file using atomic write."""
         try:
             # write to temporary file first, then atomically rename
             # this prevents race conditions where readers get partial files
-            temp_file = Path(str(cache_file) + f".tmp.{os.getpid()}")
+            # include random bytes to handle thread pool scenarios where threads share same PID
+            temp_file = Path(
+                str(cache_file) + f".tmp.{os.getpid()}.{os.urandom(4).hex()}"
+            )
             with open(temp_file, "wb") as f:
-                pickle.dump(result, f)
+                pickle.dump(data, f)
                 f.flush()
                 os.fsync(f.fileno())  # Ensure data is written to disk

             temp_file.replace(cache_file)
-
-            # save table metadata separately if present
-            if (
-                result
-                and "tableMetadata" in result
-                and self.project
-                and self.dataset
-                and self.table
-            ):
-                table_identifier = f"{self.project}.{self.dataset}.{self.table}"
-                self._save_cached_table_metadata(
-                    table_identifier, result["tableMetadata"]
-                )
         except (pickle.PickleError, OSError) as e:
-            print(f"[DRYRUN CACHE] Failed to save cache: {e}")
+            print(f"[CACHE] Failed to save {cache_file}: {e}")
             try:
-                temp_file = Path(str(cache_file) + f".tmp.{os.getpid()}")
-                if temp_file.exists():
+                if "temp_file" in locals() and temp_file.exists():
                     temp_file.unlink()
-            except OSError:
+            except (OSError, NameError):
                 pass

-    def _get_cached_table_metadata(self, table_identifier, ttl_seconds=None):
-        """Load cached table metadata from disk based on table identifier."""
+    def _get_cached_result(self, cache_key, ttl_seconds=None):
+        """Load cached dry run result from disk."""
         if ttl_seconds is None:
             ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)

-        cache_dir = Path(tempfile.gettempdir()) / "bigquery_etl_dryrun_cache"
-        cache_dir.mkdir(parents=True, exist_ok=True)
-        # table identifier as cache key
-        table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
-        cache_file = cache_dir / f"table_metadata_{table_cache_key}.pkl"
+        cache_file = self._get_cache_dir() / f"dryrun_{cache_key}.pkl"
+        return self._read_cache_file(cache_file, ttl_seconds)

-        try:
-            if not cache_file.exists():
-                return None
+    def _save_cached_result(self, cache_key, result):
+        """Save dry run result to disk cache using atomic write."""
+        cache_file = self._get_cache_dir() / f"dryrun_{cache_key}.pkl"
+        self._write_cache_file(cache_file, result)

-            # check if cache is expired
-            file_age = time.time() - cache_file.stat().st_mtime
+        # save table metadata separately if present
+        if (
+            result
+            and "tableMetadata" in result
+            and self.project
+            and self.dataset
+            and self.table
+        ):
+            table_identifier = f"{self.project}.{self.dataset}.{self.table}"
+            self._save_cached_table_metadata(table_identifier, result["tableMetadata"])

-            if file_age > ttl_seconds:
-                try:
-                    cache_file.unlink()
-                except OSError:
-                    pass
-                return None
+    def _get_cached_table_metadata(self, table_identifier, ttl_seconds=None):
+        """Load cached table metadata from disk based on table identifier."""
+        if ttl_seconds is None:
+            ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)

-            cached_data = pickle.loads(cache_file.read_bytes())
-            return cached_data
-        except (pickle.PickleError, EOFError, OSError, FileNotFoundError) as e:
-            print(f"[TABLE METADATA] Failed to load cache for {table_identifier}: {e}")
-            try:
-                if cache_file.exists():
-                    cache_file.unlink()
-            except OSError:
-                pass
-            return None
+        # table identifier as cache key
+        table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
+        cache_file = self._get_cache_dir() / f"table_metadata_{table_cache_key}.pkl"
+        return self._read_cache_file(cache_file, ttl_seconds)

     def _save_cached_table_metadata(self, table_identifier, metadata):
         """Save table metadata to disk cache using atomic write."""
-        cache_dir = Path(tempfile.gettempdir()) / "bigquery_etl_dryrun_cache"
-        cache_dir.mkdir(parents=True, exist_ok=True)
         table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
-        cache_file = cache_dir / f"table_metadata_{table_cache_key}.pkl"
-
-        try:
-            # write to temporary file first, then atomically rename
-            temp_file = Path(str(cache_file) + f".tmp.{os.getpid()}")
-            with open(temp_file, "wb") as f:
-                pickle.dump(metadata, f)
-                f.flush()
-                os.fsync(f.fileno())
-
-            temp_file.replace(cache_file)
-        except (pickle.PickleError, OSError) as e:
-            print(f"[TABLE METADATA] Failed to save cache for {table_identifier}: {e}")
-            try:
-                temp_file = Path(str(cache_file) + f".tmp.{os.getpid()}")
-                if temp_file.exists():
-                    temp_file.unlink()
-            except OSError:
-                pass
+        cache_file = self._get_cache_dir() / f"table_metadata_{table_cache_key}.pkl"
+        self._write_cache_file(cache_file, metadata)

     @cached_property
     def dry_run_result(self):
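
The refactor above routes both cache readers (dry run results and table metadata) through the shared _read_cache_file helper, which treats an entry as a miss whenever the file is absent, older than the TTL, or unreadable. As a rough sketch of that read pattern in isolation (the function name and exact error handling below are illustrative, not the repository's code):

import pickle
import time
from pathlib import Path


def read_pickle_with_ttl(cache_file: Path, ttl_seconds: float):
    """Return the unpickled payload, or None if missing, expired, or unreadable."""
    try:
        if not cache_file.exists():
            return None
        # expire entries by comparing the file's mtime against the TTL
        if time.time() - cache_file.stat().st_mtime > ttl_seconds:
            cache_file.unlink(missing_ok=True)
            return None
        return pickle.loads(cache_file.read_bytes())
    except (pickle.PickleError, EOFError, OSError):
        # a corrupt or truncated cache file counts as a miss; clean it up best-effort
        try:
            cache_file.unlink(missing_ok=True)
        except OSError:
            pass
        return None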
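
On the write side, _write_cache_file keeps the write-to-temp-then-rename pattern: the payload is pickled into a temporary file next to the destination, flushed and fsync'd, and then moved into place with Path.replace, so concurrent readers only ever see the old file or the complete new one. The commit also widens the temp-file suffix from the PID alone to PID plus random bytes, because threads in a pool share one PID and could otherwise collide on the same temporary path. A minimal standalone sketch of that pattern (names are illustrative, not the repository's code):

import os
import pickle
from pathlib import Path


def write_pickle_atomically(cache_file: Path, data) -> None:
    """Pickle data into a sibling temp file, then atomically rename it into place."""
    # PID plus random bytes keeps temp paths unique even across threads of one process
    temp_file = cache_file.with_name(
        f"{cache_file.name}.tmp.{os.getpid()}.{os.urandom(4).hex()}"
    )
    try:
        with open(temp_file, "wb") as f:
            pickle.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # force the bytes to disk before the rename
        temp_file.replace(cache_file)  # atomic when both paths share a filesystem
    except (pickle.PickleError, OSError):
        # best-effort cleanup of the temp file; callers decide how to report the failure
        try:
            temp_file.unlink(missing_ok=True)
        except OSError:
            pass
        raise

Keeping the temporary file in the same directory as the final cache file is what makes the rename atomic; a temp path on a different filesystem (for example a separate /tmp mount) would make the replace fail with a cross-device error, which is why the helper builds the temp name from the cache file's own path.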
