From 9a01f758f307158c2ea7f3a966a5cf4a24f7ce89 Mon Sep 17 00:00:00 2001 From: George Raduta Date: Tue, 31 Oct 2023 14:50:33 +0100 Subject: [PATCH] [OGUI-1412] Add new CacheService and Broadcast new calibration information (#2185) * adds a new cacheservice(in-memory) that is to be used by all the other components for updates * broadcast calibration run to all clients if information is new * udpates `RunController` to initially serve data from cache and if missing, to request from Bookkeeping --- Control/lib/api.js | 27 +++--- Control/lib/common/cacheKeys.enum.js | 22 +++++ Control/lib/controllers/Run.controller.js | 22 ++++- Control/lib/services/Bookkeeping.service.js | 2 +- Control/lib/services/Broadcast.service.js | 50 +++++++++++ Control/lib/services/Cache.service.js | 82 +++++++++++++++++++ Control/lib/services/Run.service.js | 40 ++++----- Control/public/Model.js | 8 +- .../CalibrationRuns/CalibrationRuns.model.js | 9 ++ .../controllers/mocha-run-controller.test.js | 51 ++++++++++-- 10 files changed, 266 insertions(+), 47 deletions(-) create mode 100644 Control/lib/common/cacheKeys.enum.js create mode 100644 Control/lib/services/Broadcast.service.js create mode 100644 Control/lib/services/Cache.service.js diff --git a/Control/lib/api.js b/Control/lib/api.js index f2bcd2728..a6d0c1da3 100644 --- a/Control/lib/api.js +++ b/Control/lib/api.js @@ -25,22 +25,24 @@ const {WorkflowTemplateController} = require('./controllers/WorkflowTemplate.con // local services const {BookkeepingService} = require('./services/Bookkeeping.service.js'); +const {BroadcastService} = require('./services/Broadcast.service.js'); +const {CacheService} = require('./services/Cache.service.js'); const {EnvironmentService} = require('./services/Environment.service.js'); const {Intervals} = require('./services/Intervals.service.js'); -const Lock = require('./services/Lock.js'); const {RunService} = require('./services/Run.service.js'); const {StatusService} = require('./services/Status.service.js'); const {WorkflowTemplateService} = require('./services/WorkflowTemplate.service.js'); +const Lock = require('./services/Lock.js'); // web-ui services const {NotificationService, ConsulService} = require('@aliceo2/web-ui'); // AliECS Core -const GrpcProxy = require('./control-core/GrpcProxy.js'); -const ControlService = require('./control-core/ControlService.js'); -const ApricotService = require('./control-core/ApricotService.js'); const AliecsRequestHandler = require('./control-core/RequestHandler.js'); +const ApricotService = require('./control-core/ApricotService.js'); +const ControlService = require('./control-core/ControlService.js'); const EnvCache = require('./control-core/EnvCache.js'); +const GrpcProxy = require('./control-core/GrpcProxy.js'); const path = require('path'); const O2_CONTROL_PROTO_PATH = path.join(__dirname, './../protobuf/o2control.proto'); @@ -65,6 +67,8 @@ module.exports.setup = (http, ws) => { consulService = new ConsulService(config.consul); } const wsService = new WebSocketService(ws); + const broadcastService = new BroadcastService(ws); + const cacheService = new CacheService(broadcastService); const consulController = new ConsulController(consulService, config.consul); consulController.testConsulStatus(); @@ -89,9 +93,9 @@ module.exports.setup = (http, ws) => { envCache.setWs(ws); const bkpService = new BookkeepingService(config.bookkeeping ?? {}); - const runService = new RunService(bkpService, apricotService); + const runService = new RunService(bkpService, apricotService, cacheService); runService.init(); - const runController = new RunController(runService); + const runController = new RunController(runService, cacheService); const notificationService = new NotificationService(config.kafka); if (notificationService.isConfigured()) { @@ -185,8 +189,11 @@ function initializeIntervals(intervalsService, statusService, runService, bkpSer intervalsService.register(statusService.retrieveNotificationSystemStatus.bind(statusService), SERVICES_REFRESH_RATE); intervalsService.register(statusService.retrieveAliECSIntegratedInfo.bind(statusService), SERVICES_REFRESH_RATE); - intervalsService.register( - runService.retrieveCalibrationRunsGroupedByDetector.bind(runService), - CALIBRATION_RUNS_REFRESH_RATE - ); + + if (config.bookkeeping) { + intervalsService.register( + runService.retrieveCalibrationRunsGroupedByDetector.bind(runService), + CALIBRATION_RUNS_REFRESH_RATE + ); + } } diff --git a/Control/lib/common/cacheKeys.enum.js b/Control/lib/common/cacheKeys.enum.js new file mode 100644 index 000000000..2ce4a536a --- /dev/null +++ b/Control/lib/common/cacheKeys.enum.js @@ -0,0 +1,22 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. +*/ + +/** + * Keys that are used to set/get information to/from the CacheService + */ +const CacheKeys = Object.freeze({ + CALIBRATION_RUNS_BY_DETECTOR: 'CALIBRATION_RUNS_BY_DETECTOR' +}); + +exports.CacheKeys = CacheKeys; diff --git a/Control/lib/controllers/Run.controller.js b/Control/lib/controllers/Run.controller.js index 33230a81c..c755029fe 100644 --- a/Control/lib/controllers/Run.controller.js +++ b/Control/lib/controllers/Run.controller.js @@ -13,6 +13,7 @@ */ const {Log} = require('@aliceo2/web-ui'); const {updateExpressResponseFromNativeError} = require('./../errors/updateExpressResponseFromNativeError.js'); +const {CacheKeys} = require('./../common/cacheKeys.enum.js'); /** * Controller for dealing with all API requests on retrieving information on runs @@ -21,14 +22,20 @@ class RunController { /** * Constructor for initializing controller of runs * @param {RunService} runService - service to use to build information on runs + * @param {CacheService} cacheService - service to use for retrieving information stored in-memory */ - constructor(runService) { + constructor(runService, cacheService) { this._logger = new Log(`${process.env.npm_config_log_label ?? 'cog'}/run-ctrl`); /** * @type {RunService} */ this._runService = runService; + + /** + * @type {CacheService} + */ + this._cacheService = cacheService; } /** @@ -37,9 +44,18 @@ class RunController { * @param {Response} res - HTTP Response object * @returns {void} */ - getCalibrationRunsHandler(_, res) { + async getCalibrationRunsHandler(_, res) { + let calibrationRuns; + try { + calibrationRuns = this._cacheService.getByKey(CacheKeys.CALIBRATION_RUNS_BY_DETECTOR); + } catch (error) { + this._logger.debug(`Unable to serve from cache due to: ${error}`); + } try { - res.status(200).json(this._runService.calibrationRunsPerDetector); + if (!calibrationRuns) { + calibrationRuns = await this._runService.retrieveCalibrationRunsGroupedByDetector(); + } + res.status(200).json(calibrationRuns); } catch (error) { this._logger.debug(error); updateExpressResponseFromNativeError(res, error); diff --git a/Control/lib/services/Bookkeeping.service.js b/Control/lib/services/Bookkeeping.service.js index bfcab3cc2..763a8d082 100644 --- a/Control/lib/services/Bookkeeping.service.js +++ b/Control/lib/services/Bookkeeping.service.js @@ -17,7 +17,7 @@ const {httpGetJson} = require('./../utils.js'); const RunSummaryAdapter = require('./../adapters/RunSummaryAdapter.js'); const {BookkeepingFilterAdapter} = require('./../adapters/external/BookkeepingFilterAdapter.js'); -const DEFAULT_REFRESH_RATE = 10000; +const DEFAULT_REFRESH_RATE = 30000; /** * BookkeepingService class to be used to retrieve data from Bookkeeping diff --git a/Control/lib/services/Broadcast.service.js b/Control/lib/services/Broadcast.service.js new file mode 100644 index 000000000..e09f0d152 --- /dev/null +++ b/Control/lib/services/Broadcast.service.js @@ -0,0 +1,50 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. +*/ + +const {WebSocketMessage} = require('@aliceo2/web-ui'); + +/** + * @class + * BroadcastService class is to be used for building a websocket message and broadcasting it via web sockets + */ +class BroadcastService { + /** + * @constructor + * Constructor for initializing the service with AliceO2/websocket service instance to use + * @param {WebSocket} wsService - which is to be used for broadcasting + */ + constructor(wsService) { + /** + * @type {WebSocket} + */ + this._wsService = wsService; + } + + /** + * Method to receive command and payload to build a WebSocket message and broadcast it to all listening clients + * @param {String} command - command to be added to websocket message + * @param {Object} payload - payload to be sent to the clients + * @return {void} + */ + broadcast(command, payload) { + if (payload) { + const message = new WebSocketMessage() + .setCommand(command) + .setPayload(payload); + this._wsService?.broadcast(message); + } + } +} + +module.exports = {BroadcastService}; diff --git a/Control/lib/services/Cache.service.js b/Control/lib/services/Cache.service.js new file mode 100644 index 000000000..8b4a39d27 --- /dev/null +++ b/Control/lib/services/Cache.service.js @@ -0,0 +1,82 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. +*/ + +const {Log} = require('@aliceo2/web-ui'); +const {deepStrictEqual, AssertionError} = require('assert'); + +/** + * @class + * CacheService class is designed to store in-memory information and allow users to also broadcast new information to the all or registered clients. + */ +class CacheService { + /** + * @constructor + * Constructor for initializing the service with: + * - empty maps for needed information + * - optional service for broadcasting information + * @param {BroadcastService} broadcastService - which is to be used for broadcasting + */ + constructor(broadcastService) { + + /** + * @type {Object} + */ + this._memory = {}; + + /** + * @type {BroadcastService} + */ + this._broadcastService = broadcastService; + + this._logger = new Log(`${process.env.npm_config_log_label ?? 'cog'}/cache-service`); + } + + /** + * Method to receive a function for retrieval of information and a key under which the information should be updated + * @param {String} key - key under which the information should be stored + * @param {String} value - command to be used for broadcasting message + * @param {Object} broadcastConfig - object containing broadcast information; if present information will be broadcasted + * @return {void} + */ + async updateByKeyAndBroadcast(key, value, {command} = {}) { + if (value) { + try { + deepStrictEqual(value, this._memory[key]); + } catch (error) { + if (error instanceof AssertionError) { + this._memory[key] = value; + if (command) { + this._broadcastService.broadcast(command, value); + } + } else { + this._logger.debug(`Unable to update key ${key} due to ${error}`); + } + } + } + } + + /** + * Getter for retrieving a copy of the information stored in-memory under a certain key + * @param {key} - key under which information is stored + * @return {Object} + */ + getByKey(key) { + if (this._memory[key]) { + return JSON.parse(JSON.stringify(this._memory[key])); + } + return null; + } +} + +module.exports = {CacheService}; diff --git a/Control/lib/services/Run.service.js b/Control/lib/services/Run.service.js index 079f7f5d9..f8fe4c585 100644 --- a/Control/lib/services/Run.service.js +++ b/Control/lib/services/Run.service.js @@ -13,12 +13,13 @@ */ const {Log} = require('@aliceo2/web-ui'); -const {grpcErrorToNativeError} = require('./../errors/grpcErrorToNativeError.js'); -const {RUNTIME_COMPONENT: {COG}, RUNTIME_KEY: {CALIBRATION_MAPPING}} = require('./../common/kvStore/runtime.enum.js'); -const {RunDefinitions} = require('./../common/runDefinition.enum.js') +const {CacheKeys} = require('../common/cacheKeys.enum.js'); +const {grpcErrorToNativeError} = require('./../errors/grpcErrorToNativeError.js'); const {LOG_LEVEL} = require('./../common/logLevel.enum.js'); const {RunCalibrationStatus} = require('./../common/runCalibrationStatus.enum.js'); +const {RunDefinitions} = require('./../common/runDefinition.enum.js') +const {RUNTIME_COMPONENT: {COG}, RUNTIME_KEY: {CALIBRATION_MAPPING}} = require('./../common/kvStore/runtime.enum.js'); /** * @class @@ -33,8 +34,9 @@ class RunService { * Constructor for configuring the service to retrieve data via passed services * @param {BookkeepingService} bkpService - service for retrieving RUNs information * @param {ApricotService} apricotService - service for retrieving information through AliECS Apricot gRPC connection, mainly KV Store data + * @param {CacheService} cacheService - service to store information in-memory */ - constructor(bkpService, apricotService) { + constructor(bkpService, apricotService, cacheService) { /** * @type {BookkeepingService} */ @@ -45,6 +47,11 @@ class RunService { */ this._apricotService = apricotService; + /** + * @type {CacheService} + */ + this._cacheService = cacheService; + /** * @type {Object} */ @@ -56,16 +63,6 @@ class RunService { */ this._calibrationConfigurationPerDetectorMap = {}; - /** - * @type {Object>} - */ - this._calibrationRunsPerDetector = {}; - - /** - * @type {Object>} - */ - this._calibrationRunsPerDetector = {}; - this._logger = new Log(`${process.env.npm_config_log_label ?? 'cog'}/run-service`); } @@ -76,7 +73,7 @@ class RunService { async init() { this._calibrationConfigurationPerDetectorMap = await this._retrieveCalibrationConfigurationsForDetectors(); this._runTypes = await this._bkpService.getRunTypes(); - this._calibrationRunsPerDetector = await this.retrieveCalibrationRunsGroupedByDetector(); + await this.retrieveCalibrationRunsGroupedByDetector(); } /** @@ -109,7 +106,11 @@ class RunService { } } } - this._calibrationRunsPerDetector = calibrationRunsPerDetector; + this._cacheService?.updateByKeyAndBroadcast( + CacheKeys.CALIBRATION_RUNS_BY_DETECTOR, + calibrationRunsPerDetector, + {command: CacheKeys.CALIBRATION_RUNS_BY_DETECTOR} + ); return calibrationRunsPerDetector; } @@ -157,13 +158,6 @@ class RunService { return this._calibrationConfigurationPerDetectorMap; } - /** - * Return the object containing a KV object with detector and its corresponding last calibration runs - * @return {Object>} - */ - get calibrationRunsPerDetector() { - return this._calibrationRunsPerDetector; - } } module.exports = {RunService}; diff --git a/Control/public/Model.js b/Control/public/Model.js index b5b03dbba..b218a15f1 100644 --- a/Control/public/Model.js +++ b/Control/public/Model.js @@ -180,7 +180,13 @@ export default class Model extends Observable { if (message?.payload[STATUS_COMPONENTS_KEYS.GENERAL_SYSTEM_KEY]) { this.about.updateComponentStatus('system', message.payload[STATUS_COMPONENTS_KEYS.GENERAL_SYSTEM_KEY]) } - + break + case 'CALIBRATION_RUNS_BY_DETECTOR': + if (message.payload) { + this.calibrationRunsModel.calibrationRuns = RemoteData.success(message?.payload) + this.notify(); + } + break } } diff --git a/Control/public/pages/CalibrationRuns/CalibrationRuns.model.js b/Control/public/pages/CalibrationRuns/CalibrationRuns.model.js index ab9bb9296..0054bb58b 100644 --- a/Control/public/pages/CalibrationRuns/CalibrationRuns.model.js +++ b/Control/public/pages/CalibrationRuns/CalibrationRuns.model.js @@ -68,4 +68,13 @@ export class CalibrationRunsModel extends Observable { get calibrationRuns() { return this._calibrationRuns; } + + /** + * Setter for updating the calibration runs object with a new RemoteData object + * @param {RemoteData} remoteDataRuns - updated information + * @return {void} + */ + set calibrationRuns(remoteDataRuns) { + this._calibrationRuns = remoteDataRuns; + } } diff --git a/Control/test/lib/controllers/mocha-run-controller.test.js b/Control/test/lib/controllers/mocha-run-controller.test.js index 3e112df2c..2134bc52b 100644 --- a/Control/test/lib/controllers/mocha-run-controller.test.js +++ b/Control/test/lib/controllers/mocha-run-controller.test.js @@ -25,27 +25,60 @@ describe(`'RunController' test suite`, () => { } describe(`'getCalibrationRunsHandler' test suite`, () => { - it('should successfully return calibrations runs grouped by detector', () => { + it('should successfully return calibrations runs grouped by detector by requesting information as cache is not enabled', async () => { const runs = { TPC: [ {runNumber: 1}, {runNumber: 2}, ] }; - const runController = new RunController({ - calibrationRunsPerDetector: runs - }); - runController.getCalibrationRunsHandler({}, res); + const runController = new RunController( + { + retrieveCalibrationRunsGroupedByDetector: sinon.stub().resolves(runs) + } + ); + await runController.getCalibrationRunsHandler({}, res); assert.ok(res.status.calledWith(200)); assert.ok(res.json.calledWith(runs)); }); - it('should return an empty object if no runs were loaded', () => { - const runController = new RunController({calibrationRunsPerDetector: {}}); - runController.getCalibrationRunsHandler({}, res); + it('should successfully return calibrations runs grouped by detector by from cache', async () => { + const runs = { + TPC: [ + {runNumber: 1}, + {runNumber: 2}, + ] + }; + const runController = new RunController( + { + getByKey: runs + } + ); + await runController.getCalibrationRunsHandler({}, res); + assert.ok(res.status.calledWith(200)); + assert.ok(res.json.calledWith(runs)); + }); + + it('should successfully return an empty object if no runs were loaded', async () => { + const runController = new RunController( + { + retrieveCalibrationRunsGroupedByDetector: sinon.stub().resolves({}) + } + ); + await runController.getCalibrationRunsHandler({}, res); assert.ok(res.status.calledWith(200)); assert.ok(res.json.calledWith({})); }); - }); + it('should return error if both cache and retrieve live options failed', async () => { + const runController = new RunController( + { + retrieveCalibrationRunsGroupedByDetector: sinon.stub().rejects(new Error('Unable to retrieve such runs')) + } + ); + await runController.getCalibrationRunsHandler({}, res); + assert.ok(res.status.calledWith(500)); + assert.ok(res.json.calledWith({message: 'Unable to retrieve such runs'})); + }); + }); });