From 5f45fb4f3dedce9fd4bede18f6a9146a05e91583 Mon Sep 17 00:00:00 2001 From: Alex Janson Date: Thu, 15 Jan 2026 11:46:32 +0100 Subject: [PATCH 1/7] feat: add fetching of already ongoing runs at server start --- QualityControl/lib/QCModel.js | 1 + .../lib/services/BookkeepingService.js | 39 +++++++++++++++ QualityControl/lib/services/FilterService.js | 1 - QualityControl/lib/services/RunModeService.js | 49 +++++++++++++++---- 4 files changed, 80 insertions(+), 10 deletions(-) diff --git a/QualityControl/lib/QCModel.js b/QualityControl/lib/QCModel.js index aa5364af6..a1faae493 100644 --- a/QualityControl/lib/QCModel.js +++ b/QualityControl/lib/QCModel.js @@ -116,6 +116,7 @@ export const setupQcModel = async (eventEmitter) => { const intervalsService = new IntervalsService(); const bookkeepingService = new BookkeepingService(config.bookkeeping); + await bookkeepingService.connect(); const filterService = new FilterService(bookkeepingService, config); const runModeService = new RunModeService(config.bookkeeping, bookkeepingService, ccdbService, eventEmitter); const objectController = new ObjectController(qcObjectService, runModeService, qcdbDownloadService); diff --git a/QualityControl/lib/services/BookkeepingService.js b/QualityControl/lib/services/BookkeepingService.js index f5d9f6353..f8c412665 100644 --- a/QualityControl/lib/services/BookkeepingService.js +++ b/QualityControl/lib/services/BookkeepingService.js @@ -23,6 +23,8 @@ const GET_RUN_PATH = '/api/runs'; const LOG_FACILITY = `${process.env.npm_config_log_label ?? 'qcg'}/bkp-service`; +const RECENT_RUN_THRESHOLD_MS = 2 * 24 * 60 * 60 * 1000; // -2 days in milliseconds + /** * BookkeepingService class to be used to retrieve data from Bookkeeping */ @@ -181,6 +183,43 @@ export class BookkeepingService { } } + /** + * Retrieves runs that are currently ongoing (started within the last 48 hours but have not yet ended). + * @returns {Promise|undefined>} A promise that resolves to an array of run objects, + * or undefined if the service is inactive, no data is found, or an error occurs + */ + async retrieveOnGoingRuns() { + if (!this.active) { + return; + } + + const timestamp = Date.now() - RECENT_RUN_THRESHOLD_MS; + + const queryParams = `page[offset]=0&page[limit]=100&filter[o2start][from]=${timestamp}&token=${this._token}`; + + try { + const { data } = await httpGetJson( + this._hostname, + this._port, + `${GET_RUN_PATH}?${queryParams}`, + { + protocol: this._protocol, + rejectUnauthorized: false, + }, + ); + + if (data.length === 0) { + return []; + } + + return data.filter((run) => !run.timeO2End); + } catch (error) { + const msg = error?.message ?? String(error); + this._logger.errorMessage(msg); + return; + } + } + /** * Helper method to construct a URL path with the required authentication token. * Appends the service's token as a query parameter to the provided path. diff --git a/QualityControl/lib/services/FilterService.js b/QualityControl/lib/services/FilterService.js index d6da52c57..9f31b88dc 100644 --- a/QualityControl/lib/services/FilterService.js +++ b/QualityControl/lib/services/FilterService.js @@ -46,7 +46,6 @@ export class FilterService { * @returns {Promise} - resolves when the filter service is initialized */ async initFilters() { - await this._bookkeepingService.connect(); await this.getRunTypes(); } diff --git a/QualityControl/lib/services/RunModeService.js b/QualityControl/lib/services/RunModeService.js index 4c773410a..67dcf203a 100644 --- a/QualityControl/lib/services/RunModeService.js +++ b/QualityControl/lib/services/RunModeService.js @@ -50,6 +50,7 @@ export class RunModeService { this._logger = LogManager.getLogger(`${process.env.npm_config_log_label ?? 'qcg'}/run-mode-service`); this._listenToEvents(); + this._fetchOnGoingRunsAtStart(); } /** @@ -112,6 +113,27 @@ export class RunModeService { this._eventEmitter.on(EmitterKeys.RUN_TRACK, (runEvent) => this._onRunTrackEvent(runEvent)); } + /** + * Fetches the already ongoing runs from Bookkeeping service, becaue Kafka only sends an event at START of run. + * @returns {Promise} + */ + async _fetchOnGoingRunsAtStart() { + if (!this._bookkeepingService.active) { + return; + } + + const alreadyOngoingRuns = await this._bookkeepingService.retrieveOnGoingRuns(); + if (!alreadyOngoingRuns || alreadyOngoingRuns.length === 0) { + this._logger.infoMessage('No already ongoing runs detected at server start'); + return; + } + + const runNumbers = alreadyOngoingRuns.map((run) => run.runNumber); + const tasks = runNumbers.map(async (runNumber) => await this._initializeRunData(runNumber)); + await Promise.all(tasks); + await this.refreshRunsCache(); + } + /** * Handles run track events emitted by the event emitter. * Updates the ongoing runs cache based on the transition type. @@ -122,20 +144,29 @@ export class RunModeService { */ async _onRunTrackEvent({ runNumber, transition }) { if (transition === Transition.START_ACTIVITY) { - let rawPaths = []; - try { - rawPaths = await this._dataService.getObjectsLatestVersionList({ - filters: { RunNumber: runNumber }, - }); - } catch (error) { - this._logger.errorMessage(`Error fetching initial paths for run ${runNumber}: ${error.message || error}`); - } - this._ongoingRuns.set(runNumber, rawPaths); + await this._initializeRunData(runNumber); } else if (transition === Transition.STOP_ACTIVITY) { this._ongoingRuns.delete(runNumber); } } + /** + * Fetches the latest object versions for each run and populates the local `ongoingRuns` map. + * @param {number} runNumber - The run number associated with the event. + * @returns {Promise} + */ + async _initializeRunData(runNumber) { + let rawPaths = []; + try { + rawPaths = await this._dataService.getObjectsLatestVersionList({ + filters: { RunNumber: runNumber }, + }); + } catch (error) { + this._logger.errorMessage(`Error fetching initial paths for run ${runNumber}: ${error.message || error}`); + } + this._ongoingRuns.set(runNumber, rawPaths); + } + /** * Returns the last time the ongoing runs cache was refreshed. * @returns {number} - Timestamp of the last refresh. (ms) From 882fa2bb925f93fcc37b63e5f4746663600d4f5f Mon Sep 17 00:00:00 2001 From: Alex Janson Date: Thu, 15 Jan 2026 12:51:21 +0100 Subject: [PATCH 2/7] test: add tests for fetching ongoing runs at server start --- .../lib/services/BookkeepingService.js | 2 +- QualityControl/lib/services/RunModeService.js | 6 +- .../lib/services/BookkeepingService.test.js | 64 +++++++++++++++++++ .../test/lib/services/RunModeService.test.js | 16 +++++ 4 files changed, 82 insertions(+), 6 deletions(-) diff --git a/QualityControl/lib/services/BookkeepingService.js b/QualityControl/lib/services/BookkeepingService.js index f8c412665..987d0f133 100644 --- a/QualityControl/lib/services/BookkeepingService.js +++ b/QualityControl/lib/services/BookkeepingService.js @@ -188,7 +188,7 @@ export class BookkeepingService { * @returns {Promise|undefined>} A promise that resolves to an array of run objects, * or undefined if the service is inactive, no data is found, or an error occurs */ - async retrieveOnGoingRuns() { + async retrieveOngoingRuns() { if (!this.active) { return; } diff --git a/QualityControl/lib/services/RunModeService.js b/QualityControl/lib/services/RunModeService.js index 67dcf203a..8d65bfae9 100644 --- a/QualityControl/lib/services/RunModeService.js +++ b/QualityControl/lib/services/RunModeService.js @@ -118,11 +118,7 @@ export class RunModeService { * @returns {Promise} */ async _fetchOnGoingRunsAtStart() { - if (!this._bookkeepingService.active) { - return; - } - - const alreadyOngoingRuns = await this._bookkeepingService.retrieveOnGoingRuns(); + const alreadyOngoingRuns = await this._bookkeepingService.retrieveOngoingRuns(); if (!alreadyOngoingRuns || alreadyOngoingRuns.length === 0) { this._logger.infoMessage('No already ongoing runs detected at server start'); return; diff --git a/QualityControl/test/lib/services/BookkeepingService.test.js b/QualityControl/test/lib/services/BookkeepingService.test.js index 9b1c0de5e..08d07a321 100644 --- a/QualityControl/test/lib/services/BookkeepingService.test.js +++ b/QualityControl/test/lib/services/BookkeepingService.test.js @@ -332,5 +332,69 @@ export const bookkeepingServiceTestSuite = async () => { strictEqual(runStatus, RunStatus.BOOKKEEPING_UNAVAILABLE); }); }); + + suite('`retrieveOngoingRuns()` tests', () => { + let bkpService = null; + const runsPathPattern = new RegExp(`/api/runs\\?.*token=${VALID_CONFIG.bookkeeping.token}`); + + beforeEach(() => { + bkpService = new BookkeepingService(VALID_CONFIG.bookkeeping); + bkpService.validateConfig(); + bkpService.active = true; + }); + + afterEach(() => { + nock.cleanAll(); + }); + + test('should return all already ongoing runs', async () => { + const mockResponse = { + data: [ + { + runNumber: 1, + timeO2End: undefined, + }, + { + runNumber: 2, + timeO2End: 1, + }, + { + runNumber: 3, + timeO2End: undefined, + }, + ], + }; + + nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse); + const ongoingRuns = await bkpService.retrieveOngoingRuns(); + strictEqual(ongoingRuns.length, 2); + deepStrictEqual(ongoingRuns.map((run) => run.runNumber), [1, 3]); + }); + + test('should return an empty array when data when no runs are retrieved', async () => { + const mockResponse = { + data: [], + }; + + nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse); + const ongoingRuns = await bkpService.retrieveOngoingRuns(); + strictEqual(ongoingRuns.length, 0); + }); + + test('should return an empty array when all runs have an end time specified', async () => { + const mockResponse = { + data: [ + { + runNumber: 99, + timeO2End: 1, + }, + ], + }; + + nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse); + const ongoingRuns = await bkpService.retrieveOngoingRuns(); + strictEqual(ongoingRuns.length, 0); + }); + }); }); }; diff --git a/QualityControl/test/lib/services/RunModeService.test.js b/QualityControl/test/lib/services/RunModeService.test.js index 5802005a4..f53594543 100644 --- a/QualityControl/test/lib/services/RunModeService.test.js +++ b/QualityControl/test/lib/services/RunModeService.test.js @@ -32,6 +32,7 @@ export const runModeServiceTestSuite = async () => { beforeEach(() => { bookkeepingService = { retrieveRunInformation: sinon.stub(), + retrieveOngoingRuns: sinon.stub(), }; dataService = { @@ -128,6 +129,21 @@ export const runModeServiceTestSuite = async () => { }); }); + suite('`_fetchOnGoingRunsAtStart` tests', () => { + test('should populate ongoing runs on startup', async () => { + const runNumber = 1; + const mockRun = { runNumber }; + const mockPaths = [{ path: '/run/path1' }]; + + bookkeepingService.retrieveOngoingRuns.resolves([mockRun]); + + dataService.getObjectsLatestVersionList.resolves(mockPaths); + + await runModeService._fetchOnGoingRunsAtStart(); + strictEqual(runModeService._ongoingRuns.has(runNumber), true); + }); + }); + suite('_onRunTrackEvent - test suite', () => { test('should correctly parse event to RUN_TRACK and update ongoing runs map', async () => { const runEvent = { runNumber: 1234, transition: 'START_ACTIVITY' }; From 04d785b5f7c09e8d1a9874e287ad2727df7e648b Mon Sep 17 00:00:00 2001 From: Alex Janson Date: Thu, 15 Jan 2026 17:04:55 +0100 Subject: [PATCH 3/7] fix: add try catch for bookkeeping service connect --- QualityControl/lib/QCModel.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/QualityControl/lib/QCModel.js b/QualityControl/lib/QCModel.js index b4c42a9c1..fe17513b7 100644 --- a/QualityControl/lib/QCModel.js +++ b/QualityControl/lib/QCModel.js @@ -117,7 +117,12 @@ export const setupQcModel = async (ws, eventEmitter) => { const intervalsService = new IntervalsService(); const bookkeepingService = new BookkeepingService(config.bookkeeping); - await bookkeepingService.connect(); + try { + await bookkeepingService.connect(); + } catch (error) { + logger.errorMessage(`Failed connecting to Bookkeeping: ${error.message || error}`); + } + const filterService = new FilterService(bookkeepingService, config); const runModeService = new RunModeService(config.bookkeeping, bookkeepingService, ccdbService, eventEmitter, ws); const objectController = new ObjectController(qcObjectService, runModeService, qcdbDownloadService); From b0a4b557ada42e1af190943d6ace95dff19464d7 Mon Sep 17 00:00:00 2001 From: George Raduta Date: Sat, 17 Jan 2026 20:21:10 +0100 Subject: [PATCH 4/7] Reduce query to BKP for ongoing runs --- QualityControl/lib/services/BookkeepingService.js | 4 ++-- QualityControl/lib/services/RunModeService.js | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/QualityControl/lib/services/BookkeepingService.js b/QualityControl/lib/services/BookkeepingService.js index 8f8841e73..e20353e18 100644 --- a/QualityControl/lib/services/BookkeepingService.js +++ b/QualityControl/lib/services/BookkeepingService.js @@ -25,7 +25,7 @@ const GET_DATA_PASSES_PATH = '/api/dataPasses'; const LOG_FACILITY = `${process.env.npm_config_log_label ?? 'qcg'}/bkp-service`; -const RECENT_RUN_THRESHOLD_MS = 2 * 24 * 60 * 60 * 1000; // -2 days in milliseconds +const RECENT_RUN_THRESHOLD_MS = 1 * 24 * 60 * 60 * 1000; // -1 day in milliseconds /** * BookkeepingService class to be used to retrieve data from Bookkeeping @@ -214,7 +214,7 @@ export class BookkeepingService { const timestamp = Date.now() - RECENT_RUN_THRESHOLD_MS; - const queryParams = `page[offset]=0&page[limit]=100&filter[o2start][from]=${timestamp}&token=${this._token}`; + const queryParams = `page[offset]=0&page[limit]=20&filter[o2start][from]=${timestamp}&token=${this._token}`; try { const { data } = await httpGetJson( diff --git a/QualityControl/lib/services/RunModeService.js b/QualityControl/lib/services/RunModeService.js index f43819580..90b5fee54 100644 --- a/QualityControl/lib/services/RunModeService.js +++ b/QualityControl/lib/services/RunModeService.js @@ -123,7 +123,7 @@ export class RunModeService { async _fetchOnGoingRunsAtStart() { const alreadyOngoingRuns = await this._bookkeepingService.retrieveOngoingRuns(); if (!alreadyOngoingRuns || alreadyOngoingRuns.length === 0) { - this._logger.infoMessage('No already ongoing runs detected at server start'); + this._logger.infoMessage('No ongoing runs detected at server start'); return; } From 6c96b0f777be841b15ef0f539b3aa7963774711e Mon Sep 17 00:00:00 2001 From: George Raduta Date: Sat, 17 Jan 2026 20:34:28 +0100 Subject: [PATCH 5/7] Cache refresh not needed in this case --- QualityControl/lib/services/RunModeService.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/QualityControl/lib/services/RunModeService.js b/QualityControl/lib/services/RunModeService.js index 90b5fee54..154097e33 100644 --- a/QualityControl/lib/services/RunModeService.js +++ b/QualityControl/lib/services/RunModeService.js @@ -121,16 +121,15 @@ export class RunModeService { * @returns {Promise} */ async _fetchOnGoingRunsAtStart() { - const alreadyOngoingRuns = await this._bookkeepingService.retrieveOngoingRuns(); - if (!alreadyOngoingRuns || alreadyOngoingRuns.length === 0) { + const ongoingRuns = await this._bookkeepingService.retrieveOngoingRuns(); + if (!ongoingRuns?.length === 0) { this._logger.infoMessage('No ongoing runs detected at server start'); return; } - const runNumbers = alreadyOngoingRuns.map((run) => run.runNumber); + const runNumbers = ongoingRuns.map((run) => run.runNumber); const tasks = runNumbers.map(async (runNumber) => await this._initializeRunData(runNumber)); await Promise.all(tasks); - await this.refreshRunsCache(); } /** From 8dcc2dcb5c374e61df9c6c9cf66e29de3597be60 Mon Sep 17 00:00:00 2001 From: George Raduta Date: Sat, 17 Jan 2026 20:34:34 +0100 Subject: [PATCH 6/7] Small improvement on logging --- QualityControl/lib/services/BookkeepingService.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/QualityControl/lib/services/BookkeepingService.js b/QualityControl/lib/services/BookkeepingService.js index e20353e18..3525e7c10 100644 --- a/QualityControl/lib/services/BookkeepingService.js +++ b/QualityControl/lib/services/BookkeepingService.js @@ -77,12 +77,12 @@ export class BookkeepingService { */ async connect() { if (!this.validateConfig()) { - this._logger.infoMessage(`Bookkeeping service will not be used. Reason: ${this.error}`); + this._logger.warnMessage(`Bookkeeping service will not be used. Reason: ${this.error}`); return; } this.active = await this.simulateConnection(); if (!this.active) { - this._logger.infoMessage(`Bookkeeping service will not be used. Reason: ${this.error}`); + this._logger.warnMessage(`Bookkeeping service will not be used. Reason: ${this.error}`); } } @@ -203,7 +203,8 @@ export class BookkeepingService { } /** - * Retrieves runs that are currently ongoing (started within the last 48 hours but have not yet ended). + * Retrieves runs that are currently ongoing (started within the last \@see {RECENT_RUN_THRESHOLD_MS} + * but have not yet ended). * @returns {Promise|undefined>} A promise that resolves to an array of run objects, * or undefined if the service is inactive, no data is found, or an error occurs */ From f155a1e2228649e7fb186d324d6037b9a61d5c73 Mon Sep 17 00:00:00 2001 From: George Raduta Date: Sat, 17 Jan 2026 20:36:00 +0100 Subject: [PATCH 7/7] Use JSON destruct --- QualityControl/lib/services/RunModeService.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/QualityControl/lib/services/RunModeService.js b/QualityControl/lib/services/RunModeService.js index 154097e33..1294289b2 100644 --- a/QualityControl/lib/services/RunModeService.js +++ b/QualityControl/lib/services/RunModeService.js @@ -122,12 +122,12 @@ export class RunModeService { */ async _fetchOnGoingRunsAtStart() { const ongoingRuns = await this._bookkeepingService.retrieveOngoingRuns(); - if (!ongoingRuns?.length === 0) { + if (!ongoingRuns || ongoingRuns.length === 0) { this._logger.infoMessage('No ongoing runs detected at server start'); return; } - const runNumbers = ongoingRuns.map((run) => run.runNumber); + const runNumbers = ongoingRuns.map(({ runNumber }) => runNumber); const tasks = runNumbers.map(async (runNumber) => await this._initializeRunData(runNumber)); await Promise.all(tasks); }