Skip to content
Merged
6 changes: 6 additions & 0 deletions QualityControl/lib/QCModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ export const setupQcModel = async (ws, eventEmitter) => {
const intervalsService = new IntervalsService();

const bookkeepingService = new BookkeepingService(config.bookkeeping);
try {
await bookkeepingService.connect();
} catch (error) {
logger.errorMessage(`Failed connecting to Bookkeeping: ${error.message || error}`);
}

const filterService = new FilterService(bookkeepingService, config);
const runModeService = new RunModeService(config.bookkeeping, bookkeepingService, ccdbService, eventEmitter, ws);
const objectController = new ObjectController(qcObjectService, runModeService, qcdbDownloadService);
Expand Down
44 changes: 42 additions & 2 deletions QualityControl/lib/services/BookkeepingService.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const GET_DATA_PASSES_PATH = '/api/dataPasses';

const LOG_FACILITY = `${process.env.npm_config_log_label ?? 'qcg'}/bkp-service`;

const RECENT_RUN_THRESHOLD_MS = 1 * 24 * 60 * 60 * 1000; // -1 day in milliseconds

/**
* BookkeepingService class to be used to retrieve data from Bookkeeping
*/
Expand Down Expand Up @@ -75,12 +77,12 @@ export class BookkeepingService {
*/
async connect() {
if (!this.validateConfig()) {
this._logger.infoMessage(`Bookkeeping service will not be used. Reason: ${this.error}`);
this._logger.warnMessage(`Bookkeeping service will not be used. Reason: ${this.error}`);
return;
}
this.active = await this.simulateConnection();
if (!this.active) {
this._logger.infoMessage(`Bookkeeping service will not be used. Reason: ${this.error}`);
this._logger.warnMessage(`Bookkeeping service will not be used. Reason: ${this.error}`);
}
}

Expand Down Expand Up @@ -200,6 +202,44 @@ export class BookkeepingService {
}
}

/**
* Retrieves runs that are currently ongoing (started within the last \@see {RECENT_RUN_THRESHOLD_MS}
* but have not yet ended).
* @returns {Promise<Array<object>|undefined>} A promise that resolves to an array of run objects,
* or undefined if the service is inactive, no data is found, or an error occurs
*/
async retrieveOngoingRuns() {
if (!this.active) {
return;
}

const timestamp = Date.now() - RECENT_RUN_THRESHOLD_MS;

const queryParams = `page[offset]=0&page[limit]=20&filter[o2start][from]=${timestamp}&token=${this._token}`;

try {
const { data } = await httpGetJson(
this._hostname,
this._port,
`${GET_RUN_PATH}?${queryParams}`,
{
protocol: this._protocol,
rejectUnauthorized: false,
},
);

if (data.length === 0) {
return [];
}

return data.filter((run) => !run.timeO2End);
} catch (error) {
const msg = error?.message ?? String(error);
this._logger.errorMessage(msg);
return;
}
}

/**
* Retrieves the information about the detectors from the Bookkeeping service.
* @returns {Promise<object[]>} Array of detector summaries.
Expand Down
1 change: 0 additions & 1 deletion QualityControl/lib/services/FilterService.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export class FilterService {
* @returns {Promise<void>} - resolves when the filter service is initialized
*/
async initFilters() {
await this._bookkeepingService.connect();
await this.getRunTypes();
await this._initializeDetectors();
await this.getDataPasses();
Expand Down
44 changes: 35 additions & 9 deletions QualityControl/lib/services/RunModeService.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export class RunModeService {

this._logger = LogManager.getLogger(`${process.env.npm_config_log_label ?? 'qcg'}/run-mode-service`);
this._listenToEvents();
this._fetchOnGoingRunsAtStart();
}

/**
Expand Down Expand Up @@ -115,6 +116,22 @@ export class RunModeService {
this._eventEmitter.on(EmitterKeys.RUN_TRACK, (runEvent) => this._onRunTrackEvent(runEvent));
}

/**
* Fetches the already ongoing runs from Bookkeeping service, becaue Kafka only sends an event at START of run.
* @returns {Promise<void>}
*/
async _fetchOnGoingRunsAtStart() {
const ongoingRuns = await this._bookkeepingService.retrieveOngoingRuns();
if (!ongoingRuns || ongoingRuns.length === 0) {
this._logger.infoMessage('No ongoing runs detected at server start');
return;
}

const runNumbers = ongoingRuns.map(({ runNumber }) => runNumber);
const tasks = runNumbers.map(async (runNumber) => await this._initializeRunData(runNumber));
await Promise.all(tasks);
}

/**
* Handles run track events emitted by the event emitter.
* Updates the ongoing runs cache based on the transition type.
Expand All @@ -125,15 +142,7 @@ export class RunModeService {
*/
async _onRunTrackEvent({ runNumber, transition }) {
if (transition === Transition.START_ACTIVITY) {
let rawPaths = [];
try {
rawPaths = await this._dataService.getObjectsLatestVersionList({
filters: { RunNumber: runNumber },
});
} catch (error) {
this._logger.errorMessage(`Error fetching initial paths for run ${runNumber}: ${error.message || error}`);
}
this._ongoingRuns.set(runNumber, rawPaths);
await this._initializeRunData(runNumber);

const wsMessage = new WebSocketMessage();
wsMessage.command = `${EmitterKeys.RUN_TRACK}:${Transition.START_ACTIVITY}`;
Expand All @@ -146,6 +155,23 @@ export class RunModeService {
}
}

/**
* Fetches the latest object versions for each run and populates the local `ongoingRuns` map.
* @param {number} runNumber - The run number associated with the event.
* @returns {Promise<void>}
*/
async _initializeRunData(runNumber) {
let rawPaths = [];
try {
rawPaths = await this._dataService.getObjectsLatestVersionList({
filters: { RunNumber: runNumber },
});
} catch (error) {
this._logger.errorMessage(`Error fetching initial paths for run ${runNumber}: ${error.message || error}`);
}
this._ongoingRuns.set(runNumber, rawPaths);
}

/**
* Returns the last time the ongoing runs cache was refreshed.
* @returns {number} - Timestamp of the last refresh. (ms)
Expand Down
63 changes: 63 additions & 0 deletions QualityControl/test/lib/services/BookkeepingService.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -529,5 +529,68 @@ export const bookkeepingServiceTestSuite = async () => {
strictEqual(result.length, 0);
});
});
suite('`retrieveOngoingRuns()` tests', () => {
let bkpService = null;
const runsPathPattern = new RegExp(`/api/runs\\?.*token=${VALID_CONFIG.bookkeeping.token}`);

beforeEach(() => {
bkpService = new BookkeepingService(VALID_CONFIG.bookkeeping);
bkpService.validateConfig();
bkpService.active = true;
});

afterEach(() => {
nock.cleanAll();
});

test('should return all already ongoing runs', async () => {
const mockResponse = {
data: [
{
runNumber: 1,
timeO2End: undefined,
},
{
runNumber: 2,
timeO2End: 1,
},
{
runNumber: 3,
timeO2End: undefined,
},
],
};

nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse);
const ongoingRuns = await bkpService.retrieveOngoingRuns();
strictEqual(ongoingRuns.length, 2);
deepStrictEqual(ongoingRuns.map((run) => run.runNumber), [1, 3]);
});

test('should return an empty array when data when no runs are retrieved', async () => {
const mockResponse = {
data: [],
};

nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse);
const ongoingRuns = await bkpService.retrieveOngoingRuns();
strictEqual(ongoingRuns.length, 0);
});

test('should return an empty array when all runs have an end time specified', async () => {
const mockResponse = {
data: [
{
runNumber: 99,
timeO2End: 1,
},
],
};

nock(VALID_CONFIG.bookkeeping.url).get(runsPathPattern).reply(200, mockResponse);
const ongoingRuns = await bkpService.retrieveOngoingRuns();
strictEqual(ongoingRuns.length, 0);
});
});
});
};
16 changes: 16 additions & 0 deletions QualityControl/test/lib/services/RunModeService.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export const runModeServiceTestSuite = async () => {
beforeEach(() => {
bookkeepingService = {
retrieveRunInformation: sinon.stub(),
retrieveOngoingRuns: sinon.stub(),
};

dataService = {
Expand Down Expand Up @@ -134,6 +135,21 @@ export const runModeServiceTestSuite = async () => {
});
});

suite('`_fetchOnGoingRunsAtStart` tests', () => {
test('should populate ongoing runs on startup', async () => {
const runNumber = 1;
const mockRun = { runNumber };
const mockPaths = [{ path: '/run/path1' }];

bookkeepingService.retrieveOngoingRuns.resolves([mockRun]);

dataService.getObjectsLatestVersionList.resolves(mockPaths);

await runModeService._fetchOnGoingRunsAtStart();
strictEqual(runModeService._ongoingRuns.has(runNumber), true);
});
});

suite('_onRunTrackEvent - test suite', () => {
test('should correctly parse event to RUN_TRACK and update ongoing runs map', async () => {
const runEvent = { runNumber: 1234, transition: 'START_ACTIVITY' };
Expand Down
Loading