
Commit 2137c89

Merge remote-tracking branch 'origin/main'
2 parents: 886aff8 + b79a7b9

13 files changed: +338 -30 lines changed


bigquery_etl/backfill/shredder_mitigation.py

Lines changed: 21 additions & 7 deletions
@@ -30,6 +30,7 @@
 DEFAULT_PROJECT_ID = "moz-fx-data-shared-prod"
 SHREDDER_MITIGATION_QUERY_NAME = "shredder_mitigation_query"
 SHREDDER_MITIGATION_CHECKS_NAME = "shredder_mitigation_checks"
+DEFAULT_FOR_NULLS = "??"
 WILDCARD_STRING = "???????"
 WILDCARD_NUMBER = -9999999
 QUERY_FILE_RE = re.compile(
@@ -202,7 +203,7 @@ def generate_query(
         if not select_list or not from_clause:
             raise click.ClickException(
                 f"Missing required clause to generate query.\n"
-                f"Actuals: SELECT: {select_list}, FROM: {self.full_table_id}"
+                f"Actual: SELECT: {select_list}, FROM: {self.full_table_id}"
             )
         query = f"SELECT {', '.join(map(str, select_list))}"
         query += f" FROM {from_clause}" if from_clause is not None else ""
@@ -575,7 +576,8 @@ def generate_query_with_shredder_mitigation(
     common_select = (
         [previous.partitioning["field"]]
         + [
-            f"COALESCE({dim.name}, '{WILDCARD_STRING}') AS {dim.name}"
+            f"IF({dim.name} IS NULL OR {dim.name} = '{DEFAULT_FOR_NULLS}', '{WILDCARD_STRING}',"
+            f" {dim.name}) AS {dim.name}"
             for dim in common_dimensions
             if (
                 dim.name != previous.partitioning["field"]
@@ -688,7 +690,7 @@ def generate_query_with_shredder_mitigation(
             if metric.data_type != DataTypeGroup.FLOAT
         ]
         + [
-            f"ROUND({previous_agg.query_cte}.{metric.name}, 10) - "  # Round FLOAT to avoid exponentials.
+            f"ROUND({previous_agg.query_cte}.{metric.name}, 10) - "  # Round FLOAT to avoid exponential numbers.
             f"ROUND(COALESCE({new_agg.query_cte}.{metric.name}, 0), 10) AS {metric.name}"
             for metric in metrics
             if metric.data_type == DataTypeGroup.FLOAT
@@ -758,13 +760,13 @@ def generate_query_with_shredder_mitigation(
     final_select = f"{', '.join(combined_list)}"

     # Generate formatted output strings to display generated-query information in console.
-    common_ouput = "".join(
+    common_output = "".join(
         [
             f"{dim.column_type.name} > {dim.name}:{dim.data_type.name}\n"
             for dim in common_dimensions
         ]
     )
-    metrics_ouput = "".join(
+    metrics_output = "".join(
         [
             f"{dim.column_type.name} > {dim.name}:{dim.data_type.name}\n"
             for dim in metrics
@@ -778,7 +780,7 @@ def generate_query_with_shredder_mitigation(
     )
     click.echo(
         click.style(
-            f"Query columns:\n" f"{common_ouput + metrics_ouput + changed_output}",
+            f"Query columns:\n" f"{common_output + metrics_output + changed_output}",
             fg="yellow",
         )
     )
@@ -816,10 +818,22 @@ def generate_query_with_shredder_mitigation(
     # Generate checks to compare versions after each partition backfill.
     checks_select = (
         [new.partitioning["field"]]
+        + [
+            f"IF({dim.name} IS NULL OR {dim.name} = '{DEFAULT_FOR_NULLS}', '{WILDCARD_STRING}',"
+            f" {dim.name}) AS {dim.name}"
+            for dim in common_dimensions
+            if (
+                dim.name != previous.partitioning["field"]
+                and dim.data_type == DataTypeGroup.STRING
+            )
+        ]
         + [
             dim.name
             for dim in common_dimensions
-            if (dim.name != new.partitioning["field"])
+            if (
+                dim.name != new.partitioning["field"]
+                and dim.data_type != DataTypeGroup.STRING
+            )
         ]
         + [f"SUM({metric.name})" f" AS {metric.name}" for metric in metrics]
     )
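
Note on the shredder_mitigation.py change above: STRING dimensions that previously fell back to the wildcard only when NULL (via COALESCE) now also collapse the "??" default value to the wildcard, and the auto-generated checks apply the same normalization. A minimal sketch of the SQL fragment the updated code generates, using a hypothetical dimension name "segment" that is not part of the diff:

# Sketch only: constants copied from shredder_mitigation.py, dimension name invented.
DEFAULT_FOR_NULLS = "??"
WILDCARD_STRING = "???????"

dim_name = "segment"  # hypothetical STRING dimension
expression = (
    f"IF({dim_name} IS NULL OR {dim_name} = '{DEFAULT_FOR_NULLS}', '{WILDCARD_STRING}',"
    f" {dim_name}) AS {dim_name}"
)
print(expression)
# IF(segment IS NULL OR segment = '??', '???????', segment) AS segment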

bigquery_etl/backfill/shredder_mitigation_checks_template.sql

Lines changed: 4 additions & 4 deletions
@@ -27,8 +27,8 @@ SELECT
   CONCAT(
     ((SELECT COUNT(*) FROM previous_not_matching)),
     " rows in the previous data don't match backfilled data! Run auto-generated checks for ",
-    "all mismatches & search for rows missing or with differences in metrics. 5 sample rows: ",
-    (SELECT TO_JSON_STRING(ARRAY(SELECT AS STRUCT * FROM previous_not_matching LIMIT 5)))
+    "all mismatches & search for rows missing or with differences in metrics. Sample row in previous version: ",
+    (SELECT TO_JSON_STRING(ARRAY(SELECT AS STRUCT * FROM previous_not_matching LIMIT 1)))
   )
 ),
 NULL
@@ -61,8 +61,8 @@ SELECT
   CONCAT(
     ((SELECT COUNT(*) FROM backfilled_not_matching)),
     " rows in backfill don't match previous version of data! Run auto-generated checks for ",
-    "all mismatches & search for rows added or with differences in metrics. 5 sample rows: ",
-    (SELECT TO_JSON_STRING(ARRAY(SELECT AS STRUCT * FROM backfilled_not_matching LIMIT 5)))
+    "all mismatches & search for rows added or with differences in metrics. Sample row in new_version: ",
+    (SELECT TO_JSON_STRING(ARRAY(SELECT AS STRUCT * FROM backfilled_not_matching LIMIT 1)))
   )
 ),
 NULL
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
friendly_name: Cinder Decisions Raw
description: |-
  Download of decisions regarding addon moderations
owners:

labels:
  incremental: true
  owner1: example
scheduling:
  dag_name: bqetl_addons
bigquery:
  time_partitioning:
    type: day
    field: 'date'
    require_partition_filter: false
    expiration_days: null
  range_partitioning: null
references: {}
Lines changed: 185 additions & 0 deletions
@@ -0,0 +1,185 @@
"""Cinder API Addon Moderations Decisions - download decisions, clean and upload to BigQuery."""

import csv
import json
import os
import tempfile
from argparse import ArgumentParser
from time import sleep

import requests
from google.cloud import bigquery

CSV_FIELDS = [
    "user",
    "queue_slug",
    "job_id",
    "uuid",
    "applied_policies",
    "entity",
    "entity_slug",
    "entity_id",
    "created_at",
    "decision_type",
    "job_assigned_at",
    "typed_metadata",
]

CINDER_BEARER_TOKEN = os.environ.get("CINDER_TOKEN")


def post_response(url, headers, data):
    """POST response function."""
    response = requests.post(url, headers=headers, data=data)
    if (response.status_code == 401) or (response.status_code == 400):
        print(f"***Error: {response.status_code}***")
        print(response.text)
    return response


def get_response(url, headers):
    """GET response function."""
    response = requests.get(url, headers=headers)
    if (response.status_code == 401) or (response.status_code == 400):
        print(f"***Error: {response.status_code}***")
        print(response.text)
    return response


def read_json(filename: str) -> dict:
    """Read JSON file."""
    with open(filename, "r") as f:
        data = json.loads(f.read())
    return data


def write_dict_to_csv(json_data, filename):
    """Write a dictionary to a csv."""
    with open(filename, "w") as out_file:
        dict_writer = csv.DictWriter(out_file, CSV_FIELDS)
        dict_writer.writeheader()
        dict_writer.writerows(json_data)


def cinder_addon_decisions_download(date, bearer_token):
    """Download data from Cinder - bearer_token are called here."""
    # getting overview metrics for different kpis / Deliverables
    url = "https://stage.cinder.nonprod.webservices.mozgcp.net/api/v1/decisions/"
    headers = {"accept": "application/json", "authorization": f"Bearer {bearer_token}"}
    print(url)
    response = get_response(url, headers)
    return response


def check_json(cinder_addon_decisions_response_text):
    """Script will return an empty dictionary for apps on days when there is no data. Check for that here."""
    with tempfile.NamedTemporaryFile() as tmp_json:
        with open(tmp_json.name, "w") as f_json:
            f_json.write(cinder_addon_decisions_response_text)
            try:
                query_export = read_json(f_json.name)
            except (
                ValueError
            ):  # ex. json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
                return None
    return query_export


def clean_json(query_export, date):
    """Turn the json file into a list to be input into a CSV for bq upload."""
    fields_list = []
    for item in query_export["items"]:
        field_dict = {
            "user": item["user"],
            "queue_slug": item["queue_slug"],
            "job_id": item["job_id"],
            "uuid": item["uuid"],
            "applied_policies": item["applied_policies"],
            "entity": item["entity"],
            "entity_slug": item["entity_slug"],
            "entity_id": item["entity_id"],
            "created_at": item["created_at"],
            "decision_type": item["decision_type"],
            "job_assigned_at": item["job_assigned_at"],
            "typed_metadata": item["typed_metadata"],
        }
        fields_list.append(field_dict)
    return fields_list


def upload_to_bigquery(csv_data, project, dataset, table_name, date):
    """Upload the data to bigquery."""
    date = date
    print("writing json to csv")
    partition = f"{date}".replace("-", "")
    print(partition)
    with tempfile.NamedTemporaryFile() as tmp_csv:
        with open(tmp_csv.name, "w+b") as f_csv:
            write_dict_to_csv(csv_data, f_csv.name)
            client = bigquery.Client(project)
            job_config = bigquery.LoadJobConfig(
                create_disposition="CREATE_IF_NEEDED",
                write_disposition="WRITE_TRUNCATE",
                time_partitioning=bigquery.TimePartitioning(
                    type_=bigquery.TimePartitioningType.DAY,
                    field="date",
                ),
                skip_leading_rows=1,
                schema=[
                    bigquery.SchemaField("date", "DATE"),
                    bigquery.SchemaField("user", "STRING"),
                    bigquery.SchemaField("queue_slug", "STRING"),
                    bigquery.SchemaField("job_id", "STRING"),
                    bigquery.SchemaField("uuid", "STRING"),
                    bigquery.SchemaField("applied_policies", "STRING"),
                    bigquery.SchemaField("entity", "STRING"),
                    bigquery.SchemaField("entity_slug", "STRING"),
                    bigquery.SchemaField("entity_id", "STRING"),
                    bigquery.SchemaField("created_at", "DATE"),
                    bigquery.SchemaField("decision_type", "STRING"),
                    bigquery.SchemaField("job_assigned_at", "STRING"),
                    bigquery.SchemaField("typed_metadata", "STRING"),
                ],
            )
            destination = f"{project}.{dataset}.{table_name}${partition}"
            job = client.load_table_from_file(f_csv, destination, job_config=job_config)
            print(
                f"Writing Decisions data to {destination}. BigQuery job ID: {job.job_id}"
            )
            job.result()


def main():
    """Input data, call functions, get stuff done."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--date", required=True)
    parser.add_argument("--project", default="moz-fx-data-shared-prod")
    parser.add_argument("--dataset", default="addon_moderations_derived")

    args = parser.parse_args()

    project = args.project
    dataset = args.dataset
    table_name = "cinder_decisions_raw_v1"

    date = args.date
    bearer_token = CINDER_BEARER_TOKEN

    data = []

    json_file = cinder_addon_decisions_download(date, bearer_token)
    query_export = check_json(json_file.text)

    if query_export is not None:
        # This section writes the tmp json data into a temp CSV file which will then be put into a BigQuery table
        cinder_addon_decisions_data = clean_json(query_export, date)
        data.extend(cinder_addon_decisions_data)
    else:
        print("no data for today")
        sleep(5)

    upload_to_bigquery(data, project, dataset, table_name, date)


if __name__ == "__main__":
    main()
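
For orientation, a rough, self-contained sketch of the transform the new script performs: the Cinder /decisions/ response carries an "items" list, clean_json() reduces each item to the CSV_FIELDS columns, and write_dict_to_csv() produces the load file for BigQuery. The sample payload below is invented; only the column names are taken from the script.

import csv
import io

# Copied from the script above.
CSV_FIELDS = [
    "user", "queue_slug", "job_id", "uuid", "applied_policies", "entity",
    "entity_slug", "entity_id", "created_at", "decision_type",
    "job_assigned_at", "typed_metadata",
]

# Invented example of one decision item from the API response.
sample_export = {
    "items": [
        {
            "user": "moderator@example.com",
            "queue_slug": "amo-addons",
            "job_id": "123",
            "uuid": "abc-def-123",
            "applied_policies": ["policy-1"],
            "entity": {"slug": "some-addon"},
            "entity_slug": "some-addon",
            "entity_id": "456",
            "created_at": "2024-10-01",
            "decision_type": "approve",
            "job_assigned_at": "2024-10-01",
            "typed_metadata": {},
        }
    ]
}

# Same reduction clean_json() performs: one dict per decision, CSV_FIELDS keys only.
rows = [{field: item[field] for field in CSV_FIELDS} for item in sample_export["items"]]

# Same CSV shape write_dict_to_csv() produces, written to memory here instead of a temp file.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, CSV_FIELDS)
writer.writeheader()
writer.writerows(rows)
print(buffer.getvalue().splitlines()[0])  # header row: user,queue_slug,...,typed_metadata

The job itself is invoked with --date (required) plus optional --project and --dataset, and reads the Cinder bearer token from the CINDER_TOKEN environment variable.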
Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
fields:

- mode: NULLABLE
  name: date
  type: DATE
  description: date when job run and field that the table is partitioned by

- mode: NULLABLE
  name: user
  type: STRING
  description: User who submitted the report

- mode: NULLABLE
  name: queue_slug
  type: STRING
  description: Queue_slug

- mode: NULLABLE
  name: job_id
  type: STRING
  description: Job_id of Decision

- mode: NULLABLE
  name: uuid
  type: STRING
  description: ID of UU

- mode: REPEATED
  name: applied_policies
  type: STRING
  description: Policies applied to moderate addon

- mode: REPEATED
  name: entity
  type: STRING
  description: Information about the entity

- mode: NULLABLE
  name: entity_slug
  type: STRING
  description: Entity Slug

- mode: NULLABLE
  name: entity_id
  type: STRING
  description: Add on ID

- mode: NULLABLE
  name: created_at
  type: STRING
  description: Date decision made

- mode: NULLABLE
  name: decision_type
  type: STRING
  description: type of decision

- mode: NULLABLE
  name: job_assigned_at
  type: STRING
  description: Date addon report was assigned to a moderator

- mode: NULLABLE
  name: typed_metadata
  type: STRING
  description: Contains more data
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
friendly_name: Addon Moderations
description: |-
  Dataset for anything to do with Addon Moderations
dataset_base_acl: derived
user_facing: false
labels: {}
default_table_workgroup_access:
- role: roles/bigquery.dataViewer
  members:
  - workgroup:mozilla-confidential
workgroup_access:
- role: roles/bigquery.dataViewer
  members:
  - workgroup:mozilla-confidential
syndication: {}
