Skip to content

Commit f8e7ab7

Browse files
authored
Merge branch 'develop' into release/0.5.20
2 parents e251f0b + a09364a commit f8e7ab7

File tree

14 files changed

+419
-32
lines changed

14 files changed

+419
-32
lines changed

src/main/resources/db_scripts/db_script_latest.sql

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ create table "job_instance" (
3434
"id" BIGSERIAL NOT NULL PRIMARY KEY,
3535
"application_id" VARCHAR,
3636
"step_id" VARCHAR,
37-
"job_parameters" JSONB NOT NULL DEFAULT '{}'
37+
"job_parameters" JSONB NOT NULL DEFAULT '{}',
38+
"diagnostics" VARCHAR
3839
);
3940

4041
create table "job_definition" (
@@ -148,7 +149,8 @@ create table archive_job_instance
148149
references archive_dag_instance,
149150
id bigint primary key,
150151
application_id varchar,
151-
step_id varchar
152+
step_id varchar,
153+
diagnostics varchar
152154
);
153155

154156
create table archive_event
@@ -317,8 +319,8 @@ BEGIN
317319
GET DIAGNOSTICS _cnt = ROW_COUNT;
318320
RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id;
319321

320-
INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id)
321-
SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id
322+
INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics)
323+
SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics
322324
FROM job_instance ji
323325
JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id
324326
ON CONFLICT (id) DO NOTHING;
@@ -361,3 +363,11 @@ BEGIN
361363
END;
362364
$$ LANGUAGE plpgsql;
363365

366+
create table if not exists housekeepinglock
367+
(
368+
locked boolean not null,
369+
started_at timestamp
370+
);
371+
372+
insert into "housekeepinglock" ("locked", "started_at")
373+
values (false, null);
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#! /bin/bash
2+
#
3+
# Copyright 2018 ABSA Group Limited
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
RECIPIENTS=$1
18+
RETENTION_DAYS=$2
19+
ENV=$3
20+
CONFIG_FILE_LOCATION=${4:-"../../application.properties"}
21+
22+
if [[ -n "$RECIPIENTS" && -n "$RETENTION_DAYS" && -n "$ENV" ]]; then
23+
echo "Starting runs clean up with retention days: $RETENTION_DAYS, env: $ENV and recipients: $RECIPIENTS"
24+
else
25+
echo "Incorrect input. Exiting the program."
26+
exit 1
27+
fi
28+
29+
db_url=$(grep '^db.url=' "$CONFIG_FILE_LOCATION" | awk -F'jdbc:postgresql://' '{print $2}' | awk -F'\\?' '{print $1}')
30+
db_user=$(grep '^db.user=' "$CONFIG_FILE_LOCATION" | awk -F'=' '{print $2}')
31+
db_password=$(grep '^db.password=' "$CONFIG_FILE_LOCATION" | awk -F'=' '{print $2}')
32+
connection_url="postgresql://$db_user:$db_password@$db_url"
33+
34+
obtain_lock_response=$(psql -X $connection_url -c "UPDATE housekeepinglock SET locked = 'true', started_at = now() WHERE locked = false AND started_at is null;" 2>&1)
35+
36+
if [[ $obtain_lock_response == "UPDATE 1" ]]; then
37+
echo "Exclusive lock for running runs clean up job obtained."
38+
39+
archive_dag_instances_response=$(psql -X $connection_url -c "CALL archive_dag_instances(i_to_ts => (now() - '$RETENTION_DAYS days'::interval)::timestamp without time zone);" 2>&1)
40+
release_lock_response=$(psql -X $connection_url -c "UPDATE housekeepinglock SET locked = 'false', started_at = null;" 2>&1)
41+
42+
message=""
43+
subject=""
44+
if [[ $archive_dag_instances_response == *"ERROR"* ]]; then
45+
subject="Hyperdrive Notifications - $ENV - Runs clean up failed!"
46+
message+="Runs clean up job failed with following output:"
47+
else
48+
subject="Hyperdrive Notifications - $ENV - Runs clean up succeeded"
49+
message+="Runs clean up job succeeded with following output:"
50+
fi
51+
message+="\n$archive_dag_instances_response"
52+
53+
if [[ $release_lock_response == "UPDATE 1" ]]; then
54+
message+="\n\nLock for running runs clean job was successfully released"
55+
else
56+
message+="\n\nLock for running runs clean job was not released!"
57+
fi
58+
echo -e "$message"
59+
echo -e "$message" | mailx -s "$subject" -r "[email protected]" "$RECIPIENTS"
60+
61+
else
62+
obtain_lock_time_response=$(psql -X $connection_url -c "SELECT started_at FROM housekeepinglock;" 2>&1)
63+
64+
timestamp_string=$(echo "$obtain_lock_time_response" | grep -oE '[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}')
65+
timestamp_unix=$(date -jf "%Y-%m-%d %H:%M:%S" "$timestamp_string" +%s)
66+
current_unix=$(date +%s)
67+
time_difference=$((current_unix - timestamp_unix))
68+
days_difference=$((time_difference / 86400))
69+
70+
if [ "$days_difference" -gt "2" ]; then
71+
message="Runs clean up has been running for more than 3 days."
72+
echo -e "$message"
73+
echo -e "$message" | mailx -s "Hyperdrive Notifications - $ENV - Runs clean up locked" -r "[email protected]" "$RECIPIENTS"
74+
else
75+
echo "Could not execute runs clean up job because there is another instance currently running for ($((time_difference / 60)) minutes)."
76+
fi
77+
fi

