Skip to content

Commit bb6d9da

Browse files
DENG-877 Check backfills entries for tables with shredder_mitigation in metadata (#6588)
* Use attrs instead of attr for correct validators and attributes. Add constraint to stop backfills when table metadata has shredder_mitigation label and the backfill doesn't use mitigation. * Fix test. * Add validator and test for empty destination_table. * Add function docs.
1 parent f89089d commit bb6d9da

File tree

4 files changed

+220
-41
lines changed

4 files changed

+220
-41
lines changed

bigquery_etl/backfill/shredder_mitigation.py

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from types import NoneType
1212
from typing import Any, Optional, Tuple
1313

14-
import attr
14+
import attrs
1515
import click
1616
from dateutil import parser
1717
from gcloud.exceptions import NotFound # type: ignore
@@ -83,44 +83,46 @@ def from_schema_type(schema_type):
8383
return None
8484

8585

86-
@attr.define(eq=True)
86+
@attrs.define(eq=True)
8787
class Column:
8888
"""Representation of a column in a query, with relevant details for shredder mitigation."""
8989

9090
name: str
91-
data_type: DataTypeGroup = attr.field(default=DataTypeGroup.UNDETERMINED)
92-
column_type: ColumnType = attr.field(default=ColumnType.UNDETERMINED)
93-
status: ColumnStatus = attr.field(default=ColumnStatus.UNDETERMINED)
94-
95-
@data_type.validator
96-
def validate_data_type(self, _attribute, value):
97-
"""Check that the type of data_type is as expected."""
98-
if not isinstance(value, DataTypeGroup):
99-
raise ValueError(f"Invalid {value} with type: {type(value)}.")
100-
101-
@column_type.validator
102-
def validate_column_type(self, _attribute, value):
103-
"""Check that the type of parameter column_type is as expected."""
104-
if not isinstance(value, ColumnType):
105-
raise ValueError(f"Invalid data type for: {value}.")
106-
107-
@status.validator
108-
def validate_status(self, _attribute, value):
109-
"""Check that the type of parameter column_status is as expected."""
110-
if not isinstance(value, ColumnStatus):
111-
raise ValueError(f"Invalid data type for: {value}.")
112-
113-
114-
@attr.define(eq=True)
91+
data_type: DataTypeGroup = attrs.field(
92+
default=DataTypeGroup.UNDETERMINED,
93+
validator=attrs.validators.instance_of(DataTypeGroup),
94+
)
95+
column_type: ColumnType = attrs.field(
96+
default=ColumnType.UNDETERMINED,
97+
validator=attrs.validators.instance_of(ColumnType),
98+
)
99+
status: ColumnStatus = attrs.field(
100+
default=ColumnStatus.UNDETERMINED,
101+
validator=attrs.validators.instance_of(ColumnStatus),
102+
)
103+
104+
105+
@attrs.define(eq=True)
115106
class Subset:
116107
"""Representation of a subset/CTEs in the query and the actions related to this subset."""
117108

109+
@staticmethod
110+
def attr_not_null(instance, attribute, value):
111+
"""Raise an exception if the value is None or empty."""
112+
if value is None or value == "":
113+
raise click.ClickException(
114+
f"{attribute.name} not given and it's required to continue."
115+
)
116+
118117
client: bigquery.Client
119-
destination_table: str = attr.field(default="")
120-
query_cte: str = attr.field(default="")
121-
dataset: str = attr.field(default=TEMP_DATASET)
122-
project_id: str = attr.field(default=DEFAULT_PROJECT_ID)
123-
expiration_days: Optional[float] = attr.field(default=None)
118+
destination_table: str = attrs.field(
119+
default="",
120+
validator=attr_not_null,
121+
)
122+
query_cte: str = attrs.field(default="")
123+
dataset: str = attrs.field(default=TEMP_DATASET)
124+
project_id: str = attrs.field(default=DEFAULT_PROJECT_ID)
125+
expiration_days: Optional[float] = attrs.field(default=None)
124126

125127
@property
126128
def version(self):
@@ -243,7 +245,7 @@ def get_query_path_results(
243245
partition_type = (
244246
"DATE" if self.partitioning["type"] == "DAY" else self.partitioning["type"]
245247
)
246-
parameters = None
248+
parameters = []
247249
if partition_field is not None:
248250
parameters = [
249251
bigquery.ScalarQueryParameter(
@@ -546,9 +548,9 @@ def generate_query_with_shredder_mitigation(
546548
) = classify_columns(
547549
new_table_row, previous_group_by, new_group_by, existing_schema, new_schema
548550
)
549-
except TypeError as e:
551+
except TypeError:
550552
raise click.ClickException(
551-
f"Table {destination_table} did not return any rows for {backfill_date}.\n{e}"
553+
f"Table {destination_table} did not return any rows for {backfill_date}."
552554
)
553555

554556
if not common_dimensions or not added_dimensions or not metrics:
@@ -690,7 +692,8 @@ def generate_query_with_shredder_mitigation(
690692
if metric.data_type != DataTypeGroup.FLOAT
691693
]
692694
+ [
693-
f"ROUND({previous_agg.query_cte}.{metric.name}, 10) - " # Round FLOAT to avoid exponential numbers.
695+
# Round FLOAT to avoid exponential numbers.
696+
f"ROUND({previous_agg.query_cte}.{metric.name}, 10) - "
694697
f"ROUND(COALESCE({new_agg.query_cte}.{metric.name}, 0), 10) AS {metric.name}"
695698
for metric in metrics
696699
if metric.data_type == DataTypeGroup.FLOAT

bigquery_etl/cli/backfill.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from ..config import ConfigLoader
5353
from ..deploy import FailedDeployException, SkippedDeployException, deploy_table
5454
from ..metadata.parse_metadata import METADATA_FILE, Metadata
55+
from ..metadata.validate_metadata import SHREDDER_MITIGATION_LABEL
5556
from ..schema import SCHEMA_FILE, Schema
5657

5758
logging.basicConfig(level=logging.INFO)
@@ -523,6 +524,25 @@ def _initiate_backfill(
523524
custom_query_path = None
524525
checks = None
525526
custom_checks_name = None
527+
528+
metadata = Metadata.from_file(
529+
Path("sql") / project / dataset / table / METADATA_FILE
530+
)
531+
532+
# Stop if the metadata contains shredder mitigation label and the backfill doesn't.
533+
if (
534+
SHREDDER_MITIGATION_LABEL in metadata.labels
535+
and entry.shredder_mitigation is False
536+
):
537+
click.echo(
538+
click.style(
539+
f"This backfill cannot continue.\nManaged backfills for tables with metadata label"
540+
f" {SHREDDER_MITIGATION_LABEL} require using --shredder_mitigation.",
541+
fg="yellow",
542+
)
543+
)
544+
sys.exit(1)
545+
526546
if entry.shredder_mitigation is True:
527547
click.echo(
528548
click.style(

tests/backfill/test_shredder_mitigation.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,24 @@ def test_version(self, mock_client):
756756
== f"Invalid or missing table version in {test_subset.destination_table}."
757757
)
758758

759+
@patch("google.cloud.bigquery.Client")
760+
def test_destination_table_not_given(self, mock_client):
761+
"""Test valid version in destination table."""
762+
test_destination_table = ""
763+
764+
with pytest.raises(click.ClickException) as e:
765+
Subset(
766+
mock_client,
767+
test_destination_table,
768+
None,
769+
self.dataset,
770+
self.project_id,
771+
None,
772+
)
773+
assert str(e.value.message) == (
774+
"destination_table not given and it's required to continue."
775+
)
776+
759777
@patch("google.cloud.bigquery.Client")
760778
def test_partitioning(self, mock_client, runner):
761779
"""Test that partitioning type and value associated to a subset are returned as expected."""
@@ -1153,10 +1171,10 @@ def test_generate_query_as_expected(
11531171
@patch("google.cloud.bigquery.Client")
11541172
@patch("bigquery_etl.backfill.shredder_mitigation.classify_columns")
11551173
def test_generate_query_failed_for_missing_partitioning(
1156-
self, mock_classify_columns, mock_client, runner
1174+
self, mock_classify_columns, mock_client, runner, capfd
11571175
):
1158-
"""Test that function raises exception for required query parameter missing
1159-
in metadata, instead of generating wrong query."""
1176+
"""Test that function raises exception for required query parameter -partitioning-
1177+
missing in metadata, instead of generating wrong query."""
11601178
existing_schema = {
11611179
"fields": [
11621180
{"name": "column_1", "type": "DATE", "mode": "NULLABLE"},
@@ -1229,7 +1247,7 @@ def test_generate_query_failed_for_missing_partitioning(
12291247
[],
12301248
)
12311249

1232-
with pytest.raises(TypeError) as e:
1250+
with pytest.raises((TypeError, IndexError)) as e:
12331251
generate_query_with_shredder_mitigation(
12341252
client=mock_client,
12351253
project_id=self.project_id,
@@ -1238,7 +1256,13 @@ def test_generate_query_failed_for_missing_partitioning(
12381256
staging_table_name=self.staging_table_name,
12391257
backfill_date=PREVIOUS_DATE,
12401258
)
1241-
assert str(e.value) == "'NoneType' object is not iterable"
1259+
1260+
assert isinstance(e.value, (TypeError, IndexError))
1261+
assert str(e.value) in {
1262+
"'NoneType' object is not iterable",
1263+
"Invalid type",
1264+
"list index out of range",
1265+
}
12421266

12431267
@patch("google.cloud.bigquery.Client")
12441268
@patch("bigquery_etl.backfill.shredder_mitigation.classify_columns")

tests/cli/test_cli_backfill.py

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1029,7 +1029,6 @@ def test_validate_backfill_invalid_start_date_greater_than_end_date(self, runner
10291029
assert result.exit_code == 1
10301030
assert "Invalid start date" in str(result.exception)
10311031

1032-
#
10331032
def test_validate_backfill_invalid_start_date_greater_than_entry_date(self, runner):
10341033
with runner.isolated_filesystem():
10351034
SQL_DIR = "sql/moz-fx-data-shared-prod/test/test_query_v1"
@@ -2657,3 +2656,136 @@ def test_initiate_partitioned_backfill_with_invalid_billing_project_from_entry_s
26572656

26582657
assert result.exit_code == 1
26592658
assert "Invalid billing project" in str(result.exception)
2659+
2660+
@patch("bigquery_etl.cli.backfill.deploy_table")
2661+
@patch("bigquery_etl.backfill.utils._should_initiate")
2662+
def test_dont_initiate_if_label_and_backfill_entry_dont_match(
2663+
self, mock_should_initiate, mock_deploy_table, runner
2664+
):
2665+
"""Test that process stops if the table has label shredder_mitigation and the backfill doesn't."""
2666+
path = Path("sql/moz-fx-data-shared-prod/test/test_query_v1")
2667+
mock_should_initiate.return_value = True
2668+
mock_deploy_table.return_value = None
2669+
expected_error_output = (
2670+
"This backfill cannot continue.\nManaged backfills for "
2671+
"tables with metadata label shredder_mitigation require "
2672+
"using --shredder_mitigation."
2673+
)
2674+
2675+
with runner.isolated_filesystem():
2676+
os.makedirs(path, exist_ok=True)
2677+
2678+
with open(
2679+
"sql/moz-fx-data-shared-prod/test/test_query_v1/query.sql", "w"
2680+
) as f:
2681+
f.write("SELECT 1")
2682+
2683+
with open(path / "metadata.yaml", "w") as f:
2684+
f.write(
2685+
"friendly_name: Test\ndescription: Test\nlabels:\n incremental: true\n shredder_mitigation: true"
2686+
)
2687+
2688+
with open(
2689+
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
2690+
) as f:
2691+
f.write(yaml.dump(DATASET_METADATA_CONF))
2692+
2693+
backfill_entry_1 = Backfill(
2694+
date(2021, 5, 3),
2695+
date(2021, 1, 3),
2696+
date(2021, 5, 3),
2697+
[date(2021, 2, 3)],
2698+
VALID_REASON,
2699+
[VALID_WATCHER],
2700+
BackfillStatus.INITIATE,
2701+
shredder_mitigation=False,
2702+
)
2703+
2704+
backfill_file = (
2705+
Path("sql/moz-fx-data-shared-prod/test/test_query_v1") / BACKFILL_FILE
2706+
)
2707+
backfill_file.write_text(backfill_entry_1.to_yaml())
2708+
assert BACKFILL_FILE in os.listdir(
2709+
"sql/moz-fx-data-shared-prod/test/test_query_v1"
2710+
)
2711+
backfills = Backfill.entries_from_file(backfill_file)
2712+
assert backfills[0] == backfill_entry_1
2713+
2714+
result = runner.invoke(
2715+
initiate,
2716+
[
2717+
"moz-fx-data-shared-prod.test.test_query_v1",
2718+
"--parallelism=0",
2719+
],
2720+
)
2721+
2722+
assert result.exit_code == 1
2723+
assert expected_error_output in result.output
2724+
2725+
@patch("bigquery_etl.cli.backfill.deploy_table")
2726+
@patch("bigquery_etl.backfill.utils._should_initiate")
2727+
def test_initiate_if_label_and_backfill_entry_match(
2728+
self, mock_should_initiate, mock_deploy_table, runner
2729+
):
2730+
"""Test that the process continues if both table & backfill use shredder_mitigation."""
2731+
path = Path("sql/moz-fx-data-shared-prod/test/test_query_v1")
2732+
mock_should_initiate.return_value = True
2733+
mock_deploy_table.return_value = None
2734+
2735+
with runner.isolated_filesystem():
2736+
os.makedirs(path, exist_ok=True)
2737+
2738+
with open(
2739+
"sql/moz-fx-data-shared-prod/test/test_query_v1/query.sql", "w"
2740+
) as f:
2741+
f.write("SELECT 1")
2742+
2743+
with open(path / "metadata.yaml", "w") as f:
2744+
f.write(
2745+
"friendly_name: Test\ndescription: Test\nlabels:\n incremental: true\n shredder_mitigation: true"
2746+
)
2747+
2748+
with open(
2749+
"sql/moz-fx-data-shared-prod/test/dataset_metadata.yaml", "w"
2750+
) as f:
2751+
f.write(yaml.dump(DATASET_METADATA_CONF))
2752+
2753+
backfill_entry_1 = Backfill(
2754+
date(2021, 5, 3),
2755+
date(2021, 1, 3),
2756+
date(2021, 5, 3),
2757+
[date(2021, 2, 3)],
2758+
VALID_REASON,
2759+
[VALID_WATCHER],
2760+
BackfillStatus.INITIATE,
2761+
shredder_mitigation=True,
2762+
)
2763+
2764+
backfill_file = (
2765+
Path("sql/moz-fx-data-shared-prod/test/test_query_v1") / BACKFILL_FILE
2766+
)
2767+
backfill_file.write_text(backfill_entry_1.to_yaml())
2768+
assert BACKFILL_FILE in os.listdir(
2769+
"sql/moz-fx-data-shared-prod/test/test_query_v1"
2770+
)
2771+
backfills = Backfill.entries_from_file(backfill_file)
2772+
assert backfills[0] == backfill_entry_1
2773+
2774+
with patch(
2775+
"bigquery_etl.cli.backfill.query_backfill", return_value=None
2776+
) as mock_backfill:
2777+
with patch(
2778+
"bigquery_etl.cli.backfill.generate_query_with_shredder_mitigation",
2779+
return_value=("a", "b"),
2780+
) as mock_shredder_mitigation:
2781+
result = runner.invoke(
2782+
initiate,
2783+
[
2784+
"moz-fx-data-shared-prod.test.test_query_v1",
2785+
"--parallelism=0",
2786+
],
2787+
)
2788+
2789+
assert result.exit_code == 0
2790+
mock_shredder_mitigation.call_count == 2
2791+
mock_backfill.call_count == 2

0 commit comments

Comments
 (0)