Cloud Connectors Implementation Guidelines

Marcello Rinaldo Martina edited this page Jan 24, 2024 · 2 revisions

Cloud Connection APIs are easy to implement incorrectly. They hide some subtleties and assumptions that this document tries to highlight.

Overview of services

A Cloud Connection is generally composed of 3 distinct services:

  • Cloud Endpoint: the “entrypoint” that the rest of the framework uses to publish messages, listen to connection events, or subscribe to topics. This is also the service to which CloudSubscribers and CloudPublishers attach.

  • Data Service: the Cloud Endpoint forwards its requests to the Data Service layer, which implements the DataService API. This layer is responsible for persisting messages to a database, resending unpublished ones, managing unacknowledged messages, and so on. Because of its complexity, Cloud Connection Factories usually reuse the Data Service implementation that is already present in Kura: DataService Implementation.

  • Data Transport Service: the Data Service, in turn, forwards all publish and subscribe requests to the Data Transport layer. This layer is responsible for making the actual calls to the underlying MQTT library (Paho in Kura).

Usually, a CloudEndpoint implements the following APIs:

Subscriber considerations

A subscriber should register itself with the CloudEndpoint both as a CloudConnectionListener and as a CloudSubscriberListener. To do so, it first needs to find the correct Cloud Endpoint implementation running in the framework. This is usually done with service trackers, as in the code here.

We need to be aware of some pitfalls:

  • The registration of the subscriber as a listener on the CloudEndpoint is one-shot, so it must not fail. The registerSubscriber and registerCloudConnectionListener methods of the endpoint implementation should not throw, and should catch any exception (including RuntimeExceptions) raised when calling the subscribe method on the underlying Data Service, as done here. If those calls fail, the subscriber is left unregistered and not connected to any Cloud Endpoint.

  • The same applies to the unregister implementation.
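
The first pitfall can be illustrated with a minimal sketch. It does not use the real Kura interfaces: DataServiceStub, SubscriberListener and SafeEndpoint are hypothetical stand-ins so that the example compiles on its own. The point is only the ordering (record the listener first) and the broad catch around subscribe.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-ins for the Kura types named above.
interface SubscriberListener { }

interface DataServiceStub {
    void subscribe(String topic, int qos) throws Exception;
}

final class SafeEndpoint {
    private final Set<SubscriberListener> subscribers = new CopyOnWriteArraySet<>();
    private final DataServiceStub dataService;

    SafeEndpoint(DataServiceStub dataService) {
        this.dataService = dataService;
    }

    // Never throws: the listener is recorded first, then the subscription is attempted.
    void registerSubscriber(String topic, int qos, SubscriberListener listener) {
        this.subscribers.add(listener);
        try {
            this.dataService.subscribe(topic, qos);
        } catch (Exception e) { // Exception also covers RuntimeException
            // Report and carry on; the subscription can be issued again later.
            System.err.println("Subscribe to " + topic + " failed: " + e);
        }
    }

    boolean isRegistered(SubscriberListener listener) {
        return this.subscribers.contains(listener);
    }
}
```

With this shape, a Data Service that is not yet connected cannot leave the subscriber dangling: the listener stays registered even if subscribe fails.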

Managing subscribers in endpoint

  • The Cloud Endpoint should allow multiple CloudSubscriberListeners to register on the same topic and QoS, or on the same topic with a different QoS. Hence, a simple Map<String, CloudSubscriberListener> is not sufficient to maintain topic-listener mappings. A more suitable key for this mapping is something similar to CloudSubscriptionRecord (use it as a reference to create your own; the example itself does not include the QoS in the key). The mapping then becomes:

    Map<CloudSubscriptionRecord, Set<CloudSubscriberListener>>

    In this way, several subscribers can register on the same topic-QoS combination, and each distinct topic-QoS combination is a separate entry in the mapping.

  • When registering a new subscriber, we should avoid calling DataService.subscribe(topic, qos) if it has already been called for that topic-QoS combination. This should be easy to enforce with the mapping above. This test should pass:

    // Body of a JUnit test method declared with "throws Exception"; mock, verify and times come from Mockito.
    // CloudEndpointImpl stands for your endpoint class, assumed to expose a setDataService binder.
    CloudEndpointImpl endpoint = new CloudEndpointImpl();
    DataService dataService = mock(DataService.class);
    endpoint.setDataService(dataService);
    
    // First listener on (test.topic, QoS 1): triggers the subscription.
    Map<String, Object> p1 = new HashMap<>();
    p1.put("topic.id", "test.topic");
    p1.put("qos", 1);
    CloudSubscriberListener l1 = mock(CloudSubscriberListener.class);
    endpoint.registerSubscriber(p1, l1);
    verify(dataService, times(1)).subscribe("test.topic", 1);
    
    // Second listener on the same pair: no additional subscribe call.
    // Mockito counts invocations cumulatively, so the total must still be 1.
    Map<String, Object> p2 = new HashMap<>();
    p2.put("topic.id", "test.topic");
    p2.put("qos", 1);
    CloudSubscriberListener l2 = mock(CloudSubscriberListener.class);
    endpoint.registerSubscriber(p2, l2);
    verify(dataService, times(1)).subscribe("test.topic", 1);
    
    // Same topic with a different QoS: a new subscription.
    Map<String, Object> p3 = new HashMap<>();
    p3.put("topic.id", "test.topic");
    p3.put("qos", 0);
    CloudSubscriberListener l3 = mock(CloudSubscriberListener.class);
    endpoint.registerSubscriber(p3, l3);
    verify(dataService, times(1)).subscribe("test.topic", 0);

  • DataService.unsubscribe(String topic) should be called only when there are no more listeners for that topic in the mapping Map<CloudSubscriptionRecord, Set<CloudSubscriberListener>>. The following test should pass:

    // Body of a JUnit test method declared with "throws Exception"; mock, verify and times come from Mockito.
    // CloudEndpointImpl stands for your endpoint class, assumed to expose a setDataService binder.
    CloudEndpointImpl endpoint = new CloudEndpointImpl();
    DataService dataService = mock(DataService.class);
    endpoint.setDataService(dataService);
    
    Map<String, Object> p1 = new HashMap<>();
    p1.put("topic.id", "test.topic");
    p1.put("qos", 1);
    CloudSubscriberListener l1 = mock(CloudSubscriberListener.class);
    endpoint.registerSubscriber(p1, l1);
    
    Map<String, Object> p2 = new HashMap<>();
    p2.put("topic.id", "test.topic");
    p2.put("qos", 1);
    CloudSubscriberListener l2 = mock(CloudSubscriberListener.class);
    endpoint.registerSubscriber(p2, l2);
    
    // l2 is still listening on the topic: no unsubscribe yet.
    endpoint.unregisterSubscriber(l1);
    verify(dataService, times(0)).unsubscribe("test.topic");
    
    // Last listener removed: unsubscribe exactly once.
    endpoint.unregisterSubscriber(l2);
    verify(dataService, times(1)).unsubscribe("test.topic");

  • In onMessageArrived, consider the following problem: registerSubscriber may receive a topic that contains wildcards, such as test/a/b/#, while the topic argument of onMessageArrived may be the full topic, such as test/a/b/hello. We need a way to match the wildcard. This can be solved by leveraging MqttUtil.isMatched. Left as an exercise for the reader :).

  • Subscribers need to be initialized (that is, subscribed on the Data Service) only in the onConnectionEstablished callback, by iterating over the mapping Map<CloudSubscriptionRecord, Set<CloudSubscriberListener>>, and not in the updated OSGi hook or in the connect API.
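
The subscription bookkeeping described in this list can be sketched as follows. This is an illustration under stated assumptions, not Kura code: DataServiceLike and ListenerLike are hypothetical stand-ins for DataService and CloudSubscriberListener, TopicQos is a hypothetical key that includes the QoS, and resubscribeAll is meant to be called from the connection-established callback.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-ins for DataService and CloudSubscriberListener.
interface DataServiceLike {
    void subscribe(String topic, int qos) throws Exception;
    void unsubscribe(String topic) throws Exception;
}

interface ListenerLike { }

// Hypothetical map key: topic plus QoS.
final class TopicQos {
    final String topic;
    final int qos;

    TopicQos(String topic, int qos) {
        this.topic = Objects.requireNonNull(topic);
        this.qos = qos;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TopicQos && ((TopicQos) o).topic.equals(this.topic) && ((TopicQos) o).qos == this.qos;
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.topic, this.qos);
    }
}

final class SubscriptionRegistry {
    private final Map<TopicQos, Set<ListenerLike>> subscriptions = new HashMap<>();
    private final DataServiceLike dataService;

    SubscriptionRegistry(DataServiceLike dataService) {
        this.dataService = dataService;
    }

    synchronized void register(String topic, int qos, ListenerLike listener) {
        TopicQos key = new TopicQos(topic, qos);
        Set<ListenerLike> listeners = this.subscriptions.get(key);
        if (listeners == null) {
            listeners = new LinkedHashSet<>();
            this.subscriptions.put(key, listeners);
            subscribeQuietly(key); // only the first listener for this pair subscribes
        }
        listeners.add(listener);
    }

    synchronized void unregister(ListenerLike listener) {
        Set<String> emptiedTopics = new LinkedHashSet<>();
        // Drop the listener everywhere and remove entries that become empty.
        this.subscriptions.entrySet().removeIf(e -> {
            boolean nowEmpty = e.getValue().remove(listener) && e.getValue().isEmpty();
            if (nowEmpty) {
                emptiedTopics.add(e.getKey().topic);
            }
            return nowEmpty;
        });
        for (String topic : emptiedTopics) {
            // unsubscribe takes only the topic, so wait until no QoS variant is left.
            boolean stillUsed = this.subscriptions.keySet().stream().anyMatch(k -> k.topic.equals(topic));
            if (!stillUsed) {
                try {
                    this.dataService.unsubscribe(topic);
                } catch (Exception e) {
                    System.err.println("Unsubscribe from " + topic + " failed: " + e);
                }
            }
        }
    }

    // Intended to be called from the connection-established callback.
    synchronized void resubscribeAll() {
        this.subscriptions.keySet().forEach(this::subscribeQuietly);
    }

    private void subscribeQuietly(TopicQos key) {
        try {
            this.dataService.subscribe(key.topic, key.qos);
        } catch (Exception e) {
            System.err.println("Subscribe to " + key.topic + " failed: " + e);
        }
    }
}
```

Because the map is the single source of truth, the same structure answers three questions: whether a subscribe call is needed, whether an unsubscribe call is allowed, and what has to be subscribed again after a reconnection.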

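For the wildcard problem mentioned for onMessageArrived, the sketch below shows what matching a topic filter against a concrete topic involves. TopicFilterMatcher is a hypothetical helper written for illustration; it is not the Kura or Paho utility, and it ignores special cases such as topics starting with $.

```java
// Hypothetical helper: matches an MQTT topic filter against a concrete topic name.
final class TopicFilterMatcher {

    private TopicFilterMatcher() {
    }

    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard: matches the parent level and everything below it
            }
            if (i >= t.length) {
                return false; // the topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch ("+" accepts any single level)
            }
        }
        return f.length == t.length; // no wildcard tail: level counts must agree
    }
}
```

In onMessageArrived, an endpoint could iterate over the keys of its subscription mapping and deliver the message to the listeners of every key whose topic filter matches the received topic.
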
Data Transport Service considerations

  • The DataTransportService can call the DataTransportListener.onConfigurationUpdating and DataTransportListener.onConfigurationUpdated callbacks in its update method so that the reconnection is managed by the Data Service implementation. Do not both manage the reconnection in the update method and use these callbacks.

  • DataTransportService.publish should return null as its DataTransportToken if the published message has QoS 0, and a valid token with a unique message ID otherwise.

  • When a message is published with QoS > 0, we need to call the DataTransportListener.onMessageConfirmed callback after DataTransportService.publish has returned. How to do this depends on the underlying library:

    • If the library has a callback for acknowledged messages, call DataTransportListener.onMessageConfirmed from that callback.

    • Otherwise, use a CompletableFuture to run the confirmation asynchronously.
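
The three points above can be sketched together. Everything here is an assumption made for illustration: Token, TransportListener and AckingClient are hypothetical stand-ins (the real Kura interfaces have more methods), the client is assumed to return a future that completes when the broker acknowledges the message, and the message ID comes from a simple counter instead of the MQTT library.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for DataTransportToken.
final class Token {
    final int messageId;

    Token(int messageId) {
        this.messageId = messageId;
    }
}

// Hypothetical subset of DataTransportListener.
interface TransportListener {
    void onConfigurationUpdating(boolean wasConnected);
    void onConfigurationUpdated(boolean wasConnected);
    void onMessageConfirmed(Token token);
}

// Hypothetical MQTT client: the future returned by send completes on broker acknowledgement.
interface AckingClient {
    boolean isConnected();
    void applyConfiguration(Map<String, Object> properties);
    CompletableFuture<Void> send(String topic, byte[] payload, int qos);
}

final class TransportSketch {
    private final AtomicInteger nextMessageId = new AtomicInteger(1);
    private final AckingClient client;
    private final TransportListener listener;
    private final Executor callbackExecutor;

    TransportSketch(AckingClient client, TransportListener listener, Executor callbackExecutor) {
        this.client = client;
        this.listener = listener;
        this.callbackExecutor = callbackExecutor;
    }

    // Update hook: only notifies the listener, which decides whether to reconnect.
    void updated(Map<String, Object> properties) {
        boolean wasConnected = this.client.isConnected();
        this.listener.onConfigurationUpdating(wasConnected);
        this.client.applyConfiguration(properties);
        this.listener.onConfigurationUpdated(wasConnected);
    }

    Token publish(String topic, byte[] payload, int qos) {
        CompletableFuture<Void> acknowledged = this.client.send(topic, payload, qos);
        if (qos == 0) {
            return null; // nothing to confirm for QoS 0
        }
        Token token = new Token(this.nextMessageId.getAndIncrement());
        // Confirmation is handed to the executor once the acknowledgement arrives.
        acknowledged.thenRunAsync(() -> this.listener.onMessageConfirmed(token), this.callbackExecutor);
        return token;
    }
}
```

Passing an executor backed by its own thread keeps the confirmation off the thread that called publish; an inline executor such as Runnable::run is only suitable for tests.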