src/main/resources/db_scripts/liquibase/db.changelog.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,9 @@ databaseChangeLog:
9595
- include:
9696
relativeToChangelogFile: true
9797
file: v0.5.14.remove-deprecated-columns.yml
98+
- include:
99+
relativeToChangelogFile: true
100+
file: v0.5.20.add-diagnostics-field.yml
101+
- include:
102+
relativeToChangelogFile: true
103+
file: v0.5.20.add-housekeeping-lock-table.yml
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
ALTER TABLE "job_instance"
17+
ADD COLUMN "diagnostics" VARCHAR;
18+
ALTER TABLE "archive_job_instance"
19+
ADD COLUMN "diagnostics" VARCHAR;
20+
21+
22+
23+
CREATE OR REPLACE PROCEDURE archive_dag_instances_chunk(
24+
IN i_min_id BIGINT,
25+
IN i_max_id BIGINT
26+
)
27+
AS $$
28+
-------------------------------------------------------------------------------
29+
--
30+
-- Procedure: archive_dag_instances_chunk(2)
31+
-- Copies dag_instances with a final status from i_min_id to i_max_id to the
32+
-- archive_dag_instance table.
33+
-- Along with dag_instance, referenced job_instances and events are
34+
-- archived to the archive_job_instance and archive_event tables, respectively.
35+
-- This method should not be called directly. Instead, use archive_dag_instances
36+
--
37+
-- Parameters:
38+
-- i_min_id - Minimum dag instance id to archive
39+
-- i_max_id - Maximum dag instance id to archive
40+
--
41+
-------------------------------------------------------------------------------
42+
DECLARE
43+
_cnt INT;
44+
BEGIN
45+
RAISE NOTICE '=============';
46+
RAISE NOTICE ' START BATCH';
47+
RAISE NOTICE '=============';
48+
49+
CREATE TEMPORARY TABLE dag_instance_ids_to_archive AS
50+
SELECT di.id
51+
FROM dag_instance di
52+
WHERE di.status NOT IN ('Running', 'InQueue')
53+
AND di.id >= i_min_id
54+
AND di.id <= i_max_id;
55+
GET DIAGNOSTICS _cnt = ROW_COUNT;
56+
RAISE NOTICE 'Going to archive % dag instances from % to %', _cnt, i_min_id, i_max_id;
57+
58+
INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by)
59+
SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by
60+
FROM dag_instance di
61+
JOIN dag_instance_ids_to_archive diita ON di.id = diita.id
62+
ON CONFLICT (id) DO NOTHING;
63+
GET DIAGNOSTICS _cnt = ROW_COUNT;
64+
RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id;
65+
66+
INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics)
67+
SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics
68+
FROM job_instance ji
69+
JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id
70+
ON CONFLICT (id) DO NOTHING;
71+
GET DIAGNOSTICS _cnt = ROW_COUNT;
72+
RAISE NOTICE 'Archived % job instances', _cnt;
73+
74+
INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload)
75+
SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload
76+
FROM "event" e
77+
JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id
78+
ON CONFLICT (id) DO NOTHING;
79+
GET DIAGNOSTICS _cnt = ROW_COUNT;
80+
RAISE NOTICE 'Archived % events', _cnt;
81+
82+
RAISE NOTICE 'Going to delete dag instances';
83+
84+
DELETE FROM job_instance ji
85+
USING dag_instance_ids_to_archive diita
86+
WHERE ji.dag_instance_id = diita.id;
87+
GET DIAGNOSTICS _cnt = ROW_COUNT;
88+
RAISE NOTICE 'Deleted % job instances', _cnt;
89+
90+
DELETE FROM "event" e
91+
USING dag_instance_ids_to_archive diita
92+
WHERE e.dag_instance_id = diita.id;
93+
GET DIAGNOSTICS _cnt = ROW_COUNT;
94+
RAISE NOTICE 'Deleted % events', _cnt;
95+
96+
DELETE FROM dag_instance di
97+
USING dag_instance_ids_to_archive diita
98+
WHERE di.id = diita.id;
99+
GET DIAGNOSTICS _cnt = ROW_COUNT;
100+
RAISE NOTICE 'Deleted % dag instances', _cnt;
101+
102+
DROP TABLE dag_instance_ids_to_archive;
103+
104+
RAISE NOTICE '=============';
105+
RAISE NOTICE ' END BATCH';
106+
RAISE NOTICE '=============';
107+
END;
108+
$$ LANGUAGE plpgsql;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# Copyright 2018 ABSA Group Limited
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
16+
databaseChangeLog:
17+
- changeSet:
18+
id: v0.5.20.add-diagnostics-field
19+
logicalFilePath: v0.5.20.add-diagnostics-field
20+
21+
context: default
22+
changes:
23+
- sqlFile:
24+
relativeToChangelogFile: true
25+
path: v0.5.20.add-diagnostics-field.sql
26+
splitStatements: false
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
create table if not exists housekeepinglock
17+
(
18+
locked boolean not null,
19+
started_at timestamp
20+
);
21+
22+
insert into "housekeepinglock" ("locked", "started_at")
23+
values (false, null);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Copyright 2018 ABSA Group Limited
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
16+
databaseChangeLog:
17+
- changeSet:
18+
id: v0.5.20.add-housekeeping-lock-table
19+
logicalFilePath: v0.5.20.add-housekeeping-lock-table
20+
21+
context: default
22+
changes:
23+
- sqlFile:
24+
relativeToChangelogFile: true
25+
path: v0.5.20.add-housekeeping-lock-table.sql

