diff --git a/QualityControl/lib/QCModel.js b/QualityControl/lib/QCModel.js index b057be390..12b621456 100644 --- a/QualityControl/lib/QCModel.js +++ b/QualityControl/lib/QCModel.js @@ -117,6 +117,12 @@ export const setupQcModel = async (ws, eventEmitter) => { const intervalsService = new IntervalsService(); const bookkeepingService = new BookkeepingService(config.bookkeeping); + try { + await bookkeepingService.connect(); + } catch (error) { + logger.errorMessage(`Failed connecting to Bookkeeping: ${error.message || error}`); + } + const filterService = new FilterService(bookkeepingService, config); const runModeService = new RunModeService(config.bookkeeping, bookkeepingService, ccdbService, eventEmitter, ws); const objectController = new ObjectController(qcObjectService, runModeService, qcdbDownloadService); diff --git a/QualityControl/lib/services/BookkeepingService.js b/QualityControl/lib/services/BookkeepingService.js index 493979c6d..3525e7c10 100644 --- a/QualityControl/lib/services/BookkeepingService.js +++ b/QualityControl/lib/services/BookkeepingService.js @@ -25,6 +25,8 @@ const GET_DATA_PASSES_PATH = '/api/dataPasses'; const LOG_FACILITY = `${process.env.npm_config_log_label ?? 'qcg'}/bkp-service`; +const RECENT_RUN_THRESHOLD_MS = 1 * 24 * 60 * 60 * 1000; // -1 day in milliseconds + /** * BookkeepingService class to be used to retrieve data from Bookkeeping */ @@ -75,12 +77,12 @@ export class BookkeepingService { */ async connect() { if (!this.validateConfig()) { - this._logger.infoMessage(`Bookkeeping service will not be used. Reason: ${this.error}`); + this._logger.warnMessage(`Bookkeeping service will not be used. Reason: ${this.error}`); return; } this.active = await this.simulateConnection(); if (!this.active) { - this._logger.infoMessage(`Bookkeeping service will not be used. Reason: ${this.error}`); + this._logger.warnMessage(`Bookkeeping service will not be used. Reason: ${this.error}`); } } @@ -200,6 +202,44 @@ export class BookkeepingService { } } + /** + * Retrieves runs that are currently ongoing (started within the last \@see {RECENT_RUN_THRESHOLD_MS} + * but have not yet ended). + * @returns {Promise|undefined>} A promise that resolves to an array of run objects, + * or undefined if the service is inactive, no data is found, or an error occurs + */ + async retrieveOngoingRuns() { + if (!this.active) { + return; + } + + const timestamp = Date.now() - RECENT_RUN_THRESHOLD_MS; + + const queryParams = `page[offset]=0&page[limit]=20&filter[o2start][from]=${timestamp}&token=${this._token}`; + + try { + const { data } = await httpGetJson( + this._hostname, + this._port, + `${GET_RUN_PATH}?${queryParams}`, + { + protocol: this._protocol, + rejectUnauthorized: false, + }, + ); + + if (data.length === 0) { + return []; + } + + return data.filter((run) => !run.timeO2End); + } catch (error) { + const msg = error?.message ?? String(error); + this._logger.errorMessage(msg); + return; + } + } + /** * Retrieves the information about the detectors from the Bookkeeping service. * @returns {Promise} Array of detector summaries. diff --git a/QualityControl/lib/services/FilterService.js b/QualityControl/lib/services/FilterService.js index 655c12f37..fa80e6bba 100644 --- a/QualityControl/lib/services/FilterService.js +++ b/QualityControl/lib/services/FilterService.js @@ -49,7 +49,6 @@ export class FilterService { * @returns {Promise} - resolves when the filter service is initialized */ async initFilters() { - await this._bookkeepingService.connect(); await this.getRunTypes(); await this._initializeDetectors(); await this.getDataPasses(); diff --git a/QualityControl/lib/services/RunModeService.js b/QualityControl/lib/services/RunModeService.js index ea73b558b..1294289b2 100644 --- a/QualityControl/lib/services/RunModeService.js +++ b/QualityControl/lib/services/RunModeService.js @@ -53,6 +53,7 @@ export class RunModeService { this._logger = LogManager.getLogger(`${process.env.npm_config_log_label ?? 'qcg'}/run-mode-service`); this._listenToEvents(); + this._fetchOnGoingRunsAtStart(); } /** @@ -115,6 +116,22 @@ export class RunModeService { this._eventEmitter.on(EmitterKeys.RUN_TRACK, (runEvent) => this._onRunTrackEvent(runEvent)); } + /** + * Fetches the already ongoing runs from Bookkeeping service, becaue Kafka only sends an event at START of run. + * @returns {Promise} + */ + async _fetchOnGoingRunsAtStart() { + const ongoingRuns = await this._bookkeepingService.retrieveOngoingRuns(); + if (!ongoingRuns || ongoingRuns.length === 0) { + this._logger.infoMessage('No ongoing runs detected at server start'); + return; + } + + const runNumbers = ongoingRuns.map(({ runNumber }) => runNumber); + const tasks = runNumbers.map(async (runNumber) => await this._initializeRunData(runNumber)); + await Promise.all(tasks); + } + /** * Handles run track events emitted by the event emitter. * Updates the ongoing runs cache based on the transition type. @@ -125,15 +142,7 @@ export class RunModeService { */ async _onRunTrackEvent({ runNumber, transition }) { if (transition === Transition.START_ACTIVITY) { - let rawPaths = []; - try { - rawPaths = await this._dataService.getObjectsLatestVersionList({ - filters: { RunNumber: runNumber }, - }); - } catch (error) { - this._logger.errorMessage(`Error fetching initial paths for run ${runNumber}: ${error.message || error}`); - } - this._ongoingRuns.set(runNumber, rawPaths); + await this._initializeRunData(runNumber); const wsMessage = new WebSocketMessage(); wsMessage.command = `${EmitterKeys.RUN_TRACK}:${Transition.START_ACTIVITY}`; @@ -146,6 +155,23 @@ export class RunModeService { } } + /** + * Fetches the latest object versions for each run and populates the local `ongoingRuns` map. + * @param {number} runNumber - The run number associated with the event. + * @returns {Promise} + */ + async _initializeRunData(runNumber) { + let rawPaths = []; + try { + rawPaths = await this._dataService.getObjectsLatestVersionList({ + filters: { RunNumber: runNumber }, + }); + } catch (error) { + this._logger.errorMessage(`Error fetching initial paths for run ${runNumber}: ${error.message || error}`); + } + this._ongoingRuns.set(runNumber, rawPaths); + } + /** * Returns the last time the ongoing runs cache was refreshed. * @returns {number} - Timestamp of the last refresh. (ms) diff --git a/QualityControl/test/lib/services/BookkeepingService.test.js b/QualityControl/test/lib/services/BookkeepingService.test.js index b8b09cb52..d60869ebd 100644 --- a/QualityControl/test/lib/services/BookkeepingService.test.js +++ b/QualityControl/test/lib/services/BookkeepingService.test.js @@ -529,5 +529,68 @@ export const bookkeepingServiceTestSuite = async () => { strictEqual(result.length, 0); }); }); + suite('`retrieveOngoingRuns()` tests', () => { + let bkpService = null; + const runsPathPattern = new RegExp(`/api/runs\\?.*token=${VALID_CONFIG.bookkeeping.token}`); + + beforeEach(() => { + bkpService = new BookkeepingService(VALID_CONFIG.bookkeeping); + bkpService.validateConfig(); + bkpService.active = true; + }); + + afterEach(() => { + nock.cleanAll(); + }); + + test('should return all already ongoing runs', async () => { + const mockResponse = { + data: [ + { + runNumber: 1, + timeO2End: undefined, + }, + { + runNumber: 2, + timeO2End: 1, + }, + { + runNumber: 3, + timeO2End: undefined, + }, + ], + }; + + nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse); + const ongoingRuns = await bkpService.retrieveOngoingRuns(); + strictEqual(ongoingRuns.length, 2); + deepStrictEqual(ongoingRuns.map((run) => run.runNumber), [1, 3]); + }); + + test('should return an empty array when data when no runs are retrieved', async () => { + const mockResponse = { + data: [], + }; + + nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse); + const ongoingRuns = await bkpService.retrieveOngoingRuns(); + strictEqual(ongoingRuns.length, 0); + }); + + test('should return an empty array when all runs have an end time specified', async () => { + const mockResponse = { + data: [ + { + runNumber: 99, + timeO2End: 1, + }, + ], + }; + + nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse); + const ongoingRuns = await bkpService.retrieveOngoingRuns(); + strictEqual(ongoingRuns.length, 0); + }); + }); }); }; diff --git a/QualityControl/test/lib/services/RunModeService.test.js b/QualityControl/test/lib/services/RunModeService.test.js index 2fe54d9b0..c28f81341 100644 --- a/QualityControl/test/lib/services/RunModeService.test.js +++ b/QualityControl/test/lib/services/RunModeService.test.js @@ -34,6 +34,7 @@ export const runModeServiceTestSuite = async () => { beforeEach(() => { bookkeepingService = { retrieveRunInformation: sinon.stub(), + retrieveOngoingRuns: sinon.stub(), }; dataService = { @@ -134,6 +135,21 @@ export const runModeServiceTestSuite = async () => { }); }); + suite('`_fetchOnGoingRunsAtStart` tests', () => { + test('should populate ongoing runs on startup', async () => { + const runNumber = 1; + const mockRun = { runNumber }; + const mockPaths = [{ path: '/run/path1' }]; + + bookkeepingService.retrieveOngoingRuns.resolves([mockRun]); + + dataService.getObjectsLatestVersionList.resolves(mockPaths); + + await runModeService._fetchOnGoingRunsAtStart(); + strictEqual(runModeService._ongoingRuns.has(runNumber), true); + }); + }); + suite('_onRunTrackEvent - test suite', () => { test('should correctly parse event to RUN_TRACK and update ongoing runs map', async () => { const runEvent = { runNumber: 1234, transition: 'START_ACTIVITY' };