Commit b79a7b9
initial add of scripts for addon_moderations (#6484)
* initial add of scripts for addon_moderations
* move files over to addon_moderations_derived, update metadata.yaml to partition by date
* update metadata.yaml to type:day and field:date
* change DAG from bqetl_cinder to bqetl_addons
* remove query file, update query.py file
* take out explicit bearer token
* take out clustering
1 parent 47559f8 commit b79a7b9
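
For reference, the "type:day and field:date" change in the notes above targets the time_partitioning stanza of the table's metadata.yaml. A sketch of that stanza as it presumably stands after this commit, following standard bigquery-etl metadata.yaml conventions (only the keys visible in the hunk below are confirmed by the diff):

bigquery:
  time_partitioning:
    type: day
    field: date
    require_partition_filter: false
    expiration_days: null
  range_partitioning: null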

4 files changed: +266 additions, −2 deletions

sql/moz-fx-data-shared-prod/addon_moderations_derived/cinder_decisions_raw_v1/metadata.yaml

Lines changed: 0 additions & 2 deletions

@@ -15,6 +15,4 @@ bigquery:
     require_partition_filter: false
     expiration_days: null
   range_partitioning: null
-  clustering:
-    fields: []
 references: {}

Lines changed: 185 additions & 0 deletions

"""Cinder API Addon Moderations Decisions - download decisions, clean and upload to BigQuery."""

import csv
import json
import os
import tempfile
from argparse import ArgumentParser
from time import sleep

import requests
from google.cloud import bigquery

# Column order for the CSV handed to the BigQuery load job. "date" leads so
# rows line up positionally with the 13-column load schema in upload_to_bigquery.
CSV_FIELDS = [
    "date",
    "user",
    "queue_slug",
    "job_id",
    "uuid",
    "applied_policies",
    "entity",
    "entity_slug",
    "entity_id",
    "created_at",
    "decision_type",
    "job_assigned_at",
    "typed_metadata",
]

CINDER_BEARER_TOKEN = os.environ.get("CINDER_TOKEN")


def post_response(url, headers, data):
    """POST helper; log 400/401 errors and return the response."""
    response = requests.post(url, headers=headers, data=data)
    if response.status_code in (400, 401):
        print(f"***Error: {response.status_code}***")
        print(response.text)
    return response


def get_response(url, headers):
    """GET helper; log 400/401 errors and return the response."""
    response = requests.get(url, headers=headers)
    if response.status_code in (400, 401):
        print(f"***Error: {response.status_code}***")
        print(response.text)
    return response
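
For comparison, requests also ships a built-in check that raises on any 4xx/5xx status instead of only printing 400/401 responses. A hypothetical fail-fast variant (not what this script does):

import requests

def get_response_strict(url, headers):
    """Hypothetical stricter variant of get_response."""
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # raises requests.exceptions.HTTPError on 4xx/5xx
    return response
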
def read_json(filename: str) -> dict:
    """Read a JSON file into a dict."""
    with open(filename, "r") as f:
        data = json.loads(f.read())
    return data


def write_dict_to_csv(json_data, filename):
    """Write a list of row dicts to a CSV file, preceded by a header row."""
    with open(filename, "w") as out_file:
        dict_writer = csv.DictWriter(out_file, CSV_FIELDS)
        dict_writer.writeheader()
        dict_writer.writerows(json_data)
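
A quick illustration of the CSV shape this produces (abbreviated, hypothetical field list and row; the header line from writeheader() is why the load job below sets skip_leading_rows=1):

import csv
import io

buf = io.StringIO()
writer = csv.DictWriter(buf, ["date", "user", "queue_slug"])
writer.writeheader()
writer.writerows([{"date": "2024-01-01", "user": "someone", "queue_slug": "example-queue"}])
print(buf.getvalue())
# date,user,queue_slug
# 2024-01-01,someone,example-queue
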
def cinder_addon_decisions_download(date, bearer_token):
    """Download decision data from the Cinder API, authenticating with the bearer token."""
    # Stage (nonprod) decisions endpoint. Note that `date` is unused here:
    # the endpoint is queried without a date filter.
    url = "https://stage.cinder.nonprod.webservices.mozgcp.net/api/v1/decisions/"
    headers = {"accept": "application/json", "authorization": f"Bearer {bearer_token}"}
    print(url)
    response = get_response(url, headers)
    return response


def check_json(cinder_addon_decisions_response_text):
    """Return the parsed API response, or None when the body is not valid JSON (e.g. days with no data)."""
    with tempfile.NamedTemporaryFile() as tmp_json:
        # Close the write handle before parsing so the content is flushed to disk.
        with open(tmp_json.name, "w") as f_json:
            f_json.write(cinder_addon_decisions_response_text)
        try:
            query_export = read_json(tmp_json.name)
        except ValueError:
            # e.g. json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
            return None
    return query_export
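
Catching ValueError above is sufficient because json.JSONDecodeError is a subclass of ValueError, so an empty or non-JSON response body lands in that handler. A quick standalone illustration:

import json

try:
    json.loads("")  # an empty response body, as on days with no decisions
except ValueError as err:
    print(type(err).__name__)  # -> JSONDecodeError, a ValueError subclass
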
def clean_json(query_export, date):
    """Flatten the JSON export into a list of row dicts for the CSV / BigQuery upload."""
    fields_list = []
    for item in query_export["items"]:
        field_dict = {
            # The run date doubles as the partition column in BigQuery.
            "date": date,
            "user": item["user"],
            "queue_slug": item["queue_slug"],
            "job_id": item["job_id"],
            "uuid": item["uuid"],
            "applied_policies": item["applied_policies"],
            "entity": item["entity"],
            "entity_slug": item["entity_slug"],
            "entity_id": item["entity_id"],
            "created_at": item["created_at"],
            "decision_type": item["decision_type"],
            "job_assigned_at": item["job_assigned_at"],
            "typed_metadata": item["typed_metadata"],
        }
        fields_list.append(field_dict)
    return fields_list


def upload_to_bigquery(csv_data, project, dataset, table_name, date):
    """Load the cleaned rows into a single date partition of the BigQuery table."""
    print("writing json to csv")
    # Partition decorator format: "2024-01-01" -> "20240101".
    partition = f"{date}".replace("-", "")
    print(partition)
    with tempfile.NamedTemporaryFile() as tmp_csv:
        write_dict_to_csv(csv_data, tmp_csv.name)
        client = bigquery.Client(project)
        job_config = bigquery.LoadJobConfig(
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_TRUNCATE",
            time_partitioning=bigquery.TimePartitioning(
                type_=bigquery.TimePartitioningType.DAY,
                field="date",
            ),
            skip_leading_rows=1,  # skip the CSV header row
            schema=[
                bigquery.SchemaField("date", "DATE"),
                bigquery.SchemaField("user", "STRING"),
                bigquery.SchemaField("queue_slug", "STRING"),
                bigquery.SchemaField("job_id", "STRING"),
                bigquery.SchemaField("uuid", "STRING"),
                bigquery.SchemaField("applied_policies", "STRING"),
                bigquery.SchemaField("entity", "STRING"),
                bigquery.SchemaField("entity_slug", "STRING"),
                bigquery.SchemaField("entity_id", "STRING"),
                bigquery.SchemaField("created_at", "DATE"),
                bigquery.SchemaField("decision_type", "STRING"),
                bigquery.SchemaField("job_assigned_at", "STRING"),
                bigquery.SchemaField("typed_metadata", "STRING"),
            ],
        )
        # "$YYYYMMDD" targets one day's partition; WRITE_TRUNCATE replaces only that partition.
        destination = f"{project}.{dataset}.{table_name}${partition}"
        with open(tmp_csv.name, "rb") as f_csv:
            job = client.load_table_from_file(f_csv, destination, job_config=job_config)
        print(f"Writing Decisions data to {destination}. BigQuery job ID: {job.job_id}")
        job.result()
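
A worked example of how the partition decorator is assembled, using the argparse defaults from main() below and a hypothetical run date:

project, dataset, table_name = "moz-fx-data-shared-prod", "addon_moderations_derived", "cinder_decisions_raw_v1"
date = "2024-01-01"
partition = date.replace("-", "")  # -> "20240101"
print(f"{project}.{dataset}.{table_name}${partition}")
# moz-fx-data-shared-prod.addon_moderations_derived.cinder_decisions_raw_v1$20240101
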
def main():
    """Parse arguments, download and clean the day's decisions, and upload them to BigQuery."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--date", required=True)
    parser.add_argument("--project", default="moz-fx-data-shared-prod")
    parser.add_argument("--dataset", default="addon_moderations_derived")

    args = parser.parse_args()

    project = args.project
    dataset = args.dataset
    table_name = "cinder_decisions_raw_v1"

    date = args.date
    bearer_token = CINDER_BEARER_TOKEN

    data = []

    json_file = cinder_addon_decisions_download(date, bearer_token)
    query_export = check_json(json_file.text)

    if query_export is not None:
        # Flatten the JSON payload into rows; these are written to a temp CSV
        # and then loaded into a BigQuery partition in upload_to_bigquery.
        cinder_addon_decisions_data = clean_json(query_export, date)
        data.extend(cinder_addon_decisions_data)
    else:
        print("no data for today")
        sleep(5)

    upload_to_bigquery(data, project, dataset, table_name, date)


if __name__ == "__main__":
    main()
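
A minimal invocation sketch (assuming the script is saved as query.py, per the commit message; the token value and date are placeholders):

export CINDER_TOKEN="<token>"   # read by the script via os.environ
python3 query.py --date 2024-01-01 \
    --project moz-fx-data-shared-prod \
    --dataset addon_moderations_derived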

Lines changed: 66 additions & 0 deletions

fields:

- mode: NULLABLE
  name: date
  type: DATE
  description: Date the job ran; the table is partitioned by this field.

- mode: NULLABLE
  name: user
  type: STRING
  description: User who submitted the report.

- mode: NULLABLE
  name: queue_slug
  type: STRING
  description: Slug of the moderation queue the decision came from.

- mode: NULLABLE
  name: job_id
  type: STRING
  description: Job ID of the decision.

- mode: NULLABLE
  name: uuid
  type: STRING
  description: UUID of the decision.

- mode: REPEATED
  name: applied_policies
  type: STRING
  description: Policies applied to moderate the add-on.

- mode: REPEATED
  name: entity
  type: STRING
  description: Information about the entity.

- mode: NULLABLE
  name: entity_slug
  type: STRING
  description: Entity slug.

- mode: NULLABLE
  name: entity_id
  type: STRING
  description: Add-on ID.

- mode: NULLABLE
  name: created_at
  type: STRING
  description: Date the decision was made.

- mode: NULLABLE
  name: decision_type
  type: STRING
  description: Type of decision.

- mode: NULLABLE
  name: job_assigned_at
  type: STRING
  description: Date the add-on report was assigned to a moderator.

- mode: NULLABLE
  name: typed_metadata
  type: STRING
  description: Additional typed metadata attached to the decision.

Lines changed: 15 additions & 0 deletions

friendly_name: Addon Moderations
description: |-
  Dataset for anything to do with Addon Moderations
dataset_base_acl: derived
user_facing: false
labels: {}
default_table_workgroup_access:
- role: roles/bigquery.dataViewer
  members:
  - workgroup:mozilla-confidential
workgroup_access:
- role: roles/bigquery.dataViewer
  members:
  - workgroup:mozilla-confidential
syndication: {}
