Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sparkplug): Edge Node implementation at Data Transport level #5098

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5ef9607
feat(sparkplug): Edge Node implementation at DataTransport level
marcellorinaldo Jan 12, 2024
ad5aeb9
fix: message handler threading rework
marcellorinaldo Jan 12, 2024
92cc228
chrore: changed executor type
marcellorinaldo Jan 12, 2024
120523b
fix: proper thread shutdown on update
marcellorinaldo Jan 15, 2024
b3e5998
feat: added example device for testing purposes
marcellorinaldo Jan 15, 2024
21dc31d
feat: added SessionStatus for managing state transitions
marcellorinaldo Jan 15, 2024
651bf7a
refactor: renaming
marcellorinaldo Jan 15, 2024
dc7f15f
refactor: renaming
marcellorinaldo Jan 15, 2024
328e324
refactor: better SessionStatus management
marcellorinaldo Jan 15, 2024
64c17e6
fix: made BdSeqCounter thread safe
marcellorinaldo Jan 16, 2024
1899150
refactor: replaced messages queue with ExecutorService submits
marcellorinaldo Jan 16, 2024
bd24267
refactor: better error handling on connect
marcellorinaldo Jan 16, 2024
4519626
fix: confirming session outside the client
marcellorinaldo Jan 16, 2024
53bc41b
refactor: simplified BdSeqCounter
marcellorinaldo Jan 16, 2024
7cdb8c5
refactor: better options class, added logs, moved session estabilish …
marcellorinaldo Jan 16, 2024
7c968e4
test: added test cases and refactor of SparkplugDataTransportOptionsTest
marcellorinaldo Jan 16, 2024
70568d4
refactor: removed unecessary reference to DataTransportService in Clo…
marcellorinaldo Jan 16, 2024
54eb309
test: fixed CloudEndpoint tests
marcellorinaldo Jan 16, 2024
a2586a9
test: replaced unit test with integration tests for SparkplugDataTran…
marcellorinaldo Jan 18, 2024
965d51b
test: added test cases, removed negative test verification due to spu…
marcellorinaldo Jan 19, 2024
f21329f
test: added test cases
marcellorinaldo Jan 19, 2024
6fcb9d7
test: added test cases
marcellorinaldo Jan 19, 2024
98cab29
fix: using Random to generate int values
marcellorinaldo Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright (c) 2023 Eurotech and/or its affiliates and others
Copyright (c) 2023, 2024 Eurotech and/or its affiliates and others

This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
Expand All @@ -20,19 +20,41 @@
description="Data Transport layer configuration.">

<Icon resource="MqttDataTransport" size="32"/>

<AD id="group.id"
name="Sparkplug Group ID"
type="String"
cardinality="0"
required="true"
default="group"
description="Sparkplug Group identifier to which this Sparkplug Edge Node belongs."/>

<AD id="node.id"
name="Sparkplug Edge Node ID"
type="String"
cardinality="0"
required="true"
default="node"
description="Sparkplug Edge Node identifier to use for this Cloud Connection."/>

<AD id="primary.host.application.id"
name="Sparkplug Primary Host Application ID"
type="String"
cardinality="0"
required="false"
default=""
description="Sparkplug Primary Host Application to associate with this Sparkplug Edge Node."/>

<AD id="server.uris"
name="Server URIs"
type="String"
cardinality="0"
required="true"
default="tcp://broker1-url:1883"
description="List of space-separated URLs of the MQTT brokers to connect to.
When an attempt to connect is initiated the client will start with the
first server in the list and work through the list until a connection
is established with a server. If a connection cannot be made to any of
the servers then the connect attempt fails.
Supported types of connection are tcp: and ssl:."/>
description="List of space-separated URIs of the MQTT brokers to connect to.
Supported types of connection are tcp: and ssl:. URIs must not end with /.
If a primary.host.application.id has been set, the client will cycle
over the list until a Primary Host Application becomes online."/>

<AD id="client.id"
name="Client ID"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@
import org.eclipse.kura.cloudconnection.listener.CloudConnectionListener;
import org.eclipse.kura.cloudconnection.listener.CloudDeliveryListener;
import org.eclipse.kura.cloudconnection.message.KuraMessage;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugPayloads;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugTopics;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport;
import org.eclipse.kura.cloudconnection.subscriber.listener.CloudSubscriberListener;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.configuration.ConfigurationService;
import org.eclipse.kura.data.DataService;
import org.eclipse.kura.data.DataTransportService;
import org.eclipse.kura.data.listener.DataServiceListener;
import org.eclipse.kura.type.StringValue;
import org.eclipse.kura.type.TypedValue;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
Expand Down Expand Up @@ -172,12 +176,32 @@ public void unregisterCloudConnectionListener(CloudConnectionListener cloudConne
@Override
public void onConnectionEstablished() {
logger.debug("{} - Connection estabilished", this.kuraServicePid);

sendExampleDeviceBirth();

this.cloudConnectionListeners.forEach(listener -> callSafely(listener::onConnectionEstablished));
postConnectionChangeEvent(true);

// TO DO: init subscriptions
}

private void sendExampleDeviceBirth() {
try {
this.dataService.subscribe(SparkplugTopics.getDeviceCommandTopic("g1", "n1", "d1"), 1);

Map<String, TypedValue<?>> metrics = new HashMap<>();
TypedValue<String> value = new StringValue("test.value");
metrics.put("test.key", value);

String topic = SparkplugTopics.getDeviceBirthTopic("g1", "n1", "d1");
byte[] payload = SparkplugPayloads.getDeviceBirthPayload(1, metrics);

this.dataService.publish(topic, payload, 0, false, 7);
} catch (KuraException e) {
logger.error("Error in example", e);
}
}

@Override
public void onDisconnecting() {
// nothing to do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2023 Eurotech and/or its affiliates and others
* Copyright (c) 2023, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -13,12 +13,20 @@
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.message;

import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;

import org.eclipse.kura.type.TypedValue;
import org.eclipse.tahu.protobuf.SparkplugBProto.DataType;
import org.eclipse.tahu.protobuf.SparkplugBProto.Payload;
import org.eclipse.tahu.protobuf.SparkplugBProto.Payload.Metric;

import com.google.protobuf.InvalidProtocolBufferException;

public class SparkplugPayloads {

public static final String NODE_CONTROL_REBIRTH_METRIC_NAME = "Node Control/Rebirth";

private SparkplugPayloads() {
}

Expand All @@ -29,25 +37,39 @@ public static byte[] getNodeDeathPayload(long bdSeq) {
public static byte[] getNodeBirthPayload(long bdSeq, long seq) {
long timestamp = new Date().getTime();

Payload.Builder protoMsg = Payload.newBuilder();
SparkplugBProtobufPayloadBuilder payloadBuilder = new SparkplugBProtobufPayloadBuilder();
payloadBuilder.withBdSeq(bdSeq, timestamp);
payloadBuilder.withMetric(NODE_CONTROL_REBIRTH_METRIC_NAME, false, DataType.Boolean, timestamp);
payloadBuilder.withSeq(seq);
payloadBuilder.withTimestamp(timestamp);

Payload.Metric.Builder bdSeqMetric = Payload.Metric.newBuilder();
bdSeqMetric.setName("bdSeq");
bdSeqMetric.setLongValue(bdSeq);
bdSeqMetric.setDatatype(DataType.Int64.getNumber());
bdSeqMetric.setTimestamp(timestamp);
protoMsg.addMetrics(bdSeqMetric.build());
return payloadBuilder.build();
}

Payload.Metric.Builder rebirthMetric = Payload.Metric.newBuilder();
rebirthMetric.setName("Node Control/Rebirth");
rebirthMetric.setBooleanValue(false);
rebirthMetric.setDatatype(DataType.Boolean_VALUE);
protoMsg.addMetrics(rebirthMetric.build());
public static byte[] getDeviceBirthPayload(long seq, Map<String, TypedValue<?>> metrics) {
long timestamp = new Date().getTime();

protoMsg.setSeq(seq);
protoMsg.setTimestamp(timestamp);
SparkplugBProtobufPayloadBuilder payloadBuilder = new SparkplugBProtobufPayloadBuilder();
for (Entry<String, TypedValue<?>> metricEntry : metrics.entrySet()) {
payloadBuilder.withMetric(metricEntry.getKey(), metricEntry.getValue(), timestamp);
}

payloadBuilder.withSeq(seq);
payloadBuilder.withTimestamp(timestamp);

return payloadBuilder.build();
}

return protoMsg.build().toByteArray();
public static boolean getBooleanMetric(String metricName, byte[] rawPayload)
throws InvalidProtocolBufferException, NoSuchFieldException {
Payload payload = Payload.parseFrom(rawPayload);
for (Metric metric : payload.getMetricsList()) {
if (metric.getName().equals(metricName)) {
return metric.getBooleanValue();
}
}

throw new NoSuchFieldException("Metric " + metricName + " not found in payload");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*******************************************************************************
* Copyright (c) 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport;

public class BdSeqCounter {

private int currentBdSeq = 0;
private int nextBdSeq = 0;
marcellorinaldo marked this conversation as resolved.
Show resolved Hide resolved

public synchronized void next() {
if (this.nextBdSeq > 255) {
this.nextBdSeq = 0;
}

this.currentBdSeq = this.nextBdSeq++;
}

public synchronized int getCurrent() {
return this.currentBdSeq;
}

}
Loading