-
-
Notifications
You must be signed in to change notification settings - Fork 83
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added mongofw and mongowr processes.
- Loading branch information
Showing
21 changed files
with
1,554 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
'use strict' | ||
|
||
/* | ||
* {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen | ||
* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, version 3. | ||
* | ||
* This program is distributed in the hope that it will be useful, but | ||
* WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
* General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
module.exports = { | ||
NAME: 'MONGOFW', | ||
ENV_PREFIX: 'JS_MONGOFW', | ||
MSG: '{json:scada} - Mongofw - forward data from protocol drivers to another JSON-SCADA installation.', | ||
VERSION: '0.1.0', | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
'use strict' | ||
|
||
/* | ||
* Customizable processor of mongodb changes via change streams. | ||
* | ||
* THIS FILE IS INTENDED TO BE CUSTOMIZED BY USERS TO DO SPECIAL PROCESSING | ||
* | ||
* {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen | ||
* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, version 3. | ||
* | ||
* This program is distributed in the hope that it will be useful, but | ||
* WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
* General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
const Log = require('./simple-logger') | ||
const { Double } = require('mongodb') | ||
const { setInterval } = require('timers') | ||
const dgram = require('dgram'); | ||
|
||
// UDP broadcast options | ||
const udpPort = 12345; | ||
const udpHostDst = "192.168.0.255"; | ||
// Create a UDP socket | ||
const udpSocket = dgram.createSocket('udp4'); | ||
|
||
udpSocket.bind(udpPort, () => { | ||
udpSocket.setBroadcast(true) | ||
// udpSocket.setMulticastInterface('::%eth1'); | ||
}); | ||
|
||
|
||
let maxSz = 0; | ||
let cnt = 0; | ||
|
||
const UserActionsCollectionName = 'userActions' | ||
const RealtimeDataCollectionName = 'realtimeData' | ||
const CommandsQueueCollectionName = 'commandsQueue' | ||
const SoeDataCollectionName = 'soeData' | ||
const ProcessInstancesCollectionName = 'processInstances' | ||
const ProtocolDriverInstancesCollectionName = 'protocolDriverInstances' | ||
const ProtocolConnectionsCollectionName = 'protocolConnections' | ||
|
||
let CyclicIntervalHandle = null | ||
|
||
// this will be called by the main module when mongo is connected (or reconnected) | ||
module.exports.CustomProcessor = function ( | ||
clientMongo, | ||
jsConfig, | ||
Redundancy, | ||
MongoStatus | ||
) { | ||
if (clientMongo === null) return | ||
const db = clientMongo.db(jsConfig.mongoDatabaseName) | ||
|
||
// ------------------------------------------------------------------------------------------- | ||
// EXAMPLE OF CYCLIC PROCESSING AT INTERVALS | ||
// BEGIN EXAMPLE | ||
|
||
let CyclicProcess = async function () { | ||
// do cyclic processing at each CyclicInterval ms | ||
|
||
if (!Redundancy.ProcessStateIsActive() || !MongoStatus.HintMongoIsConnected) | ||
return // do nothing if process is inactive | ||
|
||
try { | ||
let res = await db | ||
.collection(RealtimeDataCollectionName) | ||
.findOne({ _id: -2 }) // id of point tag with number of digital updates | ||
|
||
Log.log( | ||
'Custom Process - Checking number of digital updates: ' + | ||
res.valueString | ||
) | ||
} catch (err) { | ||
Log.log(err) | ||
} | ||
|
||
return | ||
} | ||
const CyclicInterval = 5000 // interval time in ms | ||
clearInterval(CyclicIntervalHandle) // clear older instances if any | ||
CyclicIntervalHandle = setInterval(CyclicProcess, CyclicInterval) // start a cyclic processing | ||
|
||
// EXAMPLE OF CYCLIC PROCESSING AT INTERVALS | ||
// END EXAMPLE | ||
// ------------------------------------------------------------------------------------------- | ||
|
||
const changeStreamUserActions = db | ||
.collection(RealtimeDataCollectionName) | ||
.watch( | ||
{ $match: { operationType: 'update' } }, | ||
{ | ||
fullDocument: 'updateLookup' | ||
} | ||
) | ||
|
||
try { | ||
changeStreamUserActions.on('error', change => { | ||
if (clientMongo) clientMongo.close() | ||
clientMongo = null | ||
Log.log('Custom Process - Error on changeStreamUserActions!') | ||
}) | ||
changeStreamUserActions.on('close', change => { | ||
Log.log('Custom Process - Closed changeStreamUserActions!') | ||
}) | ||
changeStreamUserActions.on('end', change => { | ||
if (clientMongo) clientMongo.close() | ||
clientMongo = null | ||
Log.log('Custom Process - Ended changeStreamUserActions!') | ||
}) | ||
|
||
// start listen to changes | ||
changeStreamUserActions.on('change', change => { | ||
// Log.log(change.fullDocument) | ||
if (!Redundancy.ProcessStateIsActive() || !MongoStatus.HintMongoIsConnected) | ||
return // do nothing if process is inactive | ||
|
||
// will send only update data from drivers | ||
if (!change.updateDescription.updatedFields?.sourceDataUpdate) | ||
return; | ||
if (change.updateDescription.updatedFields?.sourceDataUpdate?.valueBsonAtSource) | ||
delete change.updateDescription.updatedFields.sourceDataUpdate.valueBsonAtSource; | ||
if (!Redundancy.ProcessStateIsActive()) return // do nothing if process is inactive | ||
const fwObj = { | ||
cnt: cnt++, | ||
operationType: change.operationType, | ||
documentKey: change.documentKey, | ||
updateDescription: change.updateDescription | ||
} | ||
const opData = JSON.stringify(fwObj); | ||
const message = Buffer.from(opData); | ||
if (message.length > maxSz) maxSz = message.length; | ||
|
||
if (message.length > 60000) { | ||
console.log('Message too large: ', opData); | ||
} | ||
else | ||
udpSocket.send(message, 0, message.length, udpPort, udpHostDst, (err, bytes) => { | ||
if (err) { | ||
console.log('UDP error:', err); | ||
} else { | ||
// console.log('Data sent via UDP', opData); | ||
//console.log('Size: ', message.length); | ||
//console.log('Max: ', maxSz); | ||
} | ||
|
||
}); | ||
|
||
}) | ||
} catch (e) { | ||
Log.log('Custom Process - Error: ' + e) | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
'use strict' | ||
|
||
/* | ||
* Customizable processor of mongodb changes via change streams. | ||
* DO NOT EDIT THIS FILE! CUSTOMIZE THE customized_module.js file | ||
* {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen | ||
* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada). | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, version 3. | ||
* | ||
* This program is distributed in the hope that it will be useful, but | ||
* WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
* General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
const Log = require('./simple-logger') | ||
const LoadConfig = require('./load-config') | ||
const Redundancy = require('./redundancy') | ||
const { MongoClient } = require('mongodb') | ||
const CustomProcessor = require('./customized_module').CustomProcessor | ||
|
||
const args = process.argv.slice(2) | ||
let inst = null | ||
if (args.length > 0) inst = parseInt(args[0]) | ||
|
||
let logLevel = null | ||
if (args.length > 1) logLevel = parseInt(args[1]) | ||
let confFile = null | ||
if (args.length > 2) confFile = args[2] | ||
const jsConfig = LoadConfig(confFile, logLevel, inst) | ||
const MongoStatus = {HintMongoIsConnected : false} | ||
Log.log('Connecting to MongoDB server...') | ||
;(async () => { | ||
let clientMongo = null | ||
while (true) { | ||
if (clientMongo === null) | ||
await MongoClient.connect( | ||
jsConfig.mongoConnectionString, | ||
jsConfig.MongoConnectionOptions | ||
) | ||
.then(async (client) => { | ||
clientMongo = client | ||
MongoStatus.HintMongoIsConnected = true | ||
const db = clientMongo.db(jsConfig.mongoDatabaseName) | ||
Log.log('Connected correctly to MongoDB server') | ||
Redundancy.Start(5000, clientMongo, db, jsConfig, MongoStatus) | ||
CustomProcessor(clientMongo, jsConfig, Redundancy, MongoStatus) | ||
}) | ||
.catch(function (err) { | ||
if (clientMongo) clientMongo.close() | ||
clientMongo = null | ||
Log.log(err) | ||
}) | ||
|
||
// wait 5 seconds | ||
await new Promise((resolve) => setTimeout(resolve, 5000)) | ||
|
||
// detect connection problems, if error will null the client to later reconnect | ||
if (clientMongo === undefined) { | ||
Log.log('Disconnected Mongodb!') | ||
clientMongo = null | ||
} | ||
if (clientMongo) | ||
if (!(await checkConnectedMongo(clientMongo))) { | ||
// not anymore connected, will retry | ||
Log.log('Disconnected Mongodb!') | ||
if (clientMongo) clientMongo.close() | ||
clientMongo = null | ||
} | ||
} | ||
})() | ||
|
||
// test mongoDB connectivity | ||
async function checkConnectedMongo(client) { | ||
if (!client) { | ||
return false | ||
} | ||
const CheckMongoConnectionTimeout = 1000 | ||
let tr = setTimeout(() => { | ||
Log.log('Mongo ping timeout error!') | ||
MongoStatus.HintMongoIsConnected = false | ||
}, CheckMongoConnectionTimeout) | ||
|
||
let res = null | ||
try { | ||
res = await client.db('admin').command({ ping: 1 }) | ||
clearTimeout(tr) | ||
} catch (e) { | ||
Log.log('Error on mongodb connection!') | ||
return false | ||
} | ||
if ('ok' in res && res.ok) { | ||
MongoStatus.HintMongoIsConnected = true | ||
return true | ||
} else { | ||
MongoStatus.HintMongoIsConnected = false | ||
return false | ||
} | ||
} |
Oops, something went wrong.