Merge pull request #179 from json-scada/master
Fixes for MQTT and AdminUI
riclolsen authored Dec 1, 2024
2 parents ca1c06d + eb5a9e1 commit bc1868e
Showing 3 changed files with 37 additions and 11 deletions.
5 changes: 3 additions & 2 deletions src/AdminUI/src/components/TagsTab.vue
@@ -852,6 +852,7 @@
  const closeEditTag = () => {
    dialogEditTag.value = false
+   fetchTags()
  }
  const closeDeleteTag = () => {
@@ -972,12 +973,12 @@
  }
  const addNewProtocolDestination = async () => {
-   editedTag.protocolDestinations.push(newProtocolDestination.value)
+   editedTag.value.protocolDestinations.push(newProtocolDestination.value)
    dialogAddProtocolDestination.value = false
  }
  const addNewParcel = async () => {
-   editedTag.parcels.push(newParcel.value)
+   editedTag.value.parcels.push(newParcel.value)
    dialogAddParcel.value = false
  }
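Both fixes in TagsTab.vue correct the same mistake: editedTag is a Vue 3 ref, and refs do not auto-unwrap in script code, so editedTag.protocolDestinations is undefined on the wrapper and the push throws; the wrapped object has to be reached through .value. A minimal sketch of the pattern, with hypothetical state (not the component's actual data):

    // Sketch only: Vue 3 ref unwrapping (hypothetical tag shape)
    import { ref } from 'vue'

    const editedTag = ref({ protocolDestinations: [], parcels: [] })
    const newProtocolDestination = ref({}) // placeholder for the dialog's form state

    // Broken: the ref wrapper has no protocolDestinations property, so .push throws
    // editedTag.protocolDestinations.push(newProtocolDestination.value)

    // Fixed: unwrap the ref with .value to reach the wrapped tag object
    editedTag.value.protocolDestinations.push(newProtocolDestination.value)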
3 changes: 2 additions & 1 deletion src/mqtt-sparkplug/app-defs.js
@@ -22,7 +22,7 @@ module.exports = {
  ENV_PREFIX: 'JS_MQTTSPB_',
  AUTOTAG_PREFIX: 'MQTT',
  MSG: '{json:scada} - MQTT-Sparkplug-B Client Driver',
- VERSION: '0.2.0',
+ VERSION: '0.2.1',
  MAX_QUEUEDMETRICS: 10000,
  SPARKPLUG_PUBLISH_INTERVAL: 777,
  SPARKPLUG_COMPRESS_DBIRTH: false,
@@ -34,4 +34,5 @@ module.exports = {
  MQTT_CONNECTION_TIMEOUT: 30,
  MQTT_CONNECTION_KEEPALIVE: 15,
  SECONDS_BETWEEN_NODE_REQUESTS: 300,
+ SECONDS_BETWEEN_REBIRTHS: 30,
  }
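Besides the version bump to 0.2.1, app-defs.js gains a SECONDS_BETWEEN_REBIRTHS default (30 s) alongside the existing SECONDS_BETWEEN_NODE_REQUESTS (300 s); both act as throttling windows that index.js compares, in milliseconds, against the current time. A rough sketch of that conversion, assuming app-defs.js is loaded with a plain require:

    // Sketch only: the two throttling windows expressed in milliseconds
    const AppDefs = require('./app-defs.js')

    const nodeRequestWindowMs = AppDefs.SECONDS_BETWEEN_NODE_REQUESTS * 1000 // 300000 ms between repeated node/device requests
    const rebirthWindowMs = AppDefs.SECONDS_BETWEEN_REBIRTHS * 1000 // 30000 ms between handled Node Rebirth commands

    // a new event is ignored while lastEventMs + windowMs > Date.now()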
40 changes: 32 additions & 8 deletions src/mqtt-sparkplug/index.js
@@ -43,6 +43,7 @@ const SparkplugPublishQueue = new Queue() // queue of values to publish as Spark
  let SparkplugDeviceBirthed = false
  let AutoCreateTags = true
  const MongoStatus = { HintMongoIsConnected: false }
+ let LastNodeRebirth = 0

  ;(async () => {
    const jsConfig = LoadConfig() // load and parse config file
@@ -1144,16 +1145,22 @@ async function sparkplugProcess(

  spClient.handle.on('error', function (error) {
    Log.log(logMod + "Event: Can't connect" + error)
+   if (spClient?.handle) spClient.handle.stop()
+   spClient.handle = null
  })

  spClient.handle.on('close', function () {
    SparkplugDeviceBirthed = false
    Log.log(logMod + 'Event: Connection Closed')
+   if (spClient?.handle) spClient.handle.stop()
+   spClient.handle = null
  })

  spClient.handle.on('offline', function () {
    SparkplugDeviceBirthed = false
    Log.log(logMod + 'Event: Connection Offline...')
+   if (spClient?.handle) spClient.handle.stop()
+   spClient.handle = null
  })

  // Create 'birth' handler
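The error, close, and offline handlers now share the same teardown: stop the Sparkplug client if a handle still exists (the optional chaining guards against it already being cleared) and null the handle, so the driver's reconnect path sees a dead handle and builds a fresh client instead of reusing a stale one. A minimal sketch of the stop-and-null pattern, with illustrative names (not the driver's full logic):

    // Sketch only: stop-and-null teardown so a later check can recreate the client
    function teardownClient(spClient, reason) {
      console.log('MQTT-Sparkplug client teardown: ' + reason)
      if (spClient?.handle) spClient.handle.stop() // stop only if a live handle exists
      spClient.handle = null // a null handle tells the reconnect path to start over
    }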
@@ -1406,17 +1413,16 @@ async function sparkplugProcess(
  if (elem) {
    // test if published topic matches subscribed element
    if (topic == elem) {
-     let JsonValue = TryPayloadAsRJson(payload)
+     const JsonValue = TryPayloadAsRJson(payload)
      EnqueueJsonValue(JsonValue, topic)
-     // match = true
    }

    if (
      autoTag &&
      elem.endsWith('#') &&
      topic.startsWith(elem.substring(0, elem.length - 1))
    ) {
-     let JsonValue = TryPayloadAsRJson(payload)
+     const JsonValue = TryPayloadAsRJson(payload)
      EnqueueJsonValue(JsonValue, topic)
    }

@@ -1425,7 +1431,7 @@

  // when used '*~' at the end of JSONPathPlus explode property names
  if (elem.endsWith('*~')) {
-   let jpt = JsonPathTopic(elem)
+   const jpt = JsonPathTopic(elem)
    if (jpt.jsonPath !== '' && topicMatchSub(topic)(jpt.topic)) {
      const jpPropNames = JSONPath({
        path: jpt.jsonPath,
@@ -1434,7 +1440,9 @@
      })
      jpPropNames.forEach((propName) => {
        let newTopic = elem.replace('*~', propName)
-       let jptNew = JsonPathTopic(newTopic)
+       const jptNew = JsonPathTopic(newTopic)
+       newTopic = newTopic.replace('$.', '')
+
        // extract value from payload using JSON PATH
        const jpRes = JSONPath({
          path: jptNew.jsonPath,
@@ -1444,9 +1452,10 @@
        EnqueueJsonValue(jpRes, newTopic)
      })
    }
+   return
  }

- let jpt = JsonPathTopic(elem)
+ const jpt = JsonPathTopic(elem)
  if (jpt.jsonPath !== '' && topicMatchSub(topic)(jpt.topic)) {
    // extract value from payload using JSON PATH
    const jpRes = JSONPath({
@@ -1477,7 +1486,20 @@
      jscadaConnection.groupId.trim() === ''
    )
      return
+
    if (metric?.value === true) {
+     if (
+       LastNodeRebirth + AppDefs.SECONDS_BETWEEN_REBIRTHS * 1000 >
+       new Date().getTime()
+     ) {
+       Log.log(
+         logModS +
+           `Node Rebirth command received, ignoring until after ${AppDefs.SECONDS_BETWEEN_REBIRTHS} seconds`
+       )
+       return
+     }
+     LastNodeRebirth = new Date().getTime()
+
      Log.log(logModS + 'Node Rebirth command received')
      // Publish Node BIRTH certificate
      let nbc = getNodeBirthPayload(configObj)
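Inside the existing metric?.value === true branch, the new guard drops a Node Rebirth command that arrives less than SECONDS_BETWEEN_REBIRTHS after the previous one, so repeated rebirth requests cannot force a continuous stream of NBIRTH publications; LastNodeRebirth records when the last accepted rebirth was handled. The same throttle can be read as a small helper (hypothetical name, sketch only, not the driver's code):

    // Sketch only: timestamp-based throttle equivalent to the inline guard above
    let LastNodeRebirth = 0

    function shouldHandleRebirth(nowMs, windowSeconds) {
      if (LastNodeRebirth + windowSeconds * 1000 > nowMs) return false // too soon, ignore the command
      LastNodeRebirth = nowMs // remember when this rebirth was accepted
      return true
    }

    // usage sketch:
    // if (metric?.value === true && shouldHandleRebirth(Date.now(), AppDefs.SECONDS_BETWEEN_REBIRTHS)) {
    //   ... publish the Node BIRTH certificate ...
    // }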
@@ -1622,7 +1644,8 @@ async function sparkplugProcess(
    nodeLocator in DevicesList &&
    'lastReqDateTime' in DevicesList[nodeLocator] &&
    new Date().getTime() <
-     DevicesList[nodeLocator].lastReqDateTime + AppDefs.SECONDS_BETWEEN_NODE_REQUESTS * 1000
+     DevicesList[nodeLocator].lastReqDateTime +
+       AppDefs.SECONDS_BETWEEN_NODE_REQUESTS * 1000
  ) {
    Log.log(
      logModS +
Expand Down Expand Up @@ -1681,7 +1704,8 @@ async function sparkplugProcess(
    nodeLocator in DevicesList &&
    'lastReqDateTime' in DevicesList[nodeLocator] &&
    new Date().getTime() <
-     DevicesList[nodeLocator].lastReqDateTime + AppDefs.SECONDS_BETWEEN_NODE_REQUESTS * 1000
+     DevicesList[nodeLocator].lastReqDateTime +
+       AppDefs.SECONDS_BETWEEN_NODE_REQUESTS * 1000
  ) {
    Log.log(
      logModS +