src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ case class JobInstance(
2323
jobName: String,
2424
jobParameters: JobInstanceParameters,
2525
jobStatus: JobStatus,
26+
diagnostics: Option[String] = None,
2627
executorJobId: Option[String],
2728
applicationId: Option[String],
2829
stepId: Option[String],

src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ trait JobInstanceTable {
3030
def jobName: Rep[String] = column[String]("job_name")
3131
def jobParameters: Rep[JobInstanceParameters] = column[JobInstanceParameters]("job_parameters", O.SqlType("JSONB"))
3232
def jobStatus: Rep[JobStatus] = column[JobStatus]("job_status")
33+
def diagnostics: Rep[Option[String]] = column[Option[String]]("diagnostics")
3334
def executorJobId: Rep[Option[String]] = column[Option[String]]("executor_job_id")
3435
def applicationId: Rep[Option[String]] = column[Option[String]]("application_id")
3536
def stepId: Rep[Option[String]] = column[Option[String]]("step_id")
@@ -46,6 +47,7 @@ trait JobInstanceTable {
4647
jobName,
4748
jobParameters,
4849
jobStatus,
50+
diagnostics,
4951
executorJobId,
5052
applicationId,
5153
stepId,

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark
1717

1818
import play.api.libs.json.{Json, OFormat}
1919

20-
case class App(id: String, name: String, state: String, finalStatus: String)
20+
case class App(id: String, name: String, state: String, finalStatus: String, diagnostics: String)
2121

2222
case class Apps(app: Seq[App])
2323

0 commit comments

Comments
 (0)