This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit ed5cc32
OISP legacy dependency removed by removing cid: component id check
- SparkplugB enabled by default at mqtt-gw
- Removed old REST-based Kafka publish data creation on the metrics topic
yshashix committed Sep 22, 2023
1 parent 98402c9 commit ed5cc32
Showing 4 changed files with 24 additions and 558 deletions.
216 changes: 21 additions & 195 deletions api/sparkplug_data.ingestion.js
@@ -18,9 +18,7 @@
 var config = require("../config");
 var { Kafka, logLevel } = require('kafkajs');
 const { Partitioners } = require('kafkajs');
-const CacheFactory = require("../lib/cache");
-const { QueryTypes } = require('sequelize');
-var uuidValidate = require('uuid-validate');
+const CacheFactory = require("../lib/cache");
 var ngsildMapper = require("../lib/spb-ngsild-mapper");

 var me;
@@ -52,54 +50,6 @@ const RoundRobinPartitioner = () => {
     };
 };

-
-// validate value
-var validate = function(value, type) {
-    if (type === "Number") {
-        if (isNaN(value)) {
-            return false;
-        } else {
-            return true;
-        }
-    } else if (type === "Boolean") {
-        return value === "0" || value === "1";
-    } else if (type === "String") {
-        if (typeof value === "string") {
-            return true;
-        }
-        return false;
-    }
-};
-
-// normalize value
-var normalizeBoolean = function (value, type) {
-    if (type === "Boolean") {
-        // checks: true, false, 0, 1, "0", "1"
-        if (value === true || value === "1" || value === 1) {
-            return "1";
-        }
-        if (value === false || value === "0" || value === 0) {
-            return "0";
-        }
-        // checks: "trUe", "faLSE"
-        try {
-            if (value.toLowerCase() === "true") {
-                return "1";
-            }
-            if (value.toLowerCase() === "false") {
-                return "0";
-            }
-        } catch (e) {
-            return "NaB";
-        }
-        return "NaB";
-    }
-    return value;
-};
-
-
 // @brief Aggregates messages for periodic executed Kafka producer
 class KafkaAggregator {
     constructor(logger) {
@@ -278,48 +228,13 @@ module.exports = function(logger) {
         }

     };
-    /** Retrieve data from SQL into the cache if not present there, to enhance performance.
-     * The SQL check is also useful to verify whether the alias/cid sent is a previously registered alias or not.
-     */
-    me.getSpBDidAndDataType = async function (item) {
-        var cid = item.alias;
-        // check whether cid = uuid
-        if (!uuidValidate(cid)) {
-            throw new Error("cid not UUID. Rejected!");
-        }
-        var value = await me.cache.getValues(cid);
-        var didAndDataType;
-        if (value === null || (Array.isArray(value) && value.length === 1 && value[0] === null)) {
-            me.logger.debug("Could not find " + cid + " in cache. Now trying sql query.");
-            var sqlquery = 'SELECT devices.id,"dataType" FROM dashboard.device_components,dashboard.devices,dashboard.component_types WHERE "componentId"::text=\'' + cid + '\' and "deviceUID"::text=devices.uid::text and device_components."componentTypeId"::text=component_types.id::text';
-            me.logger.debug("Applying SQL query: " + sqlquery);
-            didAndDataType = await me.cache.sequelize.query(sqlquery, { type: QueryTypes.SELECT });
-            me.logger.debug("Result of sql query: " + JSON.stringify(didAndDataType));
-        } else {
-            me.logger.debug("Found in cache: " + JSON.stringify(value) + ", Component is valid");
-            didAndDataType = [value];
-        }
-        if (didAndDataType === undefined || didAndDataType === null) {
-            throw new Error("DB lookup failed!");
-        }
-        var redisResult = await me.cache.setValue(cid, "id", didAndDataType[0].id) && me.cache.setValue(cid, "dataType", didAndDataType[0].dataType);
-        didAndDataType[0].dataElement = item;
-        if (redisResult) {
-            return didAndDataType[0];
-        } else {
-            me.logger.warn("Could not store db value in redis. This will significantly reduce performance");
-            return didAndDataType[0];
-        }
-    };
-
-
     me.createKafakaPubData = function(topic, bodyMessage) {
         /** For forwarding SparkplugB data directly without the Kafka metrics format
          * @param spBkafkaProduce is set in the config file
          * @param ngsildKafkaProduce is set in the config file
          */
         var subTopic = topic.split("/");
         var accountId = subTopic[1];
         var devID = subTopic[4];

         if (config.sparkplug.spBKafkaProduce) {
@@ -329,29 +244,13 @@
              */
             bodyMessage.metrics.forEach(item => {
                 var kafkaMessage = item;
-                if (subTopic[2] === "NBIRTH") {
+                if (subTopic[2] === "NBIRTH" || subTopic[2] === "DDATA" || subTopic[2] === "DBIRTH") {
                     var key = topic;
                     var message = {key, value: JSON.stringify(kafkaMessage)};
                     me.logger.debug("Selecting kafka message topic SparkplugB with spB format payload for data type: " + subTopic[2]);
                     me.kafkaAggregator.addMessage(message, config.sparkplug.spBkafKaTopic);
                     return true;
                 }
-                /** Validating component id in the database to check for DDATA */
-                else if (subTopic[2] === "DDATA" || subTopic[2] === "DBIRTH") {
-                    me.getSpBDidAndDataType(item).then(values => {
-                        if (values) {
-                            me.logger.debug("SpB payload is valid with component registered." + kafkaMessage.name);
-                            var key = topic;
-                            var addMessageTopic = config.sparkplug.spBkafKaTopic;
-                            let message = {key, value: JSON.stringify(kafkaMessage)};
-                            me.kafkaAggregator.addMessage(message, addMessageTopic);
-                            return true;
-                        }
-                    }).catch(function(err) {
-                        me.logger.warn("Could not send data to Kafka due to SpB not available in DB/cache " + err);
-                        return false;
-                    });
-                }
                 }
             });
         }
         if (config.sparkplug.ngsildKafkaProduce) {
@@ -367,71 +266,25 @@
-                /** Validating component id in the database to check for DDATA */
                 else if (subTopic[2] === "DDATA") {
                     let metricName = kafkaMessage.name.split("/");
-                    let metricType = metricName[0];
-                    me.getSpBDidAndDataType(item).then(values => {
-                        if (values) {
-                            me.logger.debug("SpB payload is valid with component registered for: " + kafkaMessage.name);
-                            var key = topic;
-                            var ngsiMappedKafkaMessage;
-                            if (metricType === "Relationship") {
-                                ngsiMappedKafkaMessage = ngsildMapper.mapSpbRelationshipToKafka(devID, kafkaMessage);
-                                me.logger.debug("Mapped SpB Relationship data to NGSI-LD relationship type: " + JSON.stringify(ngsiMappedKafkaMessage));
-                                let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)};
-                                me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic);
-                            } else if (metricType === "Property") {
-                                ngsiMappedKafkaMessage = ngsildMapper.mapSpbPropertyToKafka(devID, kafkaMessage);
-                                me.logger.debug("Mapped SpB Properties data to NGSI-LD properties type: " + JSON.stringify(ngsiMappedKafkaMessage));
-                                let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)};
-                                me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic);
-                            } else {
-                                me.logger.debug("Unable to create kafka message for SpB NGSI-LD topic for metric name: " + kafkaMessage.name + ", doesn't match NGSI-LD name type");
-                            }
-                            return true;
-                        }
-                    }).catch(function(err) {
-                        me.logger.warn("Could not send data to Kafka due to SpB not available in DB/cache " + err);
-                        return false;
-                    });
+                    let metricType = metricName[0];
+                    var key = topic;
+                    var ngsiMappedKafkaMessage;
+                    if (metricType === "Relationship") {
+                        ngsiMappedKafkaMessage = ngsildMapper.mapSpbRelationshipToKafka(devID, kafkaMessage);
+                        me.logger.debug("Mapped SpB Relationship data to NGSI-LD relationship type: " + JSON.stringify(ngsiMappedKafkaMessage));
+                        let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)};
+                        me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic);
+                    } else if (metricType === "Property") {
+                        ngsiMappedKafkaMessage = ngsildMapper.mapSpbPropertyToKafka(devID, kafkaMessage);
+                        me.logger.debug("Mapped SpB Properties data to NGSI-LD properties type: " + JSON.stringify(ngsiMappedKafkaMessage));
+                        let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)};
+                        me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic);
+                    } else {
+                        me.logger.debug("Unable to create kafka message for SpB NGSI-LD topic for metric name: " + kafkaMessage.name + ", doesn't match NGSI-LD name type");
+                    }
+                    return true;
                 }
             });
-        } else if (!config.sparkplug.spBKafkaProduce && !config.sparkplug.ngsildKafkaProduce) {
+        }
-            /** For forwarding SparkplugB data according to the Kafka metrics format to be shown in the dashboard
-             * @param kafkaProduce is set in the config file
-             */
-            me.logger.debug("Kafka metric topic is selected to meet the frontend dashboard data format");
-            // For NBIRTH message type with kafka topic, ignoring the kafka send
-            let subTopic = topic.split("/");
-            if (subTopic[2] === "NBIRTH") {
-                me.logger.info("Received spB NBIRTH message, currently ignoring kafka forward for metric topic");
-                return true;
-            }
-            // Go through payload metrics and check whether cid/alias is correct
-            // Check CID in cache or SQL table for security check of component registered
-            var promarray = [];
-            bodyMessage.metrics.forEach(item => {
-                var value = me.getSpBDidAndDataType(item);
-                promarray.push(value);
-            });
-            Promise.all(promarray)
-                .then(values =>
-                    values.map(item => {
-                        var kafkaMessage = me.prepareKafkaPayload(item, accountId);
-                        if (kafkaMessage === null) {
-                            var msg = "Validation of " + JSON.stringify(item.dataElement) + " for type " + item.dataType + " failed!";
-                            me.logger.warn(msg);
-                            me.sendErrorOverChannel(accountId, devID, msg);
-                            return;
-                        }
-                        var key = accountId + "." + kafkaMessage.cid;
-                        var message = {key, value: JSON.stringify(kafkaMessage)};
-                        me.logger.debug("Received spB message, kafka payload created to forward for metric topic: " + JSON.stringify(message));
-                        me.kafkaAggregator.addMessage(message, config.kafka.metricsTopic);
-                    })
-                )
-                .catch(function(err) {
-                    me.logger.warn("Could not send data to Kafka " + err);
-                    return;
-                });
-        }
     };

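After this change, createKafakaPubData forwards NBIRTH, DBIRTH and DDATA metrics directly, deriving everything it needs from the MQTT topic levels rather than from a component-id lookup. Below is a minimal sketch of that topic parsing, assuming a spBv1.0/<accountId>/<msgType>/<eonId>/<deviceId> topic layout; the exact topic scheme is an assumption for illustration, since only indices 1, 2 and 4 appear in this diff.

```js
// Sketch: how createKafakaPubData slices an incoming SparkplugB topic.
// The full topic layout is assumed; the code above only uses indices 1, 2 and 4.
const topic = "spBv1.0/acc-123/DDATA/eon-1/dev-42";

const subTopic = topic.split("/");
const accountId = subTopic[1]; // "acc-123"
const msgType = subTopic[2];   // "DDATA" - now forwarded directly, no cid check
const devID = subTopic[4];     // "dev-42"

// After this commit, NBIRTH, DDATA and DBIRTH are all forwarded as-is:
const forwarded = ["NBIRTH", "DDATA", "DBIRTH"].includes(msgType);
console.log({ accountId, msgType, devID, forwarded }); // forwarded: true
```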
@@ -500,31 +353,4 @@
         me.connectTopics();
     };

-    /**
-     * Prepare datapoint from frontend data structure to Kafka like the following example:
-     * {"dataType":"Number", "aid":"account_id", "cid":"component_id", "value":"1",
-     *  "systemOn": 1574262569420, "on": 1574262569420, "loc": null}
-     */
-    me.prepareKafkaPayload = function(didAndDataType, accountId) {
-        var dataElement = didAndDataType.dataElement;
-        var value = normalizeBoolean(dataElement.value.toString(), didAndDataType.dataType);
-        if (!validate(value, didAndDataType.dataType)) {
-            return null;
-        }
-        const msg = {
-            dataType: didAndDataType.dataType,
-            aid: accountId,
-            cid: dataElement.alias,
-            on: dataElement.timestamp,
-            systemOn: dataElement.timestamp,
-            value: value
-        };
-        if (undefined !== dataElement.attributes) {
-            msg.attributes = dataElement.attributes;
-        }
-        if (undefined !== dataElement.loc) {
-            msg.loc = dataElement.loc;
-        }
-        return msg;
-    };
 };
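In the NGSI-LD branch kept above, dispatch hinges on the first segment of the SparkplugB metric name. Here is a small self-contained sketch of that dispatch logic; the metric names are hypothetical and the mapper calls are replaced by logging, while the real mapSpbRelationshipToKafka/mapSpbPropertyToKafka come from ../lib/spb-ngsild-mapper.

```js
// Sketch: metric-name dispatch used by the NGSI-LD branch.
// Names below are illustrative, not taken from this diff.
const metrics = [
  { name: "Relationship/hasFilter", value: "urn:ngsi-ld:filter:1" },
  { name: "Property/temperature", value: 21.5 },
  { name: "unknown/foo", value: 0 },
];

for (const metric of metrics) {
  const metricType = metric.name.split("/")[0];
  if (metricType === "Relationship" || metricType === "Property") {
    // In the real code: ngsildMapper.mapSpb...ToKafka(devID, metric),
    // then kafkaAggregator.addMessage(..., config.sparkplug.ngsildKafkaTopic)
    console.log("forward to ngsildSpB as", metricType);
  } else {
    console.log("skip", metric.name, "- not an NGSI-LD name type");
  }
}
```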
2 changes: 1 addition & 1 deletion config.js
@@ -113,7 +113,7 @@ var config = {
      *
      */
     "sparkplug": {
-        "spBKafkaProduce": parsedConfig.spbEnable || false,
+        "spBKafkaProduce": parsedConfig.spbEnable || true,
         "spBkafKaTopic": parsedConfig.spbTopic || "sparkplugB",
         "ngsildKafkaProduce": parsedConfig.ngsildEnable || false,
         "ngsildKafkaTopic": parsedConfig.ngsildTopic || "ngsildSpB",
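One side effect of defaulting with || rather than a nullish check: since a falsy left operand always yields the right one, an explicit spbEnable: false in the parsed config can no longer disable SparkplugB production. A quick sketch of the difference, plain JavaScript semantics rather than anything stated in the commit:

```js
// Sketch: with ||, an explicit false is overridden by the default.
const withOr = false || true;      // true - cannot opt out
console.log(withOr);

// Nullish coalescing (??) would honour an explicit false and only
// apply the default when the option is absent:
const optOut = false ?? true;      // false
const unset = undefined ?? true;   // true
console.log(optOut, unset);
```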
28 changes: 2 additions & 26 deletions test/unit/api_data_ingestionTest.js
@@ -180,6 +180,7 @@ describe(fileToTest, function() {
             };
         }
     }
+
     var origKafkaAggregator;
     var cid = "dfcd5482-6fb5-4341-a887-b8041fe83dc2";
     it('Shall initialize data ingestion modules Kafka and Cache', function (done) {
@@ -251,32 +252,7 @@

         dataIngestion.processDataIngestion("server/metric/accountId/device", message);
     });
-    it('Validate data types', function (done) {
-        ToTest.__set__("Kafka", Kafka);
-        ToTest.__set__("CacheFactory", CacheFactory);
-        ToTest.__set__("config", config);
-
-        var validate = ToTest.__get__("validate");
-        assert.isTrue(validate("string", "String"), "Error in Validation");
-        assert.isTrue(validate("1.23", "Number"), "Error in Validation");
-        assert.isTrue(validate("1", "Boolean"), "Error in Validation");
-        done();
-    });
-    it('Normalize Boolean', function (done) {
-        ToTest.__set__("Kafka", Kafka);
-        ToTest.__set__("CacheFactory", CacheFactory);
-        ToTest.__set__("config", config);
-
-        var normalizeBoolean = ToTest.__get__("normalizeBoolean");
-        assert.equal(normalizeBoolean("true", "Boolean"), 1, "Error in Validation");
-        assert.equal(normalizeBoolean("0", "Boolean"), 0, "Error in Validation");
-        assert.equal(normalizeBoolean("fAlse", "Boolean"), 0, "Error in Validation");
-        assert.equal(normalizeBoolean(true, "Boolean"), 1, "Error in Validation");
-        assert.equal(normalizeBoolean(1, "Boolean"), 1, "Error in Validation");
-        assert.equal(normalizeBoolean("falsetrue", "Boolean"), "NaB", "Error in Validation");
-        assert.equal(normalizeBoolean(2, "Boolean"), "NaB", "Error in Validation");
-        done();
-    });
+
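With the validate and normalizeBoolean tests gone, a follow-up test could pin down the new behaviour instead: DDATA metrics reaching the SparkplugB topic without any component-id lookup. This is a sketch only, reusing this suite's rewire/chai pattern; the construction of the module instance and the aggregator stub are assumptions modelled on the surrounding tests, not taken from this diff.

```js
it('Shall forward DDATA without component id validation', function (done) {
    ToTest.__set__("Kafka", Kafka);
    ToTest.__set__("CacheFactory", CacheFactory);
    ToTest.__set__("config", config);

    var dataIngestion = new ToTest(logger); // construction assumed, as in this suite
    // Stub the aggregator: the only observable effect should be a message
    // on the SparkplugB topic, with no cid/DB lookup beforehand.
    dataIngestion.kafkaAggregator = {
        addMessage: function (message, topic) {
            assert.equal(topic, config.sparkplug.spBkafKaTopic, "Wrong Kafka topic");
            done();
        }
    };
    var payload = { metrics: [{ name: "temperature", value: 1, timestamp: 1 }] };
    dataIngestion.createKafakaPubData("spBv1.0/accId/DDATA/eonID/devId", payload);
});
```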
     it('Shall check mqtt bind to topic', function (done) {
         ToTest.__set__("Kafka", Kafka);
         ToTest.__set__("CacheFactory", CacheFactory);