Skip to content

Commit

Permalink
feat(sparkplug): added SparkplugSubscriber (#5119)
Browse files Browse the repository at this point in the history
* refactor(sparkplug): created utilites package to common factor some code

Signed-off-by: Marcello Martina <[email protected]>

* feat(sparkplug): added SparkplugSubscriber

Signed-off-by: Marcello Martina <[email protected]>

* test: added SubscriptionRecordTest

Signed-off-by: Marcello Martina <[email protected]>

* test: added SubscriptionsMapTest

Signed-off-by: Marcello Martina <[email protected]>

* test: added SparkplugSubscriberTest

Signed-off-by: Marcello Martina <[email protected]>

* test: applied review suggestions

Signed-off-by: Marcello Martina <[email protected]>

---------

Signed-off-by: Marcello Martina <[email protected]>
  • Loading branch information
marcellorinaldo authored Feb 19, 2024
1 parent 523744f commit 1c36691
Show file tree
Hide file tree
Showing 15 changed files with 1,325 additions and 108 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2024 Eurotech and/or its affiliates and others
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
which is available at https://www.eclipse.org/legal/epl-2.0/
SPDX-License-Identifier: EPL-2.0
Contributors:
Eurotech
-->
<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0"
configuration-policy="require"
activate="activate"
deactivate="deactivate"
modified="update"
enabled="true"
immediate="true"
name="org.eclipse.kura.cloudconnection.sparkplug.mqtt.subscriber.SparkplugSubscriber">

<implementation class="org.eclipse.kura.cloudconnection.sparkplug.mqtt.subscriber.SparkplugSubscriber"/>

<service>
<provide interface="org.eclipse.kura.cloudconnection.subscriber.CloudSubscriber"/>
<provide interface="org.eclipse.kura.configuration.ConfigurableComponent"/>
</service>

<property name="service.pid" type="String" value="org.eclipse.kura.cloudconnection.sparkplug.mqtt.subscriber.SparkplugSubscriber"/>
<property name="cloud.connection.factory.pid" type="String" value="org.eclipse.kura.cloudconnection.sparkplug.mqtt.endpoint.SparkplugCloudEndpoint"/>
<property name="kura.ui.service.hide" type="Boolean" value="true"/>
<property name="kura.ui.factory.hide" type="String" value="true"/>

</scr:component>
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2024 Eurotech and/or its affiliates and others
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
which is available at https://www.eclipse.org/legal/epl-2.0/
SPDX-License-Identifier: EPL-2.0
Contributors:
Eurotech
-->
<MetaData xmlns="http://www.osgi.org/xmlns/metatype/v1.2.0" localization="en_us">

<OCD id="org.eclipse.kura.cloudconnection.sparkplug.mqtt.subscriber.SparkplugSubscriber"
name="SparkplugSubscriber"
description="Component that serves as a CloudSubscriber for this Sparkplug Cloud Connection.">

<AD id="topic.filter"
name="Topic Filter"
type="String"
cardinality="0"
required="true"
default="A/B/C"
description="The MQTT subscription topic filter. For example foo/bar/baz, foo/+/bar, #, foo/#."/>

<AD id="qos"
name="QoS"
type="Integer"
cardinality="0"
required="true"
default="0"
description="The maximum desired quality of service for the subscription messages.">

<Option label="QoS 0 - at most once" value="0" />
<Option label="QoS 1 - at least once" value="1" />
<Option label="QoS 2 - exactly once" value="2" />
</AD>

</OCD>

<Designate pid="org.eclipse.kura.cloudconnection.sparkplug.mqtt.subscriber.SparkplugSubscriber"
factoryPid="org.eclipse.kura.cloudconnection.sparkplug.mqtt.subscriber.SparkplugSubscriber">
<Object ocdref="org.eclipse.kura.cloudconnection.sparkplug.mqtt.subscriber.SparkplugSubscriber"/>
</Designate>
</MetaData>
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,17 @@
import org.eclipse.kura.KuraErrorCode;
import org.eclipse.kura.KuraException;
import org.eclipse.kura.cloudconnection.CloudConnectionConstants;
import org.eclipse.kura.cloudconnection.CloudConnectionManager;
import org.eclipse.kura.cloudconnection.listener.CloudConnectionListener;
import org.eclipse.kura.cloudconnection.listener.CloudDeliveryListener;
import org.eclipse.kura.cloudconnection.message.KuraMessage;
import org.eclipse.kura.cloudconnection.publisher.CloudPublisher;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.endpoint.SparkplugCloudEndpoint;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugMessageType;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.utils.InvocationUtils;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.utils.SparkplugCloudEndpointTracker;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.component.ComponentContext;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +48,7 @@ public class SparkplugDevice
public static final String KEY_DEVICE_ID = "device.id";

private String deviceId;
private ServiceTracker<CloudConnectionManager, CloudConnectionManager> cloudConnectionManagerTracker;
private SparkplugCloudEndpointTracker endpointTracker;
private Optional<SparkplugCloudEndpoint> sparkplugCloudEndpoint = Optional.empty();
private final Set<CloudConnectionListener> cloudConnectionListeners = new CopyOnWriteArraySet<>();
private final Set<CloudDeliveryListener> cloudDeliveryListeners = new CopyOnWriteArraySet<>();
Expand All @@ -66,18 +61,12 @@ public class SparkplugDevice

public void activate(final ComponentContext componentContext, final Map<String, Object> properties)
throws InvalidSyntaxException {
String selectedCloudEndpointPid = (String) properties
String endpointPid = (String) properties
.get(CloudConnectionConstants.CLOUD_ENDPOINT_SERVICE_PID_PROP_NAME.value());

String filterString = String.format("(&(%s=%s)(kura.service.pid=%s))", Constants.OBJECTCLASS,
CloudConnectionManager.class.getName(), selectedCloudEndpointPid);

final BundleContext context = componentContext.getBundleContext();
final Filter filter = context.createFilter(filterString);
this.cloudConnectionManagerTracker = new ServiceTracker<>(context, filter,
new CloudConnectionManagerTrackerCustomizer(context));

this.executorService.submit(() -> this.cloudConnectionManagerTracker.open());
this.endpointTracker = new SparkplugCloudEndpointTracker(componentContext.getBundleContext(),
this::setSparkplugCloudEndpoint, this::unsetSparkplugCloudEndpoint, endpointPid);
this.endpointTracker.startEndpointTracker();

update(properties);
}
Expand All @@ -96,9 +85,7 @@ public void update(final Map<String, Object> properties) {
public void deactivate() {
logger.info("Sparkplug Device {} - Deactivating", this.deviceId);

if (Objects.nonNull(this.cloudConnectionManagerTracker)) {
this.cloudConnectionManagerTracker.close();
}
this.endpointTracker.stopEndpointTracker();

logger.debug("Sparkplug Device {} - Shutting down executor service", this.deviceId);
this.executorService.shutdownNow();
Expand Down Expand Up @@ -180,64 +167,26 @@ public void unregisterCloudDeliveryListener(final CloudDeliveryListener cloudDel

@Override
public void onMessageConfirmed(final String messageId) {
this.cloudDeliveryListeners
.forEach(listener -> this.executorService.execute(() -> listener.onMessageConfirmed(messageId)));
this.cloudDeliveryListeners.forEach(listener -> this.executorService
.execute(() -> InvocationUtils.callSafely(listener::onMessageConfirmed, messageId)));
}

/*
* Utils
*/

synchronized void setSparkplugCloudEndpoint(SparkplugCloudEndpoint endpoint) {
private synchronized void setSparkplugCloudEndpoint(SparkplugCloudEndpoint endpoint) {
this.sparkplugCloudEndpoint = Optional.of(endpoint);
this.sparkplugCloudEndpoint.get().registerCloudConnectionListener(this);
this.sparkplugCloudEndpoint.get().registerCloudDeliveryListener(this);
}

synchronized void unsetSparkplugCloudEndpoint(SparkplugCloudEndpoint endpoint) {
private synchronized void unsetSparkplugCloudEndpoint(SparkplugCloudEndpoint endpoint) {
if (this.sparkplugCloudEndpoint.isPresent() && this.sparkplugCloudEndpoint.get() == endpoint) {
this.sparkplugCloudEndpoint.get().unregisterCloudConnectionListener(this);
this.sparkplugCloudEndpoint.get().unregisterCloudDeliveryListener(this);
this.sparkplugCloudEndpoint = Optional.empty();
}
}

private class CloudConnectionManagerTrackerCustomizer
implements ServiceTrackerCustomizer<CloudConnectionManager, CloudConnectionManager> {

private final BundleContext context;

public CloudConnectionManagerTrackerCustomizer(BundleContext context) {
this.context = context;
}

@Override
public synchronized CloudConnectionManager addingService(
final ServiceReference<CloudConnectionManager> reference) {
CloudConnectionManager cloudConnectionManager = this.context.getService(reference);

if (cloudConnectionManager instanceof SparkplugCloudEndpoint) {
setSparkplugCloudEndpoint((SparkplugCloudEndpoint) cloudConnectionManager);
return cloudConnectionManager;
} else {
this.context.ungetService(reference);
}

return null;
}

@Override
public synchronized void removedService(final ServiceReference<CloudConnectionManager> reference,
final CloudConnectionManager service) {
unsetSparkplugCloudEndpoint((SparkplugCloudEndpoint) service);
}

@Override
public synchronized void modifiedService(final ServiceReference<CloudConnectionManager> reference,
final CloudConnectionManager service) {
// Not needed
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.eclipse.kura.KuraConnectException;
import org.eclipse.kura.KuraDisconnectException;
Expand All @@ -34,6 +35,8 @@
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugMessageType;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugPayloads;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugTopics;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.subscriber.SparkplugSubscriber;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.utils.InvocationUtils;
import org.eclipse.kura.cloudconnection.subscriber.listener.CloudSubscriberListener;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.configuration.ConfigurationService;
Expand All @@ -44,6 +47,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;

public class SparkplugCloudEndpoint
implements ConfigurableComponent, CloudEndpoint, CloudConnectionManager, DataServiceListener {

Expand All @@ -56,6 +61,8 @@ public class SparkplugCloudEndpoint
private Set<CloudDeliveryListener> cloudDeliveryListeners = new HashSet<>();
private String kuraServicePid;
private SeqCounter seqCounter = new SeqCounter();
private SubscriptionsMap subscriptions = new SubscriptionsMap();
private ExecutorService executorService = Executors.newCachedThreadPool();

/*
* Activation APIs
Expand Down Expand Up @@ -100,6 +107,8 @@ public void deactivate() {
logger.info("{} - Error disconnecting", this.kuraServicePid, e);
}

this.executorService.shutdownNow();

logger.info("{} - Deactivated", this.kuraServicePid);
}

Expand Down Expand Up @@ -155,14 +164,22 @@ private String publishInternal(String topic, byte[] payload, int qos, boolean re
@Override
public void registerSubscriber(Map<String, Object> subscriptionProperties,
CloudSubscriberListener cloudSubscriberListener) {
// TODO Auto-generated method stub
String topicFilter = (String) subscriptionProperties.get(SparkplugSubscriber.KEY_TOPIC_FILTER);
int qos = (int) subscriptionProperties.get(SparkplugSubscriber.KEY_QOS);

this.subscriptions.add(topicFilter, qos, cloudSubscriberListener);
subscribeIfConnected(topicFilter, qos);

logger.info("{} - Added subscription for {}", this.kuraServicePid,
cloudSubscriberListener.getClass().getSimpleName());
}

@Override
public void unregisterSubscriber(CloudSubscriberListener cloudSubscriberListener) {
// TODO Auto-generated method stub
this.subscriptions.remove(cloudSubscriberListener).forEach(this::unsubscribeIfConnected);

logger.info("{} - Removed subscription for {}", this.kuraServicePid,
cloudSubscriberListener.getClass().getSimpleName());
}

@Override
Expand Down Expand Up @@ -220,12 +237,14 @@ public void unregisterCloudConnectionListener(CloudConnectionListener cloudConne
public void onConnectionEstablished() {
logger.debug("{} - Connection estabilished", this.kuraServicePid);

this.cloudConnectionListeners.forEach(listener -> callSafely(listener::onConnectionEstablished));
this.cloudConnectionListeners
.forEach(listener -> InvocationUtils.callSafely(listener::onConnectionEstablished));
postConnectionChangeEvent(true);

this.seqCounter = new SeqCounter();

// TO DO: init subscriptions
this.subscriptions.getSubscriptionRecords()
.forEach(subscription -> subscribeIfConnected(subscription.getTopicFilter(), subscription.getQos()));
}

@Override
Expand All @@ -236,21 +255,31 @@ public void onDisconnecting() {
@Override
public void onDisconnected() {
logger.debug("{} - Disconnected", this.kuraServicePid);
this.cloudConnectionListeners.forEach(listener -> callSafely(listener::onDisconnected));
this.cloudConnectionListeners.forEach(listener -> InvocationUtils.callSafely(listener::onDisconnected));
postConnectionChangeEvent(false);
}

@Override
public void onConnectionLost(Throwable cause) {
logger.debug("{} - Connection lost", this.kuraServicePid);
this.cloudConnectionListeners.forEach(listener -> callSafely(listener::onConnectionLost));
this.cloudConnectionListeners.forEach(listener -> InvocationUtils.callSafely(listener::onConnectionLost));
postConnectionChangeEvent(false);
}

@Override
public void onMessageArrived(String topic, byte[] payload, int qos, boolean retained) {
logger.debug("{} - Message arrived, forwarding to registered subscribers", this.kuraServicePid);
// TODO
logger.debug("{} - Message arrived on topic {}, forwarding to registered subscribers", this.kuraServicePid,
topic);

for (CloudSubscriberListener listener : this.subscriptions.getMatchingListeners(topic, qos)) {
try {
KuraMessage message = new KuraMessage(SparkplugPayloads.getKuraPayload(payload));

this.executorService.execute(() -> InvocationUtils.callSafely(listener::onMessageArrived, message));
} catch (InvalidProtocolBufferException e) {
logger.error("{} - Error parsing received SparkplugPayload to KuraPayload", this.kuraServicePid, e);
}
}
}

@Override
Expand All @@ -261,8 +290,8 @@ public void onMessagePublished(int messageId, String topic) {
@Override
public void onMessageConfirmed(int messageId, String topic) {
logger.debug("{} - Message with ID {} confirmed", this.kuraServicePid, messageId);
this.cloudDeliveryListeners
.forEach(listener -> callSafely(listener::onMessageConfirmed, String.valueOf(messageId)));
this.cloudDeliveryListeners.forEach(
listener -> InvocationUtils.callSafely(listener::onMessageConfirmed, String.valueOf(messageId)));
}

/*
Expand All @@ -281,19 +310,23 @@ private void postConnectionChangeEvent(final boolean isConnected) {
this.eventAdmin.postEvent(event);
}

private void callSafely(Runnable f) {
private synchronized void subscribeIfConnected(String topicFilter, int qos) {
try {
f.run();
} catch (Exception e) {
logger.warn("{} - An error occured in listener {}", this.kuraServicePid, f.getClass().getName(), e);
if (isConnected()) {
this.dataService.subscribe(topicFilter, qos);
}
} catch (KuraException e) {
logger.error("{} - Error subscribing to topic " + topicFilter + " with QoS " + qos, this.kuraServicePid, e);
}
}

private <T> void callSafely(Consumer<T> f, T argument) {
private synchronized void unsubscribeIfConnected(String topic) {
try {
f.accept(argument);
} catch (Exception e) {
logger.error("{} - An error occured in listener {}", this.kuraServicePid, f.getClass().getName(), e);
if (isConnected()) {
this.dataService.unsubscribe(topic);
}
} catch (KuraException e) {
logger.error("{} - Error unsubscribing from topic {}", this.kuraServicePid, topic);
}
}

Expand Down
Loading

0 comments on commit 1c36691

Please sign in to comment.