Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions jobs/fxci-taskcluster-export/fxci_etl/loaders/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,23 @@ def ensure_table(self, table_type: str) -> Table:
schema_cls = get_record_cls(table_type)
schema = generate_schema(schema_cls)

partition = TimePartitioning(
type_=TimePartitioningType.DAY,
field="submission_date",
require_partition_filter=True,
)
table = Table(self.table_name, schema=schema)
table.time_partitioning = partition
self.client.create_table(table, exists_ok=True)
try:
table = self.client.get_table(self.table_name)

# Table exists, validate the schema
if schema != table.schema:
raise Exception(f"Schema mismatch detected for {self.table_name}!")

except NotFound:
# Table doesn't exist, create it
table = Table(self.table_name, schema=schema)
table.time_partitioning = TimePartitioning(
type_=TimePartitioningType.DAY,
field="submission_date",
require_partition_filter=True,
)
self.client.create_table(table)

return table

def replace(self, submission_date: str, records: list[Record] | Record):
Expand Down
9 changes: 5 additions & 4 deletions jobs/fxci-taskcluster-export/fxci_etl/pulse/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def __init__(self, config: Config, **kwargs: Any):
self.task_records: list[Record] = []
self.run_records: list[Record] = []

self.task_loader = BigQueryLoader(self.config, "tasks")
self.run_loader = BigQueryLoader(self.config, "runs")

self._convert_camel_case_re = re.compile(r"(?<!^)(?=[A-Z])")
self._known_tags = set(Tags.__annotations__.keys())

Expand Down Expand Up @@ -168,11 +171,9 @@ def process_event(self, event):
def on_processing_complete(self):
logger.info(f"Processed {self._count} pulse events")
if self.task_records:
task_loader = BigQueryLoader(self.config, "tasks")
task_loader.insert(self.task_records)
self.task_loader.insert(self.task_records)
self.task_records = []

if self.run_records:
run_loader = BigQueryLoader(self.config, "runs")
run_loader.insert(self.run_records)
self.run_loader.insert(self.run_records)
self.run_records = []
77 changes: 77 additions & 0 deletions jobs/fxci-taskcluster-export/test/test_loader_bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import pytest
from google.cloud.bigquery import SchemaField
from google.cloud.exceptions import NotFound
from google.cloud import storage

from fxci_etl.loaders import bigquery
from fxci_etl.schemas import generate_schema, get_record_cls


@pytest.fixture(autouse=True)
def storage_mock(mocker):
storage_mock = mocker.MagicMock()
mocker.patch.object(storage, "Client", return_value=storage_mock)


@pytest.fixture
def client_mock(mocker):
client_mock = mocker.MagicMock()
mocker.patch.object(bigquery, "Client", return_value=client_mock)
return client_mock


@pytest.fixture
def run_ensure_table(make_config, mocker, client_mock):
table = "tasks"
config = make_config(
**{
"bigquery": {
"project": "project",
"dataset": "dataset",
"tables": {table: "table_v1"},
}
}
)

def inner(schema=None, exists=True):
if not schema:
schema = generate_schema(get_record_cls(table))

if exists:
table_mock = mocker.MagicMock()
table_mock.schema = schema
client_mock.get_table.return_value = table_mock
else:
client_mock.get_table.side_effect = NotFound("message")

return bigquery.BigQueryLoader(config, table)

return inner


def test_ensure_table_schemas_match(run_ensure_table):
loader = run_ensure_table()
loader.client.get_table.assert_called_once_with(loader.table_name)
assert not loader.client.create_table.called


def test_ensure_table_schemas_differ_value(run_ensure_table):
schema = generate_schema(get_record_cls("tasks"))
schema[0]._properties["mode"] = "NULLABLE"

with pytest.raises(Exception):
run_ensure_table(schema)


def test_ensure_table_schemas_differ_extra_column(run_ensure_table):
schema = generate_schema(get_record_cls("tasks"))
schema.append(SchemaField("foo", "STRING", "NULLABLE"))

with pytest.raises(Exception):
run_ensure_table(schema)


def test_ensure_table_missing(run_ensure_table):
loader = run_ensure_table(exists=False)
loader.client.get_table.assert_called_once_with(loader.table_name)
loader.client.create_table.assert_called_once()
9 changes: 4 additions & 5 deletions jobs/fxci-taskcluster-export/test/test_pulse_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
from typing import Any
import pytest

from fxci_etl.pulse import handler
from fxci_etl.pulse.handler import BigQueryHandler, Event, storage


@pytest.fixture(autouse=True)
def mock_event_backup(mocker):
storage_mock = mocker.MagicMock()
storage_mock.bucket.return_value = mocker.MagicMock()

mocker.patch.object(storage, "Client", return_value=storage_mock)
def setup_mocks(mocker):
mocker.patch.object(storage, "Client", return_value=mocker.MagicMock())
mocker.patch.object(handler, "BigQueryLoader", return_value=mocker.MagicMock())


@pytest.fixture
Expand Down