From 76e25a43c0b1ac9743901ae9b4a9f5c8e123244f Mon Sep 17 00:00:00 2001
From: yshashix
Date: Fri, 22 Sep 2023 10:43:36 +0200
Subject: [PATCH] OISP legacy dependency removed by removing cid (component id)
 check

- SparkplugB enabled by default at mqtt-gw
- Removed old REST-based Kafka data processing and message creation on the
  Kafka publish topic (metric)
---
 api/data.ingestion.js | 298 -----------------
 api/sparkplug_data.ingestion.js | 216 ++-----------
 app.js | 6 +-
 config.js | 6 +-
 test/unit/api_data_ingestionTest.js | 409 ------------------------
 test/unit/api_spb_data_ingestionTest.js | 363 ---------------------
 6 files changed, 23 insertions(+), 1275 deletions(-)
 delete mode 100644 api/data.ingestion.js
 delete mode 100644 test/unit/api_data_ingestionTest.js

diff --git a/api/data.ingestion.js b/api/data.ingestion.js
deleted file mode 100644
index 1bc58487..00000000
--- a/api/data.ingestion.js
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
-* Copyright (c) 2017, 2020 Intel Corporation
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-
-
-"use strict";
-var config = require("../config");
-var { Kafka, logLevel } = require('kafkajs');
-const { Partitioners } = require('kafkajs');
-var me;
-
-const CacheFactory = require("../lib/cache");
-
-
-// Round Robin partitioner used to handle single clients with
-// too high throughput
-const RoundRobinPartitioner = () => {
-    var curPartition = 0;
-    return ({ partitionMetadata}) => {
-        var numPartitions = partitionMetadata.length;
-        var partition = curPartition % numPartitions;
-        curPartition++;
-        return partition;
-    };
-};
-
-// validate value
-var validate = function(value, type) {
-    if (type === "Number") {
-        if (isNaN(value)) {
-            return false;
-        } else {
-            return true;
-        }
-    } else if (type === "Boolean") {
-        return value === "0" || value === "1";
-    }
-    else if (type === "String") {
-        if (typeof value === "string") {
-            return true;
-        }
-        return false;
-    }
-
-};
-
-// normalize value
-var normalizeBoolean = function (value, type) {
-    if (type === "Boolean") {
-        // checks: true, false, 0, 1, "0", "1"
-        if (value === true || value === "1" || value === 1) {
-            return "1";
-        }
-        if (value === false || value === "0" || value === 0) {
-            return "0";
-        }
-        // checks: "trUe", "faLSE"
-        try {
-            if (value.toLowerCase() === "true") {
-                return "1";
-            }
-            if (value.toLowerCase() === "false") {
-                return "0";
-            }
-        } catch (e) {
-            return "NaB";
-        }
-        return "NaB";
-    }
-    return value;
-};
-
-// @brief Aggregates messages for periodic executed Kafka producer
-class KafkaAggregator {
-    constructor(logger) {
-        this.logger = logger;
-        this.messageArray = [];
-        var brokers = config.kafka.host.split(',');
-        const kafka = new Kafka({
-            logLevel: logLevel.INFO,
-            brokers: brokers,
-            clientId: 'frontend-metrics',
-            requestTimeout: config.kafka.requestTimeout,
-            retry: {
-                maxRetryTime: config.kafka.maxRetryTime,
-                retries: config.kafka.retries
-            }
-        });
-        if (config.kafka.partitioner === "roundRobinPartitioner") {
-            this.kafkaProducer = kafka.producer({createPartitioner:
RoundRobinPartitioner}); - this.logger.info("Round Robin partitioner enforced"); - } else { - this.kafkaProducer = kafka.producer({createPartitioner: Partitioners.DefaultPartitioner}); - this.logger.info("Default partitioner is used"); - } - const { CONNECT, DISCONNECT } = this.kafkaProducer.events; - this.kafkaProducer.on(DISCONNECT, e => { - this.logger.log(`Metric producer disconnected!: ${e.timestamp}`); - this.kafkaProducer.connect(); - }); - this.kafkaProducer.on(CONNECT, e => logger.debug("Kafka metric producer connected: " + e)); - this.kafkaProducer.connect().catch(e => { - this.logger.error("Exception occured while creating Kafka Producer: " + e); - process.exit(1); - }); - } - start(timer) { - this.intervalObj = setInterval(() => { - if (this.messageArray.length === 0) { - return; - } - console.log('messageHash consist of ' + this.messageArray.length + " Items"); - - var payloads = { - topic: config.kafka.metricsTopic, - messages: this.messageArray - }; - this.logger.debug("will now deliver Kafka message: " + JSON.stringify(payloads)); - this.kafkaProducer.send(payloads) - .catch((err) => { - return this.logger.error("Could not send message to Kafka: " + err); - } - ); - this.messageArray = []; - }, timer); - } - stop() { - clearInterval(this.intervalObj); - } - addMessage(message) { - this.messageArray.push(message); - } -} - -module.exports = function(logger) { - var topics_subscribe = config.topics.subscribe; - var topics_publish = config.topics.publish; - var dataSchema = require("../lib/schemas/data.json"); - var Validator = require('jsonschema').Validator; - var validator = new Validator(); - var cache = new CacheFactory(config, logger).getInstance(); - me = this; - me.kafkaAggregator = new KafkaAggregator(logger); - me.kafkaAggregator.start(config.kafka.linger); - - me.logger = logger; - me.cache = cache; - me.token = null; - - me.stopAggregator = function() { - me.kafkaAggregator.stop(); - }; - - me.getToken = function (did) { - /*jshint unused:false*/ - return new Promise(function(resolve, reject) { - resolve(null); - }); - }; - - me.processDataIngestion = function (topic, message) { - - var did; - var accountId; - /* - It will be checked if the ttl exist, if it exits the package need to be discarded - */ - me.logger.debug( - "Data Submission Detected : " + topic + " Message " + JSON.stringify(message) - ); - - //legacy mqtt format is putting actual message in body. - //Future formats will directly send valid data. - var bodyMessage; - if (message.body === undefined) { - bodyMessage = message; - } else { - bodyMessage = message.body; - } - - let validationResult = validator.validate(bodyMessage, dataSchema["POST"]); - if (validationResult.errors.length > 0) { - me.logger.info("Schema rejected message! Message will be discarded: " + bodyMessage); - return null; - } else { - //Get accountId - var match = topic.match(/server\/metric\/([^\/]*)\/(.*)/); - accountId = match[1]; - - // Go through data and check whether cid is correct - // Also retrieve dataType - var promarray = []; - bodyMessage.data.forEach(item => { - var value = me.cache.getDidAndDataType(item); - promarray.push(value); - } - ); - Promise.all(promarray) - .then(values => { - values.map(item => { - var kafkaMessage = me.prepareKafkaPayload(item, accountId); - if (kafkaMessage === null) { - var msg = "Validation of " + JSON.stringify(item.dataElement) + " for type " + item.dataType + " failed!"; - me.logger.warn(msg); - me.sendErrorOverChannel(accountId, did, msg); - return; - } - var key = accountId + "." 
+ kafkaMessage.cid; - var message = {key, value: JSON.stringify(kafkaMessage)}; - - me.kafkaAggregator.addMessage(message); - }); - }) - .catch(function(err) { - me.logger.warn("Could not send data to Kafka " + err); - }); - } - }; - me.connectTopics = function() { - me.broker.bind(topics_subscribe.data_ingestion, me.processDataIngestion); - return true; - }; - me.handshake = function () { - if (me.broker) { - me.broker.on('reconnect', function(){ - me.logger.debug('Reconnect topics' ); - me.broker.unbind(topics_subscribe.data_ingestion, function () { - me.token = null; - me.connectTopics(); - me.sessionObject = {}; - }); - }); - } - }; - - // setup channel to provide error feedback to device agent - me.sendErrorOverChannel = function(accountId, did, message) { - var path = me.broker.buildPath(topics_publish.error, [accountId, did]); - me.broker.publish(path, message, null); - }; - /** - * @description It's bind to the MQTT topics - * @param broker - */ - me.bind = function (broker) { - me.broker = broker; - me.handshake(); - me.connectTopics(); - }; - - - /** - * Prepare datapoint from frontend data structure to Kafka like the following example: - * {"dataType":"Number", "aid":"account_id", "cid":"component_id", "value":"1", - * "systemOn": 1574262569420, "on": 1574262569420, "loc": null} - */ - me.prepareKafkaPayload = function(didAndDataType, accountId){ - var dataElement = didAndDataType.dataElement; - var value = normalizeBoolean(dataElement.value.toString(), didAndDataType.dataType); - if (!validate(value, didAndDataType.dataType)) { - return null; - } - const msg = { - dataType: didAndDataType.dataType, - aid: accountId, - cid: dataElement.componentId, - on: dataElement.on, - value: value - }; - if (dataElement.systemOn !== undefined) { - msg.systemOn = dataElement.systemOn; - } else { - msg.systemOn = dataElement.on; - } - - if (undefined !== dataElement.attributes) { - msg.attributes = dataElement.attributes; - } - if (undefined !== dataElement.loc) { - msg.loc = dataElement.loc; - } - - return msg; - }; -}; diff --git a/api/sparkplug_data.ingestion.js b/api/sparkplug_data.ingestion.js index 366280a7..9416f1e8 100644 --- a/api/sparkplug_data.ingestion.js +++ b/api/sparkplug_data.ingestion.js @@ -18,9 +18,7 @@ var config = require("../config"); var { Kafka, logLevel } = require('kafkajs'); const { Partitioners } = require('kafkajs'); -const CacheFactory = require("../lib/cache"); -const { QueryTypes } = require('sequelize'); -var uuidValidate = require('uuid-validate'); +const CacheFactory = require("../lib/cache"); var ngsildMapper = require("../lib/spb-ngsild-mapper"); var me; @@ -52,54 +50,6 @@ const RoundRobinPartitioner = () => { }; }; - -// validate value -var validate = function(value, type) { - if (type === "Number") { - if (isNaN(value)) { - return false; - } else { - return true; - } - } else if (type === "Boolean") { - return value === "0" || value === "1"; - } - else if (type === "String") { - if (typeof value === "string") { - return true; - } - return false; - } - -}; - -// normalize value -var normalizeBoolean = function (value, type) { - if (type === "Boolean") { - // checks: true, false, 0, 1, "0", "1" - if (value === true || value === "1" || value === 1) { - return "1"; - } - if (value === false || value === "0" || value === 0) { - return "0"; - } - // checks: "trUe", "faLSE" - try { - if (value.toLowerCase() === "true") { - return "1"; - } - if (value.toLowerCase() === "false") { - return "0"; - } - } catch (e) { - return "NaB"; - } - return "NaB"; - } - return 
value; -}; - - // @brief Aggregates messages for periodic executed Kafka producer class KafkaAggregator { constructor(logger) { @@ -278,48 +228,13 @@ module.exports = function(logger) { } }; - /**Retrive data from SQL to cache if not present in cache to enhance performance - * SQL check also is useful for verify if the alias/cid sent is previously registered alias or not - */ - - me.getSpBDidAndDataType = async function (item) { - var cid = item.alias; - //check whether cid = uuid - if (!uuidValidate(cid)) { - throw new Error("cid not UUID. Rejected!"); - } - var value = await me.cache.getValues(cid); - var didAndDataType; - if (value === null || (Array.isArray(value) && value.length === 1 && value[0] === null)) { - me.logger.debug("Could not find " + cid + "in cache. Now trying sql query."); - var sqlquery='SELECT devices.id,"dataType" FROM dashboard.device_components,dashboard.devices,dashboard.component_types WHERE "componentId"::text=\'' + cid + '\' and "deviceUID"::text=devices.uid::text and device_components."componentTypeId"::text=component_types.id::text'; - me.logger.debug("Applying SQL query: " + sqlquery); - didAndDataType = await me.cache.sequelize.query(sqlquery, { type: QueryTypes.SELECT }); - me.logger.debug("Result of sql query: " + JSON.stringify(didAndDataType)); - } else { - me.logger.debug("Found in cache: " + JSON.stringify(value) + ", Component is valid"); - didAndDataType = [value]; - } - if (didAndDataType === undefined || didAndDataType === null) { - throw new Error("DB lookup failed!"); - } - var redisResult = await me.cache.setValue(cid, "id", didAndDataType[0].id) && me.cache.setValue(cid, "dataType", didAndDataType[0].dataType); - didAndDataType[0].dataElement = item; - if (redisResult) { - return didAndDataType[0]; - } else { - me.logger.warn("Could not store db value in redis. This will significantly reduce performance"); - return didAndDataType[0]; - } - }; - + me.createKafakaPubData = function(topic,bodyMessage){ /*** For forwarding sparkplugB data directly without kafka metrics format * @param spBkafkaProduce is set in the config file * @param ngsildKafkaProduce is set in the config file * */ var subTopic = topic.split("/"); - var accountId = subTopic[1]; var devID = subTopic[4]; if (config.sparkplug.spBKafkaProduce ) { @@ -329,29 +244,13 @@ module.exports = function(logger) { */ bodyMessage.metrics.forEach(item => { var kafkaMessage = item; - if (subTopic[2] === "NBIRTH") { + if (subTopic[2] === "NBIRTH" || subTopic[2] === "DDATA" || subTopic[2] === "DBIRTH") { var key = topic; var message = {key, value: JSON.stringify(kafkaMessage)}; me.logger.debug("Selecting kafka message topic SparkplugB with spB format payload for data type: "+subTopic[2]); me.kafkaAggregator.addMessage(message,config.sparkplug.spBkafKaTopic); return true; - } - /**Validating component id in the database to check for DDATA */ - else if (subTopic[2] === "DDATA" || subTopic[2] === "DBIRTH") { - me.getSpBDidAndDataType(item).then(values => { - if (values) { - me.logger.debug("SpB payload is valid with component registered." 
+ kafkaMessage.name); - var key = topic; - var addMessageTopic = config.sparkplug.spBkafKaTopic; - let message = {key, value: JSON.stringify(kafkaMessage)}; - me.kafkaAggregator.addMessage(message, addMessageTopic); - return true; - } - }).catch(function(err) { - me.logger.warn("Could not send data to Kafka due to SpB not availiable in DB/cache " + err); - return false; - }); - } + } }); } if ( config.sparkplug.ngsildKafkaProduce) { @@ -367,71 +266,25 @@ module.exports = function(logger) { /**Validating component id in the database to check for DDATA */ else if (subTopic[2] === "DDATA") { let metricName = kafkaMessage.name.split("/"); - let metricType = metricName[0]; - me.getSpBDidAndDataType(item).then(values => { - if (values) { - me.logger.debug("SpB payload is valid with component registered for :" + kafkaMessage.name); - var key = topic; - var ngsiMappedKafkaMessage; - if (metricType === "Relationship") { - ngsiMappedKafkaMessage = ngsildMapper.mapSpbRelationshipToKafka(devID, kafkaMessage); - me.logger.debug(" Mapped SpB Relationship data to NGSI-LD relationship type: "+ JSON.stringify(ngsiMappedKafkaMessage)); - let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)}; - me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic); - } else if (metricType === "Property") { - ngsiMappedKafkaMessage = ngsildMapper.mapSpbPropertyToKafka(devID, kafkaMessage); - me.logger.debug(" Mapped SpB Properties data to NGSI-LD properties type: "+ JSON.stringify(ngsiMappedKafkaMessage)); - let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)}; - me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic); - } else { - me.logger.debug(" Unable to create kafka message topic for SpBNGSI-LD topic for Metric Name: "+ kafkaMessage.name + " ,doesn't match NGSI-LD Name type: "); - } - return true; - } - }).catch(function(err) { - me.logger.warn("Could not send data to Kafka due to SpB not availiable in DB/cache " + err); - return false; - }); + let metricType = metricName[0]; + var key = topic; + var ngsiMappedKafkaMessage; + if (metricType === "Relationship") { + ngsiMappedKafkaMessage = ngsildMapper.mapSpbRelationshipToKafka(devID, kafkaMessage); + me.logger.debug(" Mapped SpB Relationship data to NGSI-LD relationship type: "+ JSON.stringify(ngsiMappedKafkaMessage)); + let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)}; + me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic); + } else if (metricType === "Property") { + ngsiMappedKafkaMessage = ngsildMapper.mapSpbPropertyToKafka(devID, kafkaMessage); + me.logger.debug(" Mapped SpB Properties data to NGSI-LD properties type: "+ JSON.stringify(ngsiMappedKafkaMessage)); + let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)}; + me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic); + } else { + me.logger.debug(" Unable to create kafka message topic for SpBNGSI-LD topic for Metric Name: "+ kafkaMessage.name + " ,doesn't match NGSI-LD Name type: "); + } + return true; } }); - } else if (!config.sparkplug.spBKafkaProduce && !config.sparkplug.ngsildKafkaProduce) { - /*** For forwarding sparkplugB data according to kafka metrics format to be shown in dashboard - * @param kafkaProduce is set in the config file - * */ - me.logger.debug("Kafka metric topic is selected as to meet frontend dashboard data format"); - // For NBIRTH message type with kafka topic, ignoring the kafka send - let subTopic = topic.split("/"); - if (subTopic[2] === "NBIRTH") 
{ - me.logger.info("Received spB NBIRTH message, ignoring currently kafka forward for metric topic"); - return true; - } - // Go through payload metrics and check whether cid/alias is correct - // Check CID in catch or SQL table for security check of component registered - var promarray = []; - bodyMessage.metrics.forEach(item => { - var value = me.getSpBDidAndDataType(item); - promarray.push(value); - }); - Promise.all(promarray) - .then(values => - values.map(item => { - var kafkaMessage = me.prepareKafkaPayload(item, accountId); - if (kafkaMessage === null) { - var msg = "Validation of " + JSON.stringify(item.dataElement) + " for type " + item.dataType + " failed!"; - me.logger.warn(msg); - me.sendErrorOverChannel(accountId, devID, msg); - return; - } - var key = accountId + "." + kafkaMessage.cid; - var message = {key, value: JSON.stringify(kafkaMessage)}; - me.logger.debug("Received spB message, kafka payload created to forward for metric topic :" + JSON.stringify(message)); - me.kafkaAggregator.addMessage(message, config.kafka.metricsTopic); - }) - ) - .catch(function(err) { - me.logger.warn("Could not send data to Kafka " + err); - return; - }); } }; @@ -500,31 +353,4 @@ module.exports = function(logger) { me.connectTopics(); }; - /** - * Prepare datapoint from frontend data structure to Kafka like the following example: - * {"dataType":"Number", "aid":"account_id", "cid":"component_id", "value":"1", - * "systemOn": 1574262569420, "on": 1574262569420, "loc": null} - */ - me.prepareKafkaPayload = function(didAndDataType, accountId){ - var dataElement = didAndDataType.dataElement; - var value = normalizeBoolean(dataElement.value.toString(), didAndDataType.dataType); - if (!validate(value, didAndDataType.dataType)) { - return null; - } - const msg = { - dataType: didAndDataType.dataType, - aid: accountId, - cid: dataElement.alias, - on: dataElement.timestamp, - systemOn : dataElement.timestamp, - value: value - }; - if (undefined !== dataElement.attributes) { - msg.attributes = dataElement.attributes; - } - if (undefined !== dataElement.loc) { - msg.loc = dataElement.loc; - } - return msg; - }; }; diff --git a/app.js b/app.js index 1084dddc..280a1365 100644 --- a/app.js +++ b/app.js @@ -17,7 +17,6 @@ "use strict"; var Broker = require("./lib/mqtt/connector"), - ApiData = require('./api/data.ingestion'), SparkplugApiData = require('./api/sparkplug_data.ingestion'), ApiActuation = require('./api/actuation'), config = require("./config"), @@ -30,10 +29,7 @@ var brokerConnector = Broker.singleton(config.broker, logger); function brokerCb(err) { if (!err) { - // Manage Connections to API Server - var apiDataConnector = new ApiData(logger); - apiDataConnector.bind(brokerConnector); - //SparkplugB connector if enabled + //SparkplugB connector var sparkplugapiDataConnector = new SparkplugApiData(logger); sparkplugapiDataConnector.bind(brokerConnector); var apiActuationConnector = new ApiActuation(logger, brokerConnector); diff --git a/config.js b/config.js index ebe96047..0a882cae 100644 --- a/config.js +++ b/config.js @@ -96,10 +96,6 @@ var config = { }, "topics": { "prefix": parsedConfig.topicsPrefix || "server", - "subscribe": { - "data_ingestion": "$share/bridge/server/metric/+/+", - "health": "server/devices/+/health" - }, "publish": { "error": "server/error/{accountId}/{deviceId}", "actuation": "/{accountId}/DCMD/{gatewayId}/{deviceId}" @@ -113,7 +109,7 @@ var config = { * */ "sparkplug": { - "spBKafkaProduce": parsedConfig.spbEnable || false, + "spBKafkaProduce": parsedConfig.spbEnable 
|| true, "spBkafKaTopic": parsedConfig.spbTopic || "sparkplugB", "ngsildKafkaProduce": parsedConfig.ngsildEnable || false, "ngsildKafkaTopic": parsedConfig.ngsildTopic || "ngsildSpB", diff --git a/test/unit/api_data_ingestionTest.js b/test/unit/api_data_ingestionTest.js deleted file mode 100644 index a3d8ee00..00000000 --- a/test/unit/api_data_ingestionTest.js +++ /dev/null @@ -1,409 +0,0 @@ -/** -* Copyright (c) 2021 Intel Corporation -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -var assert = require('chai').assert, - rewire = require('rewire'); -var fileToTest = "../../api/data.ingestion.js"; - -describe(fileToTest, function() { - - process.env.OISP_KEYCLOAK_CONFIG = '{\ - "listenerPort": 4080, \ - "auth-server-url": "keycloak" \ - }'; - process.env.OISP_KAFKA_CONFIG = '{\ - "uri": "uri", \ - "partitions": 1, \ - "metricsPartitions": 1, \ - "replication": 1, \ - "timeoutMs": 10000, \ - "topicsObservations": "metricsTopic", \ - "topicsRuleEngine": "rules-update", \ - "topicsHeartbeatName": "heartbeat", \ - "topicsHeartbeatInterval": 5000, \ - "maxPayloadSize": 1234456, \ - "retries": 10, \ - "requestTimeout": 4, \ - "maxRetryTime": 10 \ - }'; - process.env.OISP_MQTT_GATEWAY_CONFIG = '{ \ - "mqttBrokerUrl": "brokerUrl", \ - "mqttBrokerLocalPort": "1234", \ - "mqttBrokerUsername": "brokerUsername", \ - "mqttBrokerPassword": "brokerPassword", \ - "authServicePort": "2345", \ - "redisConf": "@@OISP_REDIS_CONFIG", \ - "kafkaConfig": "@@OISP_KAFKA_CONFIG", \ - "postgresConfig": "@@OISP_POSTGRES_CONFIG", \ - "keycloakConfig": "@@OISP_KEYCLOAK_CONFIG", \ - "aesKey": "/app/keys/mqtt/mqtt_gw_secret.key" \ - }'; - - process.env.OISP_REDIS_CONFIG = '{\ - "hostname": "redis",\ - "port": "6379",\ - "password": "password" \ - }'; - - process.env.OISP_POSTGRES_CONFIG = '{\ - "dbname": "oisp",\ - "hostname": "postgres-ro",\ - "writeHostname": "postgres",\ - "port": "5432",\ - "su_username": "su_username",\ - "su_password": "su_password",\ - "username": "username",\ - "password": "password"\ - }'; - - var config = { - "mqttBrokerUrl": "brokerUrl", - "mqttBrokerLocalPort": "1234", - "mqttBrokerUsername": "brokerUsername", - "mqttBrokerPassword": "brokerPassword", - "authServicePort": "2345", - "topics": { - "subscribe": { - "data_ingestion": "$share/bridge/server/metric/+/+" - } - }, - "cache": { - "hostname": "redis", - "port": "6379", - "password": "password" - }, - "kafka": { - "host": "uri", - "partitions": 1, - "metricsPartitions": 1, - "replication": 1, - "timeoutMs": 10000, - "metricsTopic": "metricsTopic", - "topicsRuleEngine": "rules-update", - "topicsHeartbeatName": "heartbeat", - "topicsHeartbeatInterval": 5000, - "maxPayloadSize": 1234456, - "retries": 10, - "requestTimeout": 4, - "maxRetryTime": 10 - }, - "postgres": { - "dbname": "oisp", - "hostname": "postgres-ro", - "writeHostname": "postgres", - "port": "5432", - "su_username": "su_username", - "su_password": "su_password", - "username": "username", - "password": "password" - }, - "aesKey": "/app/keys/mqtt/mqtt_gw_secret.key" - }; - 
var ToTest = rewire(fileToTest); - - var Kafka = function() { - return { - producer: function(){ - return { - connect: function() { - return Promise.resolve(); - }, - on: function() { - }, - send: function() { - done(); - }, - events: "event" - }; - } - }; - }; - - var logger = { - error: function(){ - - }, - debug: function() { - - }, - info: function() { - - } - }; - var broker = { - bind : function(subscribeTopics){ - assert.equal(subscribeTopics,"$share/bridge/server/metric/+/+","sparkplugb Topic subscribed"); - return true; - }, - on : function(){ - return true; - }, - unbind : function(){ - return true; - }, - publish : function(){ - return true; - }, - buildPath : function(){ - return true; - } - }; - - class KafkaAggregator { - start(){} - stop(){} - addMessage(){} - } - - class CacheFactory { - constructor() { - - } - getInstance() { - return { - getDidAndDataType: function(getDidAndData) { - return getDidAndData; - } - }; - } - } - var origKafkaAggregator; - var cid = "dfcd5482-6fb5-4341-a887-b8041fe83dc2"; - it('Shall initialize data ingestion modules Kafka and Cache', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - origKafkaAggregator = ToTest.__get__("KafkaAggregator"); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var dataIngestion = new ToTest(logger); - assert.isObject(dataIngestion); - done(); - }); - - it('Ingest data to Kafka', function (done) { - var Kafka = function() { - return { - producer: function(){ - return { - connect: function() { - return Promise.resolve(); - }, - on: function() { - }, - send: function(payload) { - message = payload.messages[0]; - assert.equal(message.key, "accountId." + cid, "Received Kafka payload key not correct"); - var value = { - dataType: "String", - aid: "accountId", - cid:"dfcd5482-6fb5-4341-a887-b8041fe83dc2", - value:"value", - systemOn:1, - on:1, - loc:null - }; - assert.deepEqual(JSON.parse(message.value), value, "Received Kafke message not correct"); - dataIngestion.stopAggregator(); - done(); - return new Promise(() => {}); - }, - events: "event" - }; - } - }; - }; - var prepareKafkaPayload = function(){ - return {"dataType":"String", "aid":"accountId", "cid":cid, "value":"value", "systemOn": 1, "on": 1, "loc": null}; - }; - - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("config", config); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("KafkaAggregator", origKafkaAggregator); - - var dataIngestion = new ToTest(logger); - - dataIngestion.prepareKafkaPayload = prepareKafkaPayload; - var message = { - accountId: "accountId", - on: 1, - data: [ - { - "componentId": cid, - "on": 1, - "value": "value" - } - ] - }; - - dataIngestion.processDataIngestion("server/metric/accountId/device", message); - }); - it('Validate data types', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - - var validate = ToTest.__get__("validate"); - assert.isTrue(validate("string", "String"), "Error in Validation"); - assert.isTrue(validate("1.23", "Number"), "Error in Validation"); - assert.isTrue(validate("1", "Boolean"), "Error in Validation"); - done(); - }); - it('Normalize Boolean', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - - var normalizeBoolean = ToTest.__get__("normalizeBoolean"); - assert.equal(normalizeBoolean("true", "Boolean"), 1, "Error in 
Validation"); - assert.equal(normalizeBoolean("0", "Boolean"), 0, "Error in Validation"); - assert.equal(normalizeBoolean("fAlse", "Boolean"), 0, "Error in Validation"); - assert.equal(normalizeBoolean(true, "Boolean"), 1, "Error in Validation"); - assert.equal(normalizeBoolean(1, "Boolean"), 1, "Error in Validation"); - assert.equal(normalizeBoolean("falsetrue", "Boolean"), "NaB", "Error in Validation"); - assert.equal(normalizeBoolean(2, "Boolean"), "NaB", "Error in Validation"); - done(); - }); - it('Shall check mqtt bind to topic', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var dataIngestion = new ToTest(logger); - dataIngestion.bind(broker); - done(); - }); - - it('Shall prepare Kafka payload', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var dataIngestion = new ToTest(logger); - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - "componentId": cid, - "on": 1, - "value": "value", - "systemOn": 2 - } - }; - - var msg = dataIngestion.prepareKafkaPayload(didAndDataType, "accountId"); - var expectedMsg = { - dataType: "String", - aid: "accountId", - value: "value", - cid: cid, - on: 1, - systemOn: 2 - }; - assert.deepEqual(msg, expectedMsg, "Wrong kafka payload"); - done(); - }); - - it('Shall prepare Kafka payload for kafka topic with loc and attributes', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var dataIngestion = new ToTest(logger); - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - "componentId": cid, - "on": 1, - "value": "value", - "systemOn": 2, - "attributes": { - hardware_model : "linux" - }, - loc : { - lat : 88, - long : 64 - } - } - }; - - var msg = dataIngestion.prepareKafkaPayload(didAndDataType, "accountId"); - var expectedMsg = { - dataType: "String", - aid: "accountId", - value: "value", - cid: cid, - on: 1, - systemOn: 2, - attributes: { - hardware_model : "linux" - }, - loc : { - lat : 88, - long : 64 - } - }; - assert.deepEqual(msg, expectedMsg, "Wrong kafka payload"); - done(); - }); - - it('Process data Ingestion with missing data field in received payload', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var dataIngestion = new ToTest(logger); - var message = { - accountId: "accountId", - on: 1, - data: [{}] - }; - - let processDataIngestReturn = dataIngestion.processDataIngestion("server/metric/accountId/device", message); - assert.deepEqual(processDataIngestReturn, null, "Unable to validate Metric message received schema"); - done(); - }); - - it('Shall processdataIngestion return undefined due to null return on preparekafkafkapayload of datatype', function (done) { - var prepareKafkaPayload = function(){ - return null; - }; - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("config", config); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - - var dataIngestion = new ToTest(logger); - dataIngestion.prepareKafkaPayload = prepareKafkaPayload; - var message = { - accountId: 
"accountId", - on: 1, - data: [ - { - "componentId": cid, - "on": 1, - "value": "value" - } - ] - }; - var KafkaProcessDataIngestionReturn = dataIngestion.processDataIngestion("server/metric/accountId/device", message); - assert.deepEqual(KafkaProcessDataIngestionReturn, undefined, " Able to process data ingestion with null preparekafka payload"); - done(); - }); -}); diff --git a/test/unit/api_spb_data_ingestionTest.js b/test/unit/api_spb_data_ingestionTest.js index d697d8f6..8994bf16 100644 --- a/test/unit/api_spb_data_ingestionTest.js +++ b/test/unit/api_spb_data_ingestionTest.js @@ -246,21 +246,6 @@ describe(fileToTest, function() { assert.equal(type, "seq", "Wrong key value"); return 0; }, - getValues(key) { - assert.deepEqual(key, cid, "Wrong CID cache value received."); - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - "componentId": cid, - "on": 12345, - "value": "value", - "systemOn": 2 - } - }; - return didAndDataType; - }, setValue(key, type) { assert.oneOf(key, ["accountId/eonID",cid], "Wrong key on cache value received."); assert.oneOf(type, [ "seq", "id", "dataType" ], "Wrong type to se value received."); @@ -317,7 +302,6 @@ describe(fileToTest, function() { timestamp: 12345, metrics: [{ name : "bdseq", - alias : cid, timestamp : 12345, dataType : "Uint64", value: 123 @@ -341,7 +325,6 @@ describe(fileToTest, function() { timestamp: 12345, metrics: [{ name : "temp", - alias : cid, timestamp : 12345, dataType : "Uint64", value: 123 @@ -457,78 +440,6 @@ describe(fileToTest, function() { .catch((e) => done(e)); }); - it('Verify SparkplugB metric existence and get its DeviceId and dataType from cache using alias Id', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var spbdataIngestion = new ToTest(logger); - var DataMessage = { - timestamp: 12345, - metrics: [{ - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - }], - seq: 2 - }; - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - } - }; - spbdataIngestion.getSpBDidAndDataType(DataMessage.metrics[0]) - .then((result) => { - assert.deepEqual(result, didAndDataType, "Invalid alias ID specific to device, not in cache/DB "); - done(); - }) - .catch((e) => done(e)); - }); - - it('KafkaProduce for SparkplugB metric fails due to Invalid CID/Alias Id', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var spbdataIngestion = new ToTest(logger); - var DataMessage = { - timestamp: 12345, - metrics: [{ - name : "temp", - alias : "c574252-31d5-4b76-bce6-53f2c56b", - timestamp : 12345, - dataType : "Uint64", - value: "value" - }], - seq: 2 - }; - var validateSpbDevSeq = function() { - return Promise.resolve (true); - }; - var errorMessage = "cid not UUID. 
Rejected!"; - spbdataIngestion.validateSpbDevSeq =validateSpbDevSeq; - spbdataIngestion.getSpBDidAndDataType(DataMessage.metrics[0]) - .then(() => { - done(); - }) - .catch((err) => { - assert.deepEqual(err, errorMessage, "Valid alias ID specific to device "); - done(); - }); - var kafkaPubReturn= spbdataIngestion.createKafakaPubData("spBv1.0/accountId/DBIRTH/eonID/deviceId", DataMessage); - assert.oneOf(kafkaPubReturn, [ false, undefined ], " Possible to produce kafka message for wrong alias/CID id: " + kafkaPubReturn); - done(); - }); - it('KafkaProduce for SparkplugB metric fails due to empty sparkplugB message', function (done) { config.sparkplug.spBKafkaProduce = true; ToTest.__set__("Kafka", Kafka); @@ -617,22 +528,6 @@ describe(fileToTest, function() { return Promise.resolve (true); }; - var getSpBDidAndDataType = async function(){ - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - } - }; - return didAndDataType; - - }; config.sparkplug.spBKafkaProduce = true; ToTest.__set__("Kafka", Kafka); ToTest.__set__("config", config); @@ -641,7 +536,6 @@ describe(fileToTest, function() { var spbdataIngestion = new ToTest(logger); spbdataIngestion.validateSpbDevSeq =validateSpbDevSeq; - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; var message = { timestamp: 12345, metrics: [{ @@ -658,127 +552,6 @@ describe(fileToTest, function() { spbdataIngestion.processDataIngestion("spBv1.0/accountId/DDATA/eonID/deviceId", message); }); - it('Create Kafka publish data on kafka metric topic', function (done) { - var Kafka = function() { - return { - producer: function(){ - return { - connect: function() { - }, - on: function() { - }, - send: function(payload) { - message = payload.messages[0]; - assert.equal(message.key, "accountId." 
+ cid,"Received Kafka payload key not correct"); - var value = { - dataType: "String", - aid: "accountId", - cid: cid, - value:"value", - systemOn:1, - on:1, - loc:null - }; - assert.deepEqual(JSON.parse(message.value), value, "Received Kafke message not correct"); - spbdataIngestion.stopAggregator(); - done(); - return new Promise(() => {}); - }, - events: "event" - }; - } - }; - }; - var prepareKafkaPayload = function(){ - return {"dataType":"String", "aid":"accountId", "cid":cid, "value":"value", "systemOn": 1, "on": 1, "loc": null}; - }; - var getSpBDidAndDataType = function(){ - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - } - }; - return didAndDataType; - }; - - config.sparkplug.spBKafkaProduce = false; - config.sparkplug.ngsildKafkaProduce = false; - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("config", config); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("KafkaAggregator", origKafkaAggregator); - - var spbdataIngestion = new ToTest(logger); - - spbdataIngestion.prepareKafkaPayload = prepareKafkaPayload; - - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; - var message = { - timestamp: 12345, - metrics: [{ - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - }], - seq: 0 - }; - spbdataIngestion.createKafakaPubData("spBv1.0/accountId/DBIRTH/eonID/deviceId", message); - spbdataIngestion.createKafakaPubData("spBv1.0/accountId/DDATA/eonID/deviceId", message); - }); - - it('Shall Kafka publish data on kafka metric topic return undefined due to mismatch of datatype', function (done) { - var getSpBDidAndDataType = function(){ - var didAndDataType = { - dataType: "Boolean", - on: 1, - dataElement: - { - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Boolean", - value: "NoTrue" - } - }; - return didAndDataType; - }; - - config.sparkplug.spBKafkaProduce = false; - config.sparkplug.ngsildKafkaProduce = false; - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("config", config); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - - var spbdataIngestion = new ToTest(logger); - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; - var message = { - timestamp: 12345, - metrics: [{ - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Boolean", - value: "NoTrue" - }], - seq: 0 - }; - var KafkaPubDataReturnNbirth = spbdataIngestion.createKafakaPubData("spBv1.0/accountId/NBIRTH/eonID", message); - assert.deepEqual(KafkaPubDataReturnNbirth, true, "Unable to send NBIRTH Message"); - var KafkaPubDataReturnDdata = spbdataIngestion.createKafakaPubData("spBv1.0/accountId/DDATA/eonID/deviceId", message); - assert.deepEqual(KafkaPubDataReturnDdata, undefined, "Unable to validate createkafkaPubData with wrong datatype"); - done(); - }); - it('Create Kafka publish Relationship data on NGSI-LD Spb topic', function (done) { var Kafka = function() { return { @@ -816,22 +589,6 @@ describe(fileToTest, function() { return Promise.resolve (true); }; - var getSpBDidAndDataType = function(){ - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "Relationship/https://industry-fusion.com/types/v0.9/hasFilter", - alias : cid, - timestamp : 12345, - dataType : "string", - value: "value" - } - }; - return Promise.resolve (didAndDataType); - - }; config.sparkplug.spBKafkaProduce = false; 
config.sparkplug.ngsildKafkaProduce = true; ToTest.__set__("Kafka", Kafka); @@ -841,7 +598,6 @@ describe(fileToTest, function() { var spbdataIngestion = new ToTest(logger); spbdataIngestion.validateSpbDevSeq =validateSpbDevSeq; - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; var message = { timestamp: 12345, metrics: [{ @@ -893,22 +649,6 @@ describe(fileToTest, function() { return Promise.resolve (true); }; - var getSpBDidAndDataType = function(){ - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "Property/https://industry-fusion.com/types/v0.9/hasFilter", - alias : cid, - timestamp : 12345, - dataType : "string", - value: "value" - } - }; - return Promise.resolve (didAndDataType); - - }; config.sparkplug.spBKafkaProduce = true; config.sparkplug.ngsildKafkaProduce = true; ToTest.__set__("Kafka", Kafka); @@ -918,7 +658,6 @@ describe(fileToTest, function() { var spbdataIngestion = new ToTest(logger); spbdataIngestion.validateSpbDevSeq =validateSpbDevSeq; - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; var message = { timestamp: 12345, metrics: [{ @@ -1139,33 +878,6 @@ describe(fileToTest, function() { assert.deepEqual(processDataIngestReturn, true, "Unable to validate SParkplugB schema"); done(); }); - - it('Validate data types', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - - var validate = ToTest.__get__("validate"); - assert.isTrue(validate("string", "String"), "Error in Validation"); - assert.isTrue(validate("1.23", "Number"), "Error in Validation"); - assert.isTrue(validate("1", "Boolean"), "Error in Validation"); - done(); - }); - it('Normalize Boolean', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - - var normalizeBoolean = ToTest.__get__("normalizeBoolean"); - assert.equal(normalizeBoolean("true", "Boolean"), 1, "Error in Validation"); - assert.equal(normalizeBoolean("0", "Boolean"), 0, "Error in Validation"); - assert.equal(normalizeBoolean("fAlse", "Boolean"), 0, "Error in Validation"); - assert.equal(normalizeBoolean(true, "Boolean"), 1, "Error in Validation"); - assert.equal(normalizeBoolean(1, "Boolean"), 1, "Error in Validation"); - assert.equal(normalizeBoolean("falsetrue", "Boolean"), "NaB", "Error in Validation"); - assert.equal(normalizeBoolean(2, "Boolean"), "NaB", "Error in Validation"); - done(); - }); it('Shall check mqtt bind to topic', function (done) { ToTest.__set__("Kafka", Kafka); @@ -1179,79 +891,4 @@ describe(fileToTest, function() { done(); }); - it('Shall prepare Kafka payload for kafka topic', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var spbdataIngestion = new ToTest(logger); - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - "alias": cid, - "on": 1, - "value": "value", - "timestamp": 1 - } - }; - - var msg = spbdataIngestion.prepareKafkaPayload(didAndDataType, "accountId"); - var expectedMsg = { - dataType: "String", - aid: "accountId", - value: "value", - cid: cid, - on: 1, - systemOn: 1 - }; - assert.deepEqual(msg, expectedMsg, "Wrong kafka payload"); - done(); - }); - - it('Shall prepare Kafka payload for kafka topic with loc and attributes', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", 
CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var spbdataIngestion = new ToTest(logger); - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - "alias": cid, - "on": 1, - "value": "value", - "timestamp": 1, - "attributes": { - hardware_model : "linux" - }, - loc : { - lat : 88, - long : 64 - } - } - }; - - var msg = spbdataIngestion.prepareKafkaPayload(didAndDataType, "accountId"); - var expectedMsg = { - dataType: "String", - aid: "accountId", - value: "value", - cid: cid, - on: 1, - systemOn: 1, - attributes: { - hardware_model : "linux" - }, - loc : { - lat : 88, - long : 64 - } - }; - assert.deepEqual(msg, expectedMsg, "Wrong kafka payload"); - done(); - }); });
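
Reviewer note: the short Node.js sketch below is illustrative only — it is not part of this patch or repository — and summarizes the forwarding behaviour that remains after the cid/alias check is removed: NBIRTH/DBIRTH/DDATA metrics are forwarded unchanged to the sparkplugB Kafka topic, keyed by the full MQTT topic, and DDATA metrics named "Relationship/..." or "Property/..." are additionally mapped for the NGSI-LD topic when ngsildKafkaProduce is enabled. The function name routeSpbMetric, the simplified config object and the inline NGSI-LD mapping are assumed stand-ins for the real KafkaAggregator and lib/spb-ngsild-mapper code.

"use strict";

// Hypothetical, self-contained sketch of the new routing logic (assumed names, not repo code).
function routeSpbMetric(mqttTopic, metric, cfg) {
    const subTopic = mqttTopic.split("/");   // spBv1.0/<accountId>/<msgType>/<eonId>/<deviceId>
    const msgType = subTopic[2];
    const records = [];

    // Raw SparkplugB forwarding: no cid/alias lookup against cache or SQL any more.
    if (cfg.spBKafkaProduce && ["NBIRTH", "DBIRTH", "DDATA"].includes(msgType)) {
        records.push({ topic: cfg.spBkafKaTopic, key: mqttTopic, value: JSON.stringify(metric) });
    }

    // Optional NGSI-LD mapping for DDATA metrics; the real code delegates to lib/spb-ngsild-mapper.
    if (cfg.ngsildKafkaProduce && msgType === "DDATA") {
        const metricType = metric.name.split("/")[0];
        if (metricType === "Relationship" || metricType === "Property") {
            const mapped = { type: metricType, deviceId: subTopic[4], metric: metric };
            records.push({ topic: cfg.ngsildKafkaTopic, key: mqttTopic, value: JSON.stringify(mapped) });
        }
    }
    return records;
}

// Example: one DDATA Property metric yields a sparkplugB record plus an NGSI-LD record.
console.log(routeSpbMetric(
    "spBv1.0/accountId/DDATA/eonID/deviceId",
    { name: "Property/https://industry-fusion.com/types/v0.9/hasFilter", timestamp: 12345, dataType: "String", value: "value" },
    { spBKafkaProduce: true, spBkafKaTopic: "sparkplugB", ngsildKafkaProduce: true, ngsildKafkaTopic: "ngsildSpB" }
));

Note that with the new config default ("spBKafkaProduce": parsedConfig.spbEnable || true) the expression is truthy for any value of spbEnable, so the SparkplugB forwarding branch is effectively always active unless the test or application code overrides config.sparkplug.spBKafkaProduce explicitly.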