diff --git a/platform-linux/build.sh b/platform-linux/build.sh index 5b38c7c6..5e0eb6b0 100755 --- a/platform-linux/build.sh +++ b/platform-linux/build.sh @@ -90,6 +90,10 @@ cd ../carbone-reports npm install cd ../backup-mongo npm install +cd ../mongofw +npm install +cd ../mongowr +npm install cd ../log-io/ui npm install diff --git a/platform-mac/build.sh b/platform-mac/build.sh index 37a4b8be..75046946 100755 --- a/platform-mac/build.sh +++ b/platform-mac/build.sh @@ -82,6 +82,10 @@ cd ../carbone-reports npm install cd ../backup-mongo npm install +cd ../mongofw +npm install +cd ../mongowr +npm install cd ../log-io/ui npm install npm run build diff --git a/platform-windows/build.bat b/platform-windows/build.bat index 6e3098a3..709d4a49 100644 --- a/platform-windows/build.bat +++ b/platform-windows/build.bat @@ -135,6 +135,12 @@ call %NPM% install cd %SRCPATH%\backup-mongo call %NPM% install +cd %SRCPATH\mongofw +call %NPM% install + +cd %SRCPATH\mongowr +call %NPM% install + cd %SRCPATH%\log-io\ui call %NPM% install call %NPM% run build diff --git a/platform-windows/buildupd.bat b/platform-windows/buildupd.bat index 5ca7b15f..9e1c8515 100644 --- a/platform-windows/buildupd.bat +++ b/platform-windows/buildupd.bat @@ -135,6 +135,12 @@ call %NPM% update cd %SRCPATH%\backup-mongo call %NPM% i --package-lock-only call %NPM% update +cd %SRCPATH%\mongofw +call %NPM% i --package-lock-only +call %NPM% update +cd %SRCPATH%\mongowr +call %NPM% i --package-lock-only +call %NPM% update cd %SRCPATH%\log-io\ui call %NPM% i --package-lock-only call %NPM% update diff --git a/src/mongofw/app-defs.js b/src/mongofw/app-defs.js new file mode 100644 index 00000000..fe93cf37 --- /dev/null +++ b/src/mongofw/app-defs.js @@ -0,0 +1,25 @@ +'use strict' + +/* + * {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +module.exports = { + NAME: 'MONGOFW', + ENV_PREFIX: 'JS_MONGOFW', + MSG: '{json:scada} - Mongofw - forward data from protocol drivers to another JSON-SCADA installation.', + VERSION: '0.1.0', +} diff --git a/src/mongofw/customized_module.js b/src/mongofw/customized_module.js new file mode 100644 index 00000000..058167f8 --- /dev/null +++ b/src/mongofw/customized_module.js @@ -0,0 +1,163 @@ +'use strict' + +/* + * Customizable processor of mongodb changes via change streams. + * + * THIS FILE IS INTENDED TO BE CUSTOMIZED BY USERS TO DO SPECIAL PROCESSING + * + * {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const Log = require('./simple-logger') +const { Double } = require('mongodb') +const { setInterval } = require('timers') +const dgram = require('dgram'); + +// UDP broadcast options +const udpPort = 12345; +const udpHostDst = "192.168.0.255"; +// Create a UDP socket +const udpSocket = dgram.createSocket('udp4'); + +udpSocket.bind(udpPort, () => { + udpSocket.setBroadcast(true) + // udpSocket.setMulticastInterface('::%eth1'); +}); + + +let maxSz = 0; +let cnt = 0; + +const UserActionsCollectionName = 'userActions' +const RealtimeDataCollectionName = 'realtimeData' +const CommandsQueueCollectionName = 'commandsQueue' +const SoeDataCollectionName = 'soeData' +const ProcessInstancesCollectionName = 'processInstances' +const ProtocolDriverInstancesCollectionName = 'protocolDriverInstances' +const ProtocolConnectionsCollectionName = 'protocolConnections' + +let CyclicIntervalHandle = null + +// this will be called by the main module when mongo is connected (or reconnected) +module.exports.CustomProcessor = function ( + clientMongo, + jsConfig, + Redundancy, + MongoStatus +) { + if (clientMongo === null) return + const db = clientMongo.db(jsConfig.mongoDatabaseName) + + // ------------------------------------------------------------------------------------------- + // EXAMPLE OF CYCLIC PROCESSING AT INTERVALS + // BEGIN EXAMPLE + + let CyclicProcess = async function () { + // do cyclic processing at each CyclicInterval ms + + if (!Redundancy.ProcessStateIsActive() || !MongoStatus.HintMongoIsConnected) + return // do nothing if process is inactive + + try { + let res = await db + .collection(RealtimeDataCollectionName) + .findOne({ _id: -2 }) // id of point tag with number of digital updates + + Log.log( + 'Custom Process - Checking number of digital updates: ' + + res.valueString + ) + } catch (err) { + Log.log(err) + } + + return + } + const CyclicInterval = 5000 // interval time in ms + clearInterval(CyclicIntervalHandle) // clear older instances if any + CyclicIntervalHandle = setInterval(CyclicProcess, CyclicInterval) // start a cyclic processing + + // EXAMPLE OF CYCLIC PROCESSING AT INTERVALS + // END EXAMPLE + // ------------------------------------------------------------------------------------------- + + const changeStreamUserActions = db + .collection(RealtimeDataCollectionName) + .watch( + { $match: { operationType: 'update' } }, + { + fullDocument: 'updateLookup' + } + ) + + try { + changeStreamUserActions.on('error', change => { + if (clientMongo) clientMongo.close() + clientMongo = null + Log.log('Custom Process - Error on changeStreamUserActions!') + }) + changeStreamUserActions.on('close', change => { + Log.log('Custom Process - Closed changeStreamUserActions!') + }) + changeStreamUserActions.on('end', change => { + if (clientMongo) clientMongo.close() + clientMongo = null + Log.log('Custom Process - Ended changeStreamUserActions!') + }) + + // start listen to changes + changeStreamUserActions.on('change', change => { + // Log.log(change.fullDocument) + if (!Redundancy.ProcessStateIsActive() || !MongoStatus.HintMongoIsConnected) + return // do nothing if process is inactive + + // will send only update data from drivers + if (!change.updateDescription.updatedFields?.sourceDataUpdate) + return; + if (change.updateDescription.updatedFields?.sourceDataUpdate?.valueBsonAtSource) + delete change.updateDescription.updatedFields.sourceDataUpdate.valueBsonAtSource; + if (!Redundancy.ProcessStateIsActive()) return // do nothing if process is inactive + const fwObj = { + cnt: cnt++, + operationType: change.operationType, + documentKey: change.documentKey, + updateDescription: change.updateDescription + } + const opData = JSON.stringify(fwObj); + const message = Buffer.from(opData); + if (message.length > maxSz) maxSz = message.length; + + if (message.length > 60000) { + console.log('Message too large: ', opData); + } + else + udpSocket.send(message, 0, message.length, udpPort, udpHostDst, (err, bytes) => { + if (err) { + console.log('UDP error:', err); + } else { + // console.log('Data sent via UDP', opData); + //console.log('Size: ', message.length); + //console.log('Max: ', maxSz); + } + + }); + + }) + } catch (e) { + Log.log('Custom Process - Error: ' + e) + } + +} diff --git a/src/mongofw/index.js b/src/mongofw/index.js new file mode 100644 index 00000000..eb385a83 --- /dev/null +++ b/src/mongofw/index.js @@ -0,0 +1,105 @@ +'use strict' + +/* + * Customizable processor of mongodb changes via change streams. + * DO NOT EDIT THIS FILE! CUSTOMIZE THE customized_module.js file + * {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const Log = require('./simple-logger') +const LoadConfig = require('./load-config') +const Redundancy = require('./redundancy') +const { MongoClient } = require('mongodb') +const CustomProcessor = require('./customized_module').CustomProcessor + +const args = process.argv.slice(2) +let inst = null +if (args.length > 0) inst = parseInt(args[0]) + +let logLevel = null +if (args.length > 1) logLevel = parseInt(args[1]) +let confFile = null +if (args.length > 2) confFile = args[2] +const jsConfig = LoadConfig(confFile, logLevel, inst) +const MongoStatus = {HintMongoIsConnected : false} +Log.log('Connecting to MongoDB server...') +;(async () => { + let clientMongo = null + while (true) { + if (clientMongo === null) + await MongoClient.connect( + jsConfig.mongoConnectionString, + jsConfig.MongoConnectionOptions + ) + .then(async (client) => { + clientMongo = client + MongoStatus.HintMongoIsConnected = true + const db = clientMongo.db(jsConfig.mongoDatabaseName) + Log.log('Connected correctly to MongoDB server') + Redundancy.Start(5000, clientMongo, db, jsConfig, MongoStatus) + CustomProcessor(clientMongo, jsConfig, Redundancy, MongoStatus) + }) + .catch(function (err) { + if (clientMongo) clientMongo.close() + clientMongo = null + Log.log(err) + }) + + // wait 5 seconds + await new Promise((resolve) => setTimeout(resolve, 5000)) + + // detect connection problems, if error will null the client to later reconnect + if (clientMongo === undefined) { + Log.log('Disconnected Mongodb!') + clientMongo = null + } + if (clientMongo) + if (!(await checkConnectedMongo(clientMongo))) { + // not anymore connected, will retry + Log.log('Disconnected Mongodb!') + if (clientMongo) clientMongo.close() + clientMongo = null + } + } +})() + +// test mongoDB connectivity +async function checkConnectedMongo(client) { + if (!client) { + return false + } + const CheckMongoConnectionTimeout = 1000 + let tr = setTimeout(() => { + Log.log('Mongo ping timeout error!') + MongoStatus.HintMongoIsConnected = false + }, CheckMongoConnectionTimeout) + + let res = null + try { + res = await client.db('admin').command({ ping: 1 }) + clearTimeout(tr) + } catch (e) { + Log.log('Error on mongodb connection!') + return false + } + if ('ok' in res && res.ok) { + MongoStatus.HintMongoIsConnected = true + return true + } else { + MongoStatus.HintMongoIsConnected = false + return false + } +} diff --git a/src/mongofw/load-config.js b/src/mongofw/load-config.js new file mode 100644 index 00000000..3e15b392 --- /dev/null +++ b/src/mongofw/load-config.js @@ -0,0 +1,111 @@ +'use strict' + +/* + * {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const fs = require('fs') +const Log = require('./simple-logger') +const AppDefs = require('./app-defs') +const { ReadPreference } = require('mongodb') + +// load and parse config file +function LoadConfig (confFileArg, logLevelArg, instArg) { + + let configFile = + confFileArg || process.env.JS_CONFIG_FILE || '../../conf/json-scada.json' + Log.log('Config - Config File: ' + configFile) + + if (!fs.existsSync(configFile)) { + Log.log('Config - Error: config file not found!') + process.exit() + } + + let rawFileContents = fs.readFileSync(configFile) + let configObj = JSON.parse(rawFileContents) + if ( + typeof configObj.mongoConnectionString != 'string' || + configObj.mongoConnectionString === '' + ) { + Log.log('Error reading config file.') + process.exit() + } + + Log.levelCurrent = Log.levelNormal + if (AppDefs.ENV_PREFIX + 'LOGLEVEL' in process.env) + Log.levelCurrent = process.env[AppDefs.ENV_PREFIX + 'LOGLEVEL'] + if (logLevelArg) Log.levelCurrent = parseInt(logLevelArg) + configObj.LogLevel = Log.levelCurrent + + configObj.Instance = + instArg || process.env[AppDefs.ENV_PREFIX + 'INSTANCE'] || 1 + + configObj.GridFsCollectionName = 'files' + configObj.RealtimeDataCollectionName = 'realtimeData' + configObj.UsersCollectionName = 'users' + configObj.SoeDataCollectionName = 'soeData' + configObj.CommandsQueueCollectionName = 'commandsQueue' + configObj.ProtocolDriverInstancesCollectionName = 'protocolDriverInstances' + configObj.ProtocolConnectionsCollectionName = 'protocolConnections' + configObj.ProcessInstancesCollectionName = 'processInstances' + configObj.GroupSep = '~' + configObj.ConnectionNumber = 0 + + Log.log('Config - ' + AppDefs.MSG + ' Version ' + AppDefs.VERSION) + Log.log('Config - Instance: ' + configObj.Instance) + Log.log('Config - Log level: ' + Log.levelCurrent) + + configObj.MongoConnectionOptions = getMongoConnectionOptions(configObj) + return configObj +} + +// prepare mongo connection options +function getMongoConnectionOptions (configObj) { + let connOptions = { + //useNewUrlParser: true, + //useUnifiedTopology: true, + appname: + AppDefs.NAME + + ' Version:' + + AppDefs.VERSION + + ' Instance:' + + configObj.Instance, + maxPoolSize: 20, + readPreference: ReadPreference.PRIMARY + } + + if ( + typeof configObj.tlsCaPemFile === 'string' && + configObj.tlsCaPemFile.trim() !== '' + ) { + configObj.tlsClientKeyPassword = configObj.tlsClientKeyPassword || '' + configObj.tlsAllowInvalidHostnames = + configObj.tlsAllowInvalidHostnames || false + configObj.tlsAllowChainErrors = configObj.tlsAllowChainErrors || false + configObj.tlsInsecure = configObj.tlsInsecure || false + + connOptions.tls = true + connOptions.tlsCAFile = configObj.tlsCaPemFile + connOptions.tlsCertificateKeyFile = configObj.tlsClientPemFile + connOptions.tlsCertificateKeyFilePassword = configObj.tlsClientKeyPassword + connOptions.tlsAllowInvalidHostnames = configObj.tlsAllowInvalidHostnames + connOptions.tlsInsecure = configObj.tlsInsecure + } + + return connOptions +} + +module.exports = LoadConfig diff --git a/src/mongofw/package-lock.json b/src/mongofw/package-lock.json new file mode 100644 index 00000000..d585835c --- /dev/null +++ b/src/mongofw/package-lock.json @@ -0,0 +1,157 @@ +{ + "name": "mongofw", + "version": "0.1.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "mongofw", + "version": "0.1.0", + "license": "GPL-3.0-or-later", + "dependencies": { + "fs": "0.0.1-security", + "mongodb": "^6.6.2" + } + }, + "node_modules/@mongodb-js/saslprep": { + "version": "1.1.7", + "resolved": "https://registry.npmjs.org/@mongodb-js/saslprep/-/saslprep-1.1.7.tgz", + "integrity": "sha512-dCHW/oEX0KJ4NjDULBo3JiOaK5+6axtpBbS+ao2ZInoAL9/YRQLhXzSNAFz7hP4nzLkIqsfYAK/PDE3+XHny0Q==", + "dependencies": { + "sparse-bitfield": "^3.0.3" + } + }, + "node_modules/@types/webidl-conversions": { + "version": "7.0.3", + "resolved": "https://registry.npmjs.org/@types/webidl-conversions/-/webidl-conversions-7.0.3.tgz", + "integrity": "sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==" + }, + "node_modules/@types/whatwg-url": { + "version": "11.0.5", + "resolved": "https://registry.npmjs.org/@types/whatwg-url/-/whatwg-url-11.0.5.tgz", + "integrity": "sha512-coYR071JRaHa+xoEvvYqvnIHaVqaYrLPbsufM9BF63HkwI5Lgmy2QR8Q5K/lYDYo5AK82wOvSOS0UsLTpTG7uQ==", + "dependencies": { + "@types/webidl-conversions": "*" + } + }, + "node_modules/bson": { + "version": "6.7.0", + "resolved": "https://registry.npmjs.org/bson/-/bson-6.7.0.tgz", + "integrity": "sha512-w2IquM5mYzYZv6rs3uN2DZTOBe2a0zXLj53TGDqwF4l6Sz/XsISrisXOJihArF9+BZ6Cq/GjVht7Sjfmri7ytQ==", + "engines": { + "node": ">=16.20.1" + } + }, + "node_modules/fs": { + "version": "0.0.1-security", + "resolved": "https://registry.npmjs.org/fs/-/fs-0.0.1-security.tgz", + "integrity": "sha512-3XY9e1pP0CVEUCdj5BmfIZxRBTSDycnbqhIOGec9QYtmVH2fbLpj86CFWkrNOkt/Fvty4KZG5lTglL9j/gJ87w==" + }, + "node_modules/memory-pager": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/memory-pager/-/memory-pager-1.5.0.tgz", + "integrity": "sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==" + }, + "node_modules/mongodb": { + "version": "6.6.2", + "resolved": "https://registry.npmjs.org/mongodb/-/mongodb-6.6.2.tgz", + "integrity": "sha512-ZF9Ugo2JCG/GfR7DEb4ypfyJJyiKbg5qBYKRintebj8+DNS33CyGMkWbrS9lara+u+h+yEOGSRiLhFO/g1s1aw==", + "dependencies": { + "@mongodb-js/saslprep": "^1.1.5", + "bson": "^6.7.0", + "mongodb-connection-string-url": "^3.0.0" + }, + "engines": { + "node": ">=16.20.1" + }, + "peerDependencies": { + "@aws-sdk/credential-providers": "^3.188.0", + "@mongodb-js/zstd": "^1.1.0", + "gcp-metadata": "^5.2.0", + "kerberos": "^2.0.1", + "mongodb-client-encryption": ">=6.0.0 <7", + "snappy": "^7.2.2", + "socks": "^2.7.1" + }, + "peerDependenciesMeta": { + "@aws-sdk/credential-providers": { + "optional": true + }, + "@mongodb-js/zstd": { + "optional": true + }, + "gcp-metadata": { + "optional": true + }, + "kerberos": { + "optional": true + }, + "mongodb-client-encryption": { + "optional": true + }, + "snappy": { + "optional": true + }, + "socks": { + "optional": true + } + } + }, + "node_modules/mongodb-connection-string-url": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/mongodb-connection-string-url/-/mongodb-connection-string-url-3.0.1.tgz", + "integrity": "sha512-XqMGwRX0Lgn05TDB4PyG2h2kKO/FfWJyCzYQbIhXUxz7ETt0I/FqHjUeqj37irJ+Dl1ZtU82uYyj14u2XsZKfg==", + "dependencies": { + "@types/whatwg-url": "^11.0.2", + "whatwg-url": "^13.0.0" + } + }, + "node_modules/punycode": { + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", + "integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==", + "engines": { + "node": ">=6" + } + }, + "node_modules/sparse-bitfield": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/sparse-bitfield/-/sparse-bitfield-3.0.3.tgz", + "integrity": "sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==", + "dependencies": { + "memory-pager": "^1.0.2" + } + }, + "node_modules/tr46": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/tr46/-/tr46-4.1.1.tgz", + "integrity": "sha512-2lv/66T7e5yNyhAAC4NaKe5nVavzuGJQVVtRYLyQ2OI8tsJ61PMLlelehb0wi2Hx6+hT/OJUWZcw8MjlSRnxvw==", + "dependencies": { + "punycode": "^2.3.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/webidl-conversions": { + "version": "7.0.0", + "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-7.0.0.tgz", + "integrity": "sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==", + "engines": { + "node": ">=12" + } + }, + "node_modules/whatwg-url": { + "version": "13.0.0", + "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-13.0.0.tgz", + "integrity": "sha512-9WWbymnqj57+XEuqADHrCJ2eSXzn8WXIW/YSGaZtb2WKAInQ6CHfaUUcTyyver0p8BDg5StLQq8h1vtZuwmOig==", + "dependencies": { + "tr46": "^4.1.1", + "webidl-conversions": "^7.0.0" + }, + "engines": { + "node": ">=16" + } + } + } +} diff --git a/src/mongofw/package.json b/src/mongofw/package.json new file mode 100644 index 00000000..30ae36ea --- /dev/null +++ b/src/mongofw/package.json @@ -0,0 +1,15 @@ +{ + "name": "mongofw", + "version": "0.1.0", + "description": "{json:scada} - Mongofw - forward data from protocol drivers to another JSON-SCADA installation.", + "main": "index", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "Ricardo Lastra Olsen", + "license": "GPL-3.0-or-later", + "dependencies": { + "fs": "0.0.1-security", + "mongodb": "^6.6.2" + } +} diff --git a/src/mongofw/redundancy.js b/src/mongofw/redundancy.js new file mode 100644 index 00000000..422b85b7 --- /dev/null +++ b/src/mongofw/redundancy.js @@ -0,0 +1,169 @@ +'use strict' + +/* + * {json:scada} - Copyright (c) 2020-2021 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const { setInterval } = require('timers') +const { Double } = require('mongodb') +const Log = require('./simple-logger') +const AppDefs = require('./app-defs') + +let ProcessActive = false // redundancy state +let redundancyIntervalHandle = null // timer handle + +// start processing redundancy +function Start(interval, clientMongo, db, configObj, MongoStatus) { + // check and update redundancy control + ProcessRedundancy(clientMongo, db, configObj) + clearInterval(redundancyIntervalHandle) + redundancyIntervalHandle = setInterval(function () { + if (!MongoStatus.HintMongoIsConnected){ + ProcessActive = false + return + } + + ProcessRedundancy(clientMongo, db, configObj) + }, interval) +} + +// process JSON-SCADA redundancy state for this driver module +async function ProcessRedundancy(clientMongo, db, configObj) { + if (!clientMongo || !db) return + + Log.levelCurrent = configObj.LogLevel + + const countKeepAliveUpdatesLimit = 4 + + // poor man's local static variables + if (typeof ProcessRedundancy.countKeepAliveNotUpdated === 'undefined') { + ProcessRedundancy.lastActiveNodeKeepAliveTimeTag = null + ProcessRedundancy.countKeepAliveNotUpdated = 0 + } + + Log.log('Redundancy - Process ' + (ProcessActive ? 'Active' : 'Inactive')) + + try { + // look for process instance entry, if not found create a new entry + const result = await db + .collection(configObj.ProcessInstancesCollectionName) + .findOne({ + processName: AppDefs.NAME, + processInstanceNumber: new Double(configObj.Instance), + }) + + if (!result) { + // not found, then create + ProcessActive = true + Log.log('Redundancy - Instance config not found, creating one...') + db.collection(configObj.ProcessInstancesCollectionName).insertOne({ + processName: AppDefs.NAME, + processInstanceNumber: new Double(configObj.Instance), + enabled: true, + logLevel: new Double(1), + nodeNames: [], + activeNodeName: configObj.nodeName, + activeNodeKeepAliveTimeTag: new Date(), + }) + } else { + // check for disabled or node not allowed + const instance = result + let instKeepAliveTimeTag = null + + if ('activeNodeKeepAliveTimeTag' in instance) + instKeepAliveTimeTag = instance.activeNodeKeepAliveTimeTag.toISOString() + + if (instance?.enabled === false) { + Log.log('Redundancy - Instance disabled, exiting...') + process.exit() + } + if (instance?.nodeNames !== null && instance.nodeNames.length > 0) { + if (!instance.nodeNames.includes(configObj.nodeName)) { + Log.log('Redundancy - Node name not allowed, exiting...') + process.exit() + } + } + if (instance?.activeNodeName === configObj.nodeName) { + if (!ProcessActive) Log.log('Redundancy - Node activated!') + ProcessRedundancy.countKeepAliveNotUpdated = 0 + ProcessActive = true + } else { + // other node active + if (ProcessActive) { + Log.log('Redundancy - Node deactivated!') + ProcessRedundancy.countKeepAliveNotUpdated = 0 + } + ProcessActive = false + if ( + ProcessRedundancy.lastActiveNodeKeepAliveTimeTag === + instKeepAliveTimeTag + ) { + ProcessRedundancy.countKeepAliveNotUpdated++ + Log.log( + 'Redundancy - Keep-alive from active node not updated. ' + + ProcessRedundancy.countKeepAliveNotUpdated + ) + } else { + ProcessRedundancy.countKeepAliveNotUpdated = 0 + Log.log( + 'Redundancy - Keep-alive updated by active node. Staying inactive.' + ) + } + ProcessRedundancy.lastActiveNodeKeepAliveTimeTag = instKeepAliveTimeTag + if ( + ProcessRedundancy.countKeepAliveNotUpdated > + countKeepAliveUpdatesLimit + ) { + // cnt exceeded, be active + ProcessRedundancy.countKeepAliveNotUpdated = 0 + Log.log('Redundancy - Node activated!') + ProcessActive = true + } + } + + if (ProcessActive) { + // process active, then update keep alive + db.collection(configObj.ProcessInstancesCollectionName).updateOne( + { + processName: AppDefs.NAME, + processInstanceNumber: new Double(configObj.Instance), + }, + { + $set: { + activeNodeName: configObj.nodeName, + activeNodeKeepAliveTimeTag: new Date(), + softwareVersion: AppDefs.VERSION, + stats: {}, + }, + } + ) + } + } + } + catch (err) { + Log.log('Redundancy - Error: ' + err) + } +} + +function ProcessStateIsActive() { + return ProcessActive +} + +module.exports = { + ProcessRedundancy: ProcessRedundancy, + Start: Start, + ProcessStateIsActive: ProcessStateIsActive, +} diff --git a/src/mongofw/simple-logger.js b/src/mongofw/simple-logger.js new file mode 100644 index 00000000..2ee43cfd --- /dev/null +++ b/src/mongofw/simple-logger.js @@ -0,0 +1,35 @@ +'use strict' + +/* + * {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const Log = { + // simple message logger + levelMin: 0, + levelNormal: 1, + levelDetailed: 2, + levelDebug: 3, + levelCurrent: 1, + log: function (msg, level = 1) { + if (level <= this.levelCurrent) { + let dt = new Date() + console.log(dt.toISOString() + ' - ' + msg) + } + } + } + +module.exports = Log diff --git a/src/mongowr/README.md b/src/mongowr/README.md new file mode 100644 index 00000000..88798ee8 --- /dev/null +++ b/src/mongowr/README.md @@ -0,0 +1,34 @@ +# {json:scada} cs_custom_processor.js + +This process can be customized for special data processing on mongodb changes. + +Requires Node.js. + +## Customization of Processing + +Custom processing can be + +* CYCLIC - At regular adjustable intervals. +* BY EXCEPTION - By change on any mongodb collection (by exception). +* BY EXTERNAL SOURCE - By external events (requires nodejs coding, no example provided). + +Check the _customized_module.js_ file for examples of cyclic and by exception processing. +The _cs_custom_processor.js_ should not be edited, it provides MongoDB connection handling and redundancy control. + +## Process Command Line Arguments And Environment Variables + +This process has the following command line arguments and equivalent environment variables. + +* _**1st arg. - Instance Number**_ [Integer] - Instance number to be executed. **Optional argument, default=1**. Env. variable: **JS_CSCUSTOMPROC_INSTANCE**. +* _**2nd arg. - Log. Level**_ [Integer] - Log level (0=minimum,1=basic,2=detailed,3=debug). **Optional argument, default=1**. Env. variable: **JS_CSCUSTOMPROC_LOGLEVEL**. +* _**3rd arg. - Config File Path/Name**_ [String] - Path/name of the JSON-SCADA config file. **Optional argument, default="../conf/json-scada.json"**. Env. variable: **JS_CONFIG_FILE**. + +Command line args take precedence over environment variables. + +## Process Instance Collection + +A _processInstance_ entry will be created with defaults if one is not found. It can be used to configure some parameters and limit nodes allowed to run instances. + +See also + +* [Schema Documentation](../../docs/schema.md) diff --git a/src/mongowr/app-defs.js b/src/mongowr/app-defs.js new file mode 100644 index 00000000..a4420332 --- /dev/null +++ b/src/mongowr/app-defs.js @@ -0,0 +1,25 @@ +'use strict' + +/* + * {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +module.exports = { + NAME: 'MONGOWR', + ENV_PREFIX: 'JS_MONGOWR', + MSG: '{json:scada} - Mongowr - write protocol data forwarded by another JSON-SCADA installation.', + VERSION: '0.1.0', +} diff --git a/src/mongowr/customized_module.js b/src/mongowr/customized_module.js new file mode 100644 index 00000000..a3e697ff --- /dev/null +++ b/src/mongowr/customized_module.js @@ -0,0 +1,103 @@ +'use strict' + +/* + * Customizable processor of mongodb changes via change streams. + * + * THIS FILE IS INTENDED TO BE CUSTOMIZED BY USERS TO DO SPECIAL PROCESSING + * + * {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const Log = require('./simple-logger') +const { Double } = require('mongodb') +const { setInterval } = require('timers') +const dgram = require('node:dgram'); + +// UDP broadcast options +const udpPort = 12345; +const udpBind = "0.0.0.0"; + +const UserActionsCollectionName = 'userActions' +const RealtimeDataCollectionName = 'realtimeData' +const CommandsQueueCollectionName = 'commandsQueue' +const SoeDataCollectionName = 'soeData' +const ProcessInstancesCollectionName = 'processInstances' +const ProtocolDriverInstancesCollectionName = 'protocolDriverInstances' +const ProtocolConnectionsCollectionName = 'protocolConnections' + +let CyclicIntervalHandle = null + +// this will be called by the main module when mongo is connected (or reconnected) +module.exports.CustomProcessor = function ( + clientMongo, + jsConfig, + Redundancy, + MongoStatus +) { + if (clientMongo === null) return + const db = clientMongo.db(jsConfig.mongoDatabaseName) + const collection = db.collection(RealtimeDataCollectionName) + + const server = dgram.createSocket('udp4'); + + let maxSz = 0; + + server.on('error', (err) => { + console.error(`server error:\n${err.stack}`); + server.close(); + }); + + server.on('message', (msg, rinfo) => { + if (!Redundancy.ProcessStateIsActive() || !MongoStatus.HintMongoIsConnected) + return // do nothing if process is inactive + + + // console.log(`server got: ${msg} from ${rinfo.address}:${rinfo.port}`); + if (msg.length > maxSz) maxSz = msg.length; + console.log('Size: ', msg.length); + console.log('Max: ', maxSz); + + try { + let dataObj = JSON.parse(msg); + // will process only update data from drivers + if (!dataObj?.updateDescription?.updatedFields?.sourceDataUpdate) + return; + + if (dataObj?.updateDescription?.updatedFields?.sourceDataUpdate.timeTag) + dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTag = new Date(dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTag) + if (dataObj?.updateDescription?.updatedFields?.sourceDataUpdate.timeTagAtSource) + dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTagAtSource = new Date(dataObj.updateDescription.updatedFields.sourceDataUpdate.timeTagAtSource) + + collection.updateOne( + { + ... dataObj.documentKey + }, + { $set: { ... dataObj.updateDescription.updatedFields } } + ) + + } catch (e) { + console.log(e) + } + + }); + + server.on('listening', () => { + const address = server.address(); + console.log(`server listening ${address.address}:${address.port}`); + }); + + server.bind(udpPort, udpBind); +} diff --git a/src/mongowr/index.js b/src/mongowr/index.js new file mode 100644 index 00000000..93d1597f --- /dev/null +++ b/src/mongowr/index.js @@ -0,0 +1,105 @@ +'use strict' + +/* + * Customizable processor of mongodb changes via change streams. + * DO NOT EDIT THIS FILE! CUSTOMIZE THE customized_module.js file + * {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const Log = require('./simple-logger') +const LoadConfig = require('./load-config') +const Redundancy = require('./redundancy') +const { MongoClient } = require('mongodb') +const CustomProcessor = require('./customized_module').CustomProcessor + +const args = process.argv.slice(2) +let inst = null +if (args.length > 0) inst = parseInt(args[0]) + +let logLevel = null +if (args.length > 1) logLevel = parseInt(args[1]) +let confFile = null +if (args.length > 2) confFile = args[2] +const jsConfig = LoadConfig(confFile, logLevel, inst) +const MongoStatus = {HintMongoIsConnected : false} +Log.log('Connecting to MongoDB server...') +;(async () => { + let clientMongo = null + while (true) { + if (clientMongo === null) + await MongoClient.connect( + jsConfig.mongoConnectionString, + jsConfig.MongoConnectionOptions + ) + .then(async (client) => { + clientMongo = client + MongoStatus.HintMongoIsConnected = true + const db = clientMongo.db(jsConfig.mongoDatabaseName) + Log.log('Connected correctly to MongoDB server') + Redundancy.Start(5000, clientMongo, db, jsConfig, MongoStatus) + CustomProcessor(clientMongo, jsConfig, Redundancy, MongoStatus) + }) + .catch(function (err) { + if (clientMongo) clientMongo.close() + clientMongo = null + Log.log(err) + }) + + // wait 5 seconds + await new Promise((resolve) => setTimeout(resolve, 5000)) + + // detect connection problems, if error will null the client to later reconnect + if (clientMongo === undefined) { + Log.log('Disconnected Mongodb!') + clientMongo = null + } + if (clientMongo) + if (!(await checkConnectedMongo(clientMongo))) { + // not anymore connected, will retry + Log.log('Disconnected Mongodb!') + if (clientMongo) clientMongo.close() + clientMongo = null + } + } +})() + +// test mongoDB connectivity +async function checkConnectedMongo(client) { + if (!client) { + return false + } + const CheckMongoConnectionTimeout = 1000 + let tr = setTimeout(() => { + Log.log('Mongo ping timeout error!') + MongoStatus.HintMongoIsConnected = false + }, CheckMongoConnectionTimeout) + + let res = null + try { + res = await client.db('admin').command({ ping: 1 }) + clearTimeout(tr) + } catch (e) { + Log.log('Error on mongodb connection!') + return false + } + if ('ok' in res && res.ok) { + MongoStatus.HintMongoIsConnected = true + return true + } else { + MongoStatus.HintMongoIsConnected = false + return false + } +} diff --git a/src/mongowr/load-config.js b/src/mongowr/load-config.js new file mode 100644 index 00000000..3e15b392 --- /dev/null +++ b/src/mongowr/load-config.js @@ -0,0 +1,111 @@ +'use strict' + +/* + * {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const fs = require('fs') +const Log = require('./simple-logger') +const AppDefs = require('./app-defs') +const { ReadPreference } = require('mongodb') + +// load and parse config file +function LoadConfig (confFileArg, logLevelArg, instArg) { + + let configFile = + confFileArg || process.env.JS_CONFIG_FILE || '../../conf/json-scada.json' + Log.log('Config - Config File: ' + configFile) + + if (!fs.existsSync(configFile)) { + Log.log('Config - Error: config file not found!') + process.exit() + } + + let rawFileContents = fs.readFileSync(configFile) + let configObj = JSON.parse(rawFileContents) + if ( + typeof configObj.mongoConnectionString != 'string' || + configObj.mongoConnectionString === '' + ) { + Log.log('Error reading config file.') + process.exit() + } + + Log.levelCurrent = Log.levelNormal + if (AppDefs.ENV_PREFIX + 'LOGLEVEL' in process.env) + Log.levelCurrent = process.env[AppDefs.ENV_PREFIX + 'LOGLEVEL'] + if (logLevelArg) Log.levelCurrent = parseInt(logLevelArg) + configObj.LogLevel = Log.levelCurrent + + configObj.Instance = + instArg || process.env[AppDefs.ENV_PREFIX + 'INSTANCE'] || 1 + + configObj.GridFsCollectionName = 'files' + configObj.RealtimeDataCollectionName = 'realtimeData' + configObj.UsersCollectionName = 'users' + configObj.SoeDataCollectionName = 'soeData' + configObj.CommandsQueueCollectionName = 'commandsQueue' + configObj.ProtocolDriverInstancesCollectionName = 'protocolDriverInstances' + configObj.ProtocolConnectionsCollectionName = 'protocolConnections' + configObj.ProcessInstancesCollectionName = 'processInstances' + configObj.GroupSep = '~' + configObj.ConnectionNumber = 0 + + Log.log('Config - ' + AppDefs.MSG + ' Version ' + AppDefs.VERSION) + Log.log('Config - Instance: ' + configObj.Instance) + Log.log('Config - Log level: ' + Log.levelCurrent) + + configObj.MongoConnectionOptions = getMongoConnectionOptions(configObj) + return configObj +} + +// prepare mongo connection options +function getMongoConnectionOptions (configObj) { + let connOptions = { + //useNewUrlParser: true, + //useUnifiedTopology: true, + appname: + AppDefs.NAME + + ' Version:' + + AppDefs.VERSION + + ' Instance:' + + configObj.Instance, + maxPoolSize: 20, + readPreference: ReadPreference.PRIMARY + } + + if ( + typeof configObj.tlsCaPemFile === 'string' && + configObj.tlsCaPemFile.trim() !== '' + ) { + configObj.tlsClientKeyPassword = configObj.tlsClientKeyPassword || '' + configObj.tlsAllowInvalidHostnames = + configObj.tlsAllowInvalidHostnames || false + configObj.tlsAllowChainErrors = configObj.tlsAllowChainErrors || false + configObj.tlsInsecure = configObj.tlsInsecure || false + + connOptions.tls = true + connOptions.tlsCAFile = configObj.tlsCaPemFile + connOptions.tlsCertificateKeyFile = configObj.tlsClientPemFile + connOptions.tlsCertificateKeyFilePassword = configObj.tlsClientKeyPassword + connOptions.tlsAllowInvalidHostnames = configObj.tlsAllowInvalidHostnames + connOptions.tlsInsecure = configObj.tlsInsecure + } + + return connOptions +} + +module.exports = LoadConfig diff --git a/src/mongowr/package-lock.json b/src/mongowr/package-lock.json new file mode 100644 index 00000000..ed9e62df --- /dev/null +++ b/src/mongowr/package-lock.json @@ -0,0 +1,157 @@ +{ + "name": "mongowr", + "version": "0.1.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "mongowr", + "version": "0.1.0", + "license": "GPL-3.0-or-later", + "dependencies": { + "fs": "0.0.1-security", + "mongodb": "^6.6.2" + } + }, + "node_modules/@mongodb-js/saslprep": { + "version": "1.1.7", + "resolved": "https://registry.npmjs.org/@mongodb-js/saslprep/-/saslprep-1.1.7.tgz", + "integrity": "sha512-dCHW/oEX0KJ4NjDULBo3JiOaK5+6axtpBbS+ao2ZInoAL9/YRQLhXzSNAFz7hP4nzLkIqsfYAK/PDE3+XHny0Q==", + "dependencies": { + "sparse-bitfield": "^3.0.3" + } + }, + "node_modules/@types/webidl-conversions": { + "version": "7.0.3", + "resolved": "https://registry.npmjs.org/@types/webidl-conversions/-/webidl-conversions-7.0.3.tgz", + "integrity": "sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==" + }, + "node_modules/@types/whatwg-url": { + "version": "11.0.5", + "resolved": "https://registry.npmjs.org/@types/whatwg-url/-/whatwg-url-11.0.5.tgz", + "integrity": "sha512-coYR071JRaHa+xoEvvYqvnIHaVqaYrLPbsufM9BF63HkwI5Lgmy2QR8Q5K/lYDYo5AK82wOvSOS0UsLTpTG7uQ==", + "dependencies": { + "@types/webidl-conversions": "*" + } + }, + "node_modules/bson": { + "version": "6.7.0", + "resolved": "https://registry.npmjs.org/bson/-/bson-6.7.0.tgz", + "integrity": "sha512-w2IquM5mYzYZv6rs3uN2DZTOBe2a0zXLj53TGDqwF4l6Sz/XsISrisXOJihArF9+BZ6Cq/GjVht7Sjfmri7ytQ==", + "engines": { + "node": ">=16.20.1" + } + }, + "node_modules/fs": { + "version": "0.0.1-security", + "resolved": "https://registry.npmjs.org/fs/-/fs-0.0.1-security.tgz", + "integrity": "sha512-3XY9e1pP0CVEUCdj5BmfIZxRBTSDycnbqhIOGec9QYtmVH2fbLpj86CFWkrNOkt/Fvty4KZG5lTglL9j/gJ87w==" + }, + "node_modules/memory-pager": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/memory-pager/-/memory-pager-1.5.0.tgz", + "integrity": "sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==" + }, + "node_modules/mongodb": { + "version": "6.6.2", + "resolved": "https://registry.npmjs.org/mongodb/-/mongodb-6.6.2.tgz", + "integrity": "sha512-ZF9Ugo2JCG/GfR7DEb4ypfyJJyiKbg5qBYKRintebj8+DNS33CyGMkWbrS9lara+u+h+yEOGSRiLhFO/g1s1aw==", + "dependencies": { + "@mongodb-js/saslprep": "^1.1.5", + "bson": "^6.7.0", + "mongodb-connection-string-url": "^3.0.0" + }, + "engines": { + "node": ">=16.20.1" + }, + "peerDependencies": { + "@aws-sdk/credential-providers": "^3.188.0", + "@mongodb-js/zstd": "^1.1.0", + "gcp-metadata": "^5.2.0", + "kerberos": "^2.0.1", + "mongodb-client-encryption": ">=6.0.0 <7", + "snappy": "^7.2.2", + "socks": "^2.7.1" + }, + "peerDependenciesMeta": { + "@aws-sdk/credential-providers": { + "optional": true + }, + "@mongodb-js/zstd": { + "optional": true + }, + "gcp-metadata": { + "optional": true + }, + "kerberos": { + "optional": true + }, + "mongodb-client-encryption": { + "optional": true + }, + "snappy": { + "optional": true + }, + "socks": { + "optional": true + } + } + }, + "node_modules/mongodb-connection-string-url": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/mongodb-connection-string-url/-/mongodb-connection-string-url-3.0.1.tgz", + "integrity": "sha512-XqMGwRX0Lgn05TDB4PyG2h2kKO/FfWJyCzYQbIhXUxz7ETt0I/FqHjUeqj37irJ+Dl1ZtU82uYyj14u2XsZKfg==", + "dependencies": { + "@types/whatwg-url": "^11.0.2", + "whatwg-url": "^13.0.0" + } + }, + "node_modules/punycode": { + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", + "integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==", + "engines": { + "node": ">=6" + } + }, + "node_modules/sparse-bitfield": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/sparse-bitfield/-/sparse-bitfield-3.0.3.tgz", + "integrity": "sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==", + "dependencies": { + "memory-pager": "^1.0.2" + } + }, + "node_modules/tr46": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/tr46/-/tr46-4.1.1.tgz", + "integrity": "sha512-2lv/66T7e5yNyhAAC4NaKe5nVavzuGJQVVtRYLyQ2OI8tsJ61PMLlelehb0wi2Hx6+hT/OJUWZcw8MjlSRnxvw==", + "dependencies": { + "punycode": "^2.3.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/webidl-conversions": { + "version": "7.0.0", + "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-7.0.0.tgz", + "integrity": "sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==", + "engines": { + "node": ">=12" + } + }, + "node_modules/whatwg-url": { + "version": "13.0.0", + "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-13.0.0.tgz", + "integrity": "sha512-9WWbymnqj57+XEuqADHrCJ2eSXzn8WXIW/YSGaZtb2WKAInQ6CHfaUUcTyyver0p8BDg5StLQq8h1vtZuwmOig==", + "dependencies": { + "tr46": "^4.1.1", + "webidl-conversions": "^7.0.0" + }, + "engines": { + "node": ">=16" + } + } + } +} diff --git a/src/mongowr/package.json b/src/mongowr/package.json new file mode 100644 index 00000000..70b4e44f --- /dev/null +++ b/src/mongowr/package.json @@ -0,0 +1,15 @@ +{ + "name": "mongowr", + "version": "0.1.0", + "description": "{json:scada} - Mongowr - write protocol data forwarded by another JSON-SCADA installation.", + "main": "index", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "Ricardo Lastra Olsen", + "license": "GPL-3.0-or-later", + "dependencies": { + "fs": "0.0.1-security", + "mongodb": "^6.6.2" + } +} diff --git a/src/mongowr/redundancy.js b/src/mongowr/redundancy.js new file mode 100644 index 00000000..422b85b7 --- /dev/null +++ b/src/mongowr/redundancy.js @@ -0,0 +1,169 @@ +'use strict' + +/* + * {json:scada} - Copyright (c) 2020-2021 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const { setInterval } = require('timers') +const { Double } = require('mongodb') +const Log = require('./simple-logger') +const AppDefs = require('./app-defs') + +let ProcessActive = false // redundancy state +let redundancyIntervalHandle = null // timer handle + +// start processing redundancy +function Start(interval, clientMongo, db, configObj, MongoStatus) { + // check and update redundancy control + ProcessRedundancy(clientMongo, db, configObj) + clearInterval(redundancyIntervalHandle) + redundancyIntervalHandle = setInterval(function () { + if (!MongoStatus.HintMongoIsConnected){ + ProcessActive = false + return + } + + ProcessRedundancy(clientMongo, db, configObj) + }, interval) +} + +// process JSON-SCADA redundancy state for this driver module +async function ProcessRedundancy(clientMongo, db, configObj) { + if (!clientMongo || !db) return + + Log.levelCurrent = configObj.LogLevel + + const countKeepAliveUpdatesLimit = 4 + + // poor man's local static variables + if (typeof ProcessRedundancy.countKeepAliveNotUpdated === 'undefined') { + ProcessRedundancy.lastActiveNodeKeepAliveTimeTag = null + ProcessRedundancy.countKeepAliveNotUpdated = 0 + } + + Log.log('Redundancy - Process ' + (ProcessActive ? 'Active' : 'Inactive')) + + try { + // look for process instance entry, if not found create a new entry + const result = await db + .collection(configObj.ProcessInstancesCollectionName) + .findOne({ + processName: AppDefs.NAME, + processInstanceNumber: new Double(configObj.Instance), + }) + + if (!result) { + // not found, then create + ProcessActive = true + Log.log('Redundancy - Instance config not found, creating one...') + db.collection(configObj.ProcessInstancesCollectionName).insertOne({ + processName: AppDefs.NAME, + processInstanceNumber: new Double(configObj.Instance), + enabled: true, + logLevel: new Double(1), + nodeNames: [], + activeNodeName: configObj.nodeName, + activeNodeKeepAliveTimeTag: new Date(), + }) + } else { + // check for disabled or node not allowed + const instance = result + let instKeepAliveTimeTag = null + + if ('activeNodeKeepAliveTimeTag' in instance) + instKeepAliveTimeTag = instance.activeNodeKeepAliveTimeTag.toISOString() + + if (instance?.enabled === false) { + Log.log('Redundancy - Instance disabled, exiting...') + process.exit() + } + if (instance?.nodeNames !== null && instance.nodeNames.length > 0) { + if (!instance.nodeNames.includes(configObj.nodeName)) { + Log.log('Redundancy - Node name not allowed, exiting...') + process.exit() + } + } + if (instance?.activeNodeName === configObj.nodeName) { + if (!ProcessActive) Log.log('Redundancy - Node activated!') + ProcessRedundancy.countKeepAliveNotUpdated = 0 + ProcessActive = true + } else { + // other node active + if (ProcessActive) { + Log.log('Redundancy - Node deactivated!') + ProcessRedundancy.countKeepAliveNotUpdated = 0 + } + ProcessActive = false + if ( + ProcessRedundancy.lastActiveNodeKeepAliveTimeTag === + instKeepAliveTimeTag + ) { + ProcessRedundancy.countKeepAliveNotUpdated++ + Log.log( + 'Redundancy - Keep-alive from active node not updated. ' + + ProcessRedundancy.countKeepAliveNotUpdated + ) + } else { + ProcessRedundancy.countKeepAliveNotUpdated = 0 + Log.log( + 'Redundancy - Keep-alive updated by active node. Staying inactive.' + ) + } + ProcessRedundancy.lastActiveNodeKeepAliveTimeTag = instKeepAliveTimeTag + if ( + ProcessRedundancy.countKeepAliveNotUpdated > + countKeepAliveUpdatesLimit + ) { + // cnt exceeded, be active + ProcessRedundancy.countKeepAliveNotUpdated = 0 + Log.log('Redundancy - Node activated!') + ProcessActive = true + } + } + + if (ProcessActive) { + // process active, then update keep alive + db.collection(configObj.ProcessInstancesCollectionName).updateOne( + { + processName: AppDefs.NAME, + processInstanceNumber: new Double(configObj.Instance), + }, + { + $set: { + activeNodeName: configObj.nodeName, + activeNodeKeepAliveTimeTag: new Date(), + softwareVersion: AppDefs.VERSION, + stats: {}, + }, + } + ) + } + } + } + catch (err) { + Log.log('Redundancy - Error: ' + err) + } +} + +function ProcessStateIsActive() { + return ProcessActive +} + +module.exports = { + ProcessRedundancy: ProcessRedundancy, + Start: Start, + ProcessStateIsActive: ProcessStateIsActive, +} diff --git a/src/mongowr/simple-logger.js b/src/mongowr/simple-logger.js new file mode 100644 index 00000000..2ee43cfd --- /dev/null +++ b/src/mongowr/simple-logger.js @@ -0,0 +1,35 @@ +'use strict' + +/* + * {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen + * This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +const Log = { + // simple message logger + levelMin: 0, + levelNormal: 1, + levelDetailed: 2, + levelDebug: 3, + levelCurrent: 1, + log: function (msg, level = 1) { + if (level <= this.levelCurrent) { + let dt = new Date() + console.log(dt.toISOString() + ' - ' + msg) + } + } + } + +module.exports = Log