Skip to content

Commit

Permalink
feat(sparkplug): added DataTransport layer (#5077)
Browse files Browse the repository at this point in the history
* feat(sparkplug): added DataTransportLayer

Signed-off-by: Marcello Martina <[email protected]>

* build: added Paho client JAR in lib

Signed-off-by: Marcello Martina <[email protected]>

---------

Signed-off-by: Marcello Martina <[email protected]>
  • Loading branch information
marcellorinaldo authored Dec 22, 2023
1 parent 597bcf6 commit 5297748
Show file tree
Hide file tree
Showing 15 changed files with 1,474 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/target
/bin
/lib
.vscode
generated-sources/
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Bundle-SymbolicName: org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider
Bundle-Version: 1.0.0.qualifier
Require-Capability: osgi.ee;filter:="(&(osgi.ee=JavaSE)(version=1.8))"
Import-Package: com.google.protobuf;version="[3.0,4.0]",
com.google.gson;version="[2.7,3.0)",
org.eclipse.kura;version="[1.0,2.0)",
org.eclipse.kura.cloud;version="[1.1,2.0)",
org.eclipse.kura.cloudconnection;version="[1.0,1.1)",
Expand All @@ -27,4 +28,5 @@ Bundle-ActivationPolicy: lazy
Service-Component: OSGI-INF/*.xml
Bundle-Vendor: Eclipse Kura
Bundle-License: Eclipse Public License v2.0
Bundle-ClassPath: .
Bundle-ClassPath: .,
lib/org.eclipse.paho.client.mqttv3-1.2.1.k2.jar
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
cardinality="1..1"
bind="setEventAdmin"/>

<reference interface="org.eclipse.kura.data.DataTransportService"
name="DataTransportService"
policy="static"
cardinality="1..1"
bind="setDataTransportService"/>

<property name="kura.ui.service.hide" type="Boolean" value="true"/>
<property name="kura.ui.factory.hide" type="Boolean" value="true"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<provide interface="org.eclipse.kura.data.DataTransportService"/>
<provide interface="org.eclipse.kura.configuration.ConfigurableComponent"/>
</service>

<reference name="DataTransportListener"
policy="dynamic"
cardinality="0..n"
interface="org.eclipse.kura.data.DataTransportListener"/>

<property name="kura.ui.service.hide" type="Boolean" value="true"/>
<property name="kura.ui.factory.hide" type="Boolean" value="true"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,61 @@
name="SparkplugDataTransport"
description="Data Transport layer configuration.">

<Icon resource="MqttDataTransport" size="32"/>

<AD id="server.uris"
name="Server URIs"
type="String"
cardinality="0"
required="true"
default="tcp://broker1-url:1883"
description="List of space-separated URLs of the MQTT brokers to connect to.
When an attempt to connect is initiated the client will start with the
first server in the list and work through the list until a connection
is established with a server. If a connection cannot be made to any of
the servers then the connect attempt fails.
Supported types of connection are tcp: and ssl:."/>

<AD id="client.id"
name="Client ID"
type="String"
cardinality="0"
required="true"
default="client"
description="Client identifier to be used when connecting to the MQTT broker."/>

<AD id="username"
name="Username"
type="String"
cardinality="0"
required="false"
default=""
description="Username to be used when connecting to the MQTT broker."/>

<AD id="password"
name="Password"
type="Password"
cardinality="0"
required="false"
default=""
description="Password to be used when connecting to the MQTT broker."/>

<AD id="keep.alive"
name="Keep Alive Interval"
type="Integer"
cardinality="0"
required="true"
default="60"
description="Frequency in seconds for the periodic MQTT PING message."/>

<AD id="connection.timeout"
name="Connection Timeout"
type="Integer"
cardinality="0"
required="true"
default="30"
description="Timeout used for all interactions with the MQTT broker."/>

</OCD>

<Designate pid="org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ output.. = target/
bin.includes = META-INF/,\
.,\
OSGI-INF/,\
about.html
about.html,\
lib/
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import org.eclipse.kura.cloudconnection.listener.CloudConnectionListener;
import org.eclipse.kura.cloudconnection.listener.CloudDeliveryListener;
import org.eclipse.kura.cloudconnection.message.KuraMessage;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport;
import org.eclipse.kura.cloudconnection.subscriber.listener.CloudSubscriberListener;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.configuration.ConfigurationService;
import org.eclipse.kura.data.DataService;
import org.eclipse.kura.data.DataTransportService;
import org.eclipse.kura.data.listener.DataServiceListener;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
Expand All @@ -53,6 +55,7 @@ public class SparkplugCloudEndpoint

private DataService dataService;
private EventAdmin eventAdmin;
private SparkplugDataTransport dataTransport;

public void setDataService(DataService dataService) {
this.dataService = dataService;
Expand All @@ -62,6 +65,14 @@ public void setEventAdmin(EventAdmin eventAdmin) {
this.eventAdmin = eventAdmin;
}

public void setDataTransportService(DataTransportService dataTransport) {
if (dataTransport instanceof SparkplugDataTransport) {
this.dataTransport = (SparkplugDataTransport) dataTransport;
} else {
throw new IllegalStateException("The bound DataTransport reference is not a SparkplugDataTransport");
}
}

public void activate(Map<String, Object> properties) {
this.kuraServicePid = (String) properties.get(ConfigurationService.KURA_SERVICE_PID);
logger.info("{} - Activating", this.kuraServicePid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void createConfiguration(String pid) throws KuraException {
cloudEndpointProperties.put(DATA_SERVICE_REFERENCE_NAME,
String.format(REFERENCE_TARGET_VALUE_FORMAT, dataServicePid));
cloudEndpointProperties.put(KURA_CLOUD_CONNECTION_FACTORY_PID, FACTORY_PID);
cloudEndpointProperties.put(DATA_TRANSPORT_SERVICE_REFERENCE_NAME,
String.format(REFERENCE_TARGET_VALUE_FORMAT, dataTransportServicePid));

this.configurationService.createFactoryConfiguration(CLOUD_ENDPOINT_FACTORY_PID, pid, cloudEndpointProperties,
false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*******************************************************************************
* Copyright (c) 2023 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.message;

import java.util.Date;

import org.eclipse.tahu.protobuf.SparkplugBProto.DataType;
import org.eclipse.tahu.protobuf.SparkplugBProto.Payload;

public class SparkplugPayloads {

private SparkplugPayloads() {
}

public static byte[] getNodeDeathPayload(long bdSeq) {
return new SparkplugBProtobufPayloadBuilder().withBdSeq(bdSeq, new Date().getTime()).build();
}

public static byte[] getNodeBirthPayload(long bdSeq, long seq) {
long timestamp = new Date().getTime();

Payload.Builder protoMsg = Payload.newBuilder();

Payload.Metric.Builder bdSeqMetric = Payload.Metric.newBuilder();
bdSeqMetric.setName("bdSeq");
bdSeqMetric.setLongValue(bdSeq);
bdSeqMetric.setDatatype(DataType.Int64.getNumber());
bdSeqMetric.setTimestamp(timestamp);
protoMsg.addMetrics(bdSeqMetric.build());

Payload.Metric.Builder rebirthMetric = Payload.Metric.newBuilder();
rebirthMetric.setName("Node Control/Rebirth");
rebirthMetric.setBooleanValue(false);
rebirthMetric.setDatatype(DataType.Boolean_VALUE);
protoMsg.addMetrics(rebirthMetric.build());

protoMsg.setSeq(seq);
protoMsg.setTimestamp(timestamp);

return protoMsg.build().toByteArray();
}

}
Loading

0 comments on commit 5297748

Please sign in to comment.