The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be interpreted as described in IETF BCP14 (RFC2119 & RFC8174)
SPDX-FileCopyrightText: 2023 Contributors to the Eclipse Foundation See the NOTICE file(s) distributed with this work for additional information regarding copyright ownership. This program and the accompanying materials are made available under the terms of the Apache License Version 2.0 which is available at https://www.apache.org/licenses/LICENSE-2.0 SPDX-FileType: DOCUMENTATION SPDX-License-Identifier: Apache-2.0
The Publish&Subscribe ("pubsub") architecture pattern allows the decoupling of senders and receives of messages on the levels of code-dependency, sender-receiver relationship, deployment context, network topology and even temporal coincidence. There exist a large number of implementations of this pattern for all sorts of domains and requirements, both in open source and proprietary development models.
The purpose of uProtocol is to be the overarching service fabric/backbone of an automotive service landscape that extends from in-vehicle mechatronics devices and in-vehicle high-compute systems all the way up to cloud-based backend services and mobile-device companion applications. As such, uProtocol needs to be integrative towards any specific pubsub implementation that might be used in such a broad scenario (ex. MQTT, Eclipse Zenoh, DDS, SOME/IP, etc): uProtocol requires the facilities to implement a distributed publish-subscribe architecture.
A distributed publish-subscribe network requires the following capabilities to be useful:
-
unified addressing scheme
-
message forwarding between different pub-sub domains
-
discoverability of available publishers/publications
-
cross-system tracking of subscriptions, especially to/from remote systems (this specification)
-
option for local caching and temporally decoupled access to published data
The remainder of this document defines the use cases, interfaces, state machines and application flows to support cross-system tracking of subscriptions, especially to/from remote systems.
In a distributed pubsub network, there exist two subscription scenarios: local subscriptions and remote subscriptions.
In the local case a client subscribes to messages published by a uEntity on the same transport network, both parties using the same pubsub implementation. For instance, imagine an MQTT broker used by publishers and subscribers, all deployed within the same system context.
In contrast, a remote subscription scenario involves publishers and subscribers using different pubsub implementations and/or being located on different systems. For instance, a client interacting with a co-located MQTT broker to subscribe to messages that originate from a low-level mechatronics device, being published via SOME/IP.
To be workable, the remote subscription scenario requires a dispatcher (uStreamer
) which is able to forward published messages from one pubsub network to another. To do so intelligently, i.e. only forward traffic that actually is requested by a remote entity, some form of bookkeeping is needed to keep track of local and remote subscriptions, synchronize remote subscription requests with their bookkeeping counterpart on the remote system, and serve as information source for subscription states for the dispatcher entity.
The uSubscription
service component provides this bookkeeping capability.
A Publisher is a uEntity that sends messages to a topic, thus making the information available to other uEntities interested in the topic. Publishers typically use the Communication Layer API for this purpose.
A Publisher in a uProtocol network usually does not perform any additional actions beyond what is necessary to publish messages to the transport it is connected to.
A Subscriber is a uEntity that is interested in a particular topic and wants to receive messages that have been published to that topic by other uEntities. Subscribers typically use the Communication Layer API for this purpose.
In addition to performing the actions necessary to subscribe to and receive messages from the transport that they are using, Subscribers in a uProtocol network MUST invoke the Subscribe() and Unsubscribe() operations of their local uSubscription service instance in order to indicate which topics they subscribe to or unsubscribe from.
The uSubscription service Subscribe()
function MUST implement the corresponding protobuf definition in usubscription.proto.
When a client calls the Subscribe()
function, uSubscription service MUST persistenly (across service restarts) store the client as a subscriber of the provided topic.
When the subscription expiration time (defined in the expire
field of SubscriptionRequest.attributes
) is exceeded, uSubscription service MUST stop tracking the associated client-topic subscription.
When a subscription request does not specify an expiration time, uSubscription service MUST track the associated client-topic subscription indefinitely and persistently (across service restarts).
When a client calls the Subscribe()
function for a topic that it is already subscribed to, uSubscription service MUST return a response message containing the current subscription status for this client-topic combination (e.g. SUBSCRIBED
or SUBSCRIBE_PENDING
).
When a client calls the Subscribe()
function for a topic that it is already subscribed to where the value in the expire
field of SubscriptionRequest.attributes
exceeds the current expiration time, uSubscription service MUST update the expiration time of the subscription with the new value.
When a client makes the first call to Subscribe()
to a remote topic, i.e. a topic that is not published by a uEntity on the local host, uSubscription service MUST establish a remote subscription to that topic by sending a Subscribe()
request to the (remote) uSubscription service running on the host indicated by the remote topic’s authority.
uSubscription service MUST change the subscriber field to itself (core.usubscription
) when sending a Subscribe()
request to a remote uSubscription service.
When uSubscription service sends a Subscribe()
request to a remote uSubscription service, uSubscription service MUST set the subscription state for any client-topic combination involving the subscribed remote topic to SUBSCRIBE_PENDING
.
When uSubscription service receives a reply to a remote Subscribe()
request, uSubscription service MUST set the subscription state for any client-topic combination involving the subscribed remote topic to match the subscription status response of the remote uSubscription service (e.g. SUBSCRIBED
or UNSUBSCRIBED
).
When a client calls the Subscribe()
function for a remote topic that is in state UNSUBSCRIBE_PENDING
, uSubscription service MUST initiate the regular remote subscription process, i.e. send a subscription request to the remote uSubscription service and set status to SUBSCRIBE_PENDING
.
When a client calls the Subscribe()
function, uSubscription service MUST generate subscription change notifications reflecting any changes to the subscription state of the subscribed topic.
When receiving a Subscribe()
request that contains a topic that
* is not a valid uProtocol URI or
* contains a wildcard authority or
* contains a wildcard uEntity ID (ue_id
) or
* contains a wildcard resource ID,
a uSubscription service MUST return a failure status message with UCode
INVALID_ARGUMENT
.
The uSubscription service Unsubscribe()
function MUST implement the corresponding protobuf definition in usubscription.proto.
When a client calls the Unsubscribe()
function, uSubscription service MUST stop tracking the client as a subscriber to the topic provided with the function call.
When a client calls the Unsubscribe()
function for a topic that it has not subscribed to, uSubscription service MUST return a response message containing the subscription status UNSUBSCRIBED
.
When the last client subscribed to a remote topic calls the Unsubscribe()
function on that topic, uSubscription service MUST perform a remote unsubscribe on that topic by sending an Unsubscribe()
request to the remote uSubscription service.
uSubscription service MUST change the subscriber field to itself (core.usubscription
) when sending an Unsubscribe()
request to a remote uSubscription service.
When sending an Unsubscribe()
request to a remote uSubscription service, uSubscription service MUST consider the remote topic to be in state UNSUBSCRIBED
, regardless of the status returned from the remote uSubscription service.
When a client calls the Unsubscribe()
function for a remote topic that is in state SUBSCRIBE_PENDING
, uSubscription service MUST initiate the regular remote unsubscribe process, i.e. send an unsubscribe request to the remote uSubscription service and set status to UNSUBSCRIBED
.
When the last client subscribed to a topic calls the Unsubscribe()
function on that topic, uSubscription service MUST stop generating subscription change notifications for that topic.
When receiving a Unsubscribe()
request that contains a topic that
-
is not a valid uProtocol URI or
-
contains a wildcard authority or
-
contains a wildcard uEntity ID (
ue_id
) or -
contains a wildcard resource ID,
a uSubscription service MUST return a failure status message with UCode
INVALID_ARGUMENT
.
The uSubscription service FetchSubscribers()
function MUST implement the corresponding protobuf definition in usubscription.proto.
When a client calls the FetchSubscribers()
function, uSubscription service MUST return a list of subscribers that are currently subscribed to a given topic.
When receiving a FetchSubscribers()
request that contains a topic that
-
is not a valid uProtocol URI or
-
contains a wildcard authority or
-
contains a wildcard uEntity ID (
ue_id
) or -
contains a wildcard resource ID,
a uSubscription service MUST return a failure status message with UCode
INVALID_ARGUMENT
.
Subscriber entries returned by subsequent calls to the FetchSubscribers()
function MUST be in identical order.
If the list of subscribers returned in response to a FetchSubscribers()
call does not contain all existing subscription relationships, uSubscription service MUST set the has_more_records
field of the FetchSubscribersResponse
to true
.
When a client calls the FetchSubscribers()
function with an offset
argument, the list of subscribers returned by uSubscription service MUST omit all subscriber entries up to the provided offset.
The uSubscription service FetchSubscriptions()
function MUST implement the corresponding protobuf definition in usubscription.proto.
When a client calls the FetchSubscriptions()
function with a SubscriberInfo
argument, uSubscription service MUST return a list of topics that are currently subscribed to by the given subscriber.
When a client calls the FetchSubscriptions()
function with a topic UURI argument, uSubscription service MUST return a list of subscribers that are currently subscribed to the given topic.
When receiving a FetchSubscriptions()
request that contains a topic that
-
is not a valid uProtocol URI or
-
contains a wildcard authority or
-
contains a wildcard uEntity ID (
ue_id
) or -
contains a wildcard resource ID,
a uSubscription service MUST return a failure status message with UCode
INVALID_ARGUMENT
.
When receiving a FetchSubscriptions()
request that contains a subscriber URI that
* is not a valid uProtocol URI or
* contains a wildcard authority or
* contains a wildcard uEntity ID (ue_id
) or
* contains a wildcard resource ID,
a uSubscription service MUST return a failure status message with UCode
INVALID_ARGUMENT
.
If the list of subscriptions returned in response to a FetchSubscriptions()
call does not contain all existing subscription relationships, uSubscription service MUST set the has_more_records
field of the FetchSubscriptionsResponse
to true
.
Subscription entries returned by subsequent calls to the FetchSubscriptions()
function MUST be in identical order.
If the list of subscriptions returned in response to a FetchSubscriptions()
call does not contain all existing subscription relationships, uSubscription service MUST set the has_more_records
field of the FetchSubscriptionsResponse
to true
.
When a client calls the FetchSubscriptions()
function with an offset
argument, the list of subscriptions returned by uSubscription service MUST omit all subscription relationship entries up to the provided offset.
A topic identifies the message resource that a Subscriber wants to subscribe to. Topic are expressed in uProtocol URI format.
Topic URIs used in uSubscription APIs MUST contain a specific (non-wildcard) ue_version_major
.
Note
|
As the major version is part of the topic URI, a change in the major version requires Subscribers to (re-)subscribe to the updated topic URI in order to keep receiving messages for that topic. |
A subscription state defines the relationship between exactly one subscriber and one topic. In this section we elaborate on the states that a this relationship can take, based on uSubscription service API calls from uEntities.
The following diagram illustrates the subscriber-topic states, and the transitions between them.
stateDiagram-v2
state sub_local <<choice>>
state unsub_local <<choice>>
[*] --> sub_local: Subscribe(topic)
UNSUBSCRIBED --> sub_local: Subscribe(topic)
sub_local --> SUBSCRIBED: is local topic
sub_local --> SUBSCRIBE_PENDING: is remote topic
SUBSCRIBE_PENDING --> SUBSCRIBED: remote Subscribe(topic)
note left of SUBSCRIBE_PENDING
On first subscription request to remote topic only,
as long as remote subscription request has not
received positive confirmation.
end note
SUBSCRIBED --> unsub_local: Unsubscribe(topic)
unsub_local --> UNSUBSCRIBED: is local topic
unsub_local --> UNSUBSCRIBE_PENDING: is remote topic
UNSUBSCRIBE_PENDING --> UNSUBSCRIBED: remote Unsubscribe(topic)
note right of UNSUBSCRIBE_PENDING
On first unsubscribe request to remote topic only,
as long as remote unsubscribe request has not
received positive confirmation.
end note
uSubscription service MUST implement subscription state transisitions of client-topic subscription relationships according to the above state diagram.
Note
|
SUBSCRIBE_PENDING and UNSUBSCRIBE_PENDING states only apply to remote topic subscriptions, more details provided below.
|
State | Description | Entry | Action | Exit |
---|---|---|---|---|
|
Subscriber uE is not subscribed to the topic |
|
|
|
|
Subscriber uE is subscribed to the topic |
|
|
|
|
Subscription is pending, awaiting acknowledgement from the remote uSubscription service |
|
|
|
|
Unsubscribe is pending, awaiting acknowledgement from the remote uSubscription service |
|
|
|
Note
|
The Action column in the above table describes that action to be taken by a uSubscription service instance to effect a specific state transition. |
When the subscription state of a client-topic relationship changes, uSubscription service sends subscription change notification messages to the client, as well as to any uEntities that have explicitly registered to receive such messages.
Subscription change notifications MUST be of type Update.
Subscription change notifications MUST use topic SubscriptionChange
with resource id 0x8000
, as per the protobuf definition.
If a subscriber-topic relationship changes, uSubscription service MUST send a corresponding Update()
notification to the topic subscriber.
uEntities may register with uSusbcription service to be directly sent subscription change notifications when the subscription state of specific topics changes.
The uSubscription service RegisterForNotifications()
function MUST implement the corresponding protobuf definition in usubscription.proto.
uSubscription service MUST send subscription change notification messages to clients that have previously called RegisterForNotifications()
to opt-in to receive notifications for a specific topic specified in NotificationsRequest.topic
field.
When receiving a RegisterForNotifications()
request that contains a topic that
-
is not a valid uProtocol URI or
-
contains a wildcard authority or
-
contains a wildcard uEntity ID (
ue_id
) or -
contains a wildcard resource ID,
a uSubscription service MUST return a failure status message with UCode
INVALID_ARGUMENT
.
The uSubscription service UnregisterForNotifications()
function MUST implement the corresponding protobuf definition in usubscription.proto.
uSubscription service MUST stop sending subscription change notifications to clients afther they have opted-out of receiving subscription change notification by calling UnregisterForNotifications()
.
When receiving a UnregisterForNotifications()
request that contains a topic that
-
is not a valid uProtocol URI or
-
contains a wildcard authority or
-
contains a wildcard uEntity ID (
ue_id
) or -
contains a wildcard resource ID,
a uSubscription service MUST return a failure status message with UCode
INVALID_ARGUMENT
.
Subscribe (and unsubscribe) to remote topics are handled by RPC calls between uSubscription services running on the different devices. Given that devices are not always connected to each other, the onus is on uSubscription service to ensure that a command is received in time. Below are the common retry and timeout policies for USubscription service implementations to follow:
-
Remote Subscribe/Unsubscribe requests MUST implement a minimum timeout of 5 minutes.
-
Timed-out remote commands MUST be retried indefinitely until the business logic behind it no longer requires the command to be sent (e.g. because the last entity unsubscribed from a topic that is in state
SUBSCRIPTION_PENDING
).
This is a private API, to be used only between uSubscription services. Regular uEs can call Unsubscribe() to flush their own subscriptions.
The uSubscription service Reset()
function MUST implement the corresponding protobuf definition in usubscription.proto.
When the Reset()
function is called, the uSubscription service MUST flush all stored subscription information, including any persistently stored subscriptions.
When receiving a Reset()
call from a source that is not another uSubscription services (i.e. from source URIs where uEntity ID (ue_id
) does not equal 0x0), uSubscription service MUST return a failure status message with UCode
PERMISSION_DENIED
.
In the following section, we will elaborate on the various subscription flows for local and remote topics. When a consumer subscribes to a remote topic, it is the responsibility of the (local) uSubscription service to relay the subscription request to the remote uSubscription service as can be seen in the sequence diagrams below.
There are different types of messages passed between uEntities (Request/Response, Publish, Notify), this is how they are represented in the following sequence diagrams:
sequenceDiagram
participant App1
participant App2
rect rgb(245, 245, 245)
App1->>+App2: Request
App2-->>-App1: Response
end
rect rgb(230, 230, 230)
App1-)App2: Publish
end
rect rgb(215, 215, 215)
App1--)App2: Notify
end
Subscription flow will show how a subscriber can subscribe to a topic when uApp is on the same device (local subscriptions) or remote device (remote subscriptions).
sequenceDiagram
box White Device1
actor uApp
participant uSubscription
end
uApp->>+uSubscription: Subscribe(SubscriptionRequest)
alt success
uSubscription-->>uApp: SubscriptionResponse(SUBSCRIBED)
uSubscription--)uApp: Update(SUBSCRIBED)
else failure
uSubscription-->>-uApp: SubscriptionResponse(UNSUBSCRIBED)
end
sequenceDiagram
box White Device1
actor uApp
participant local uSubscription
end
box White Device2
participant remote uSubscription
participant uEntity
end
uEntity->>+remote uSubscription: RegisterForNotification()
uApp->>+local uSubscription: Subscribe(SubscriptionRequest)
alt first subscription
local uSubscription-->>uApp: SubscriptionResponse(SUBSCRIPTION_PENDING)
local uSubscription-->>remote uSubscription: Subscribe(SubscriptionRequest)
alt success
remote uSubscription-->>local uSubscription: SubscriptionResponse(SUBSCRIBED)
remote uSubscription--)uEntity: Update(SUBSCRIBED)
local uSubscription--)uApp: Update(SUBSCRIBED)
else failure
remote uSubscription-->>local uSubscription: SubscriptionResponse(UNSUBSCRIBED)
local uSubscription-->>uApp: Update(UNSUBSCRIBED)
end
else follow-on subscription
local uSubscription-->>uApp: SubscriptionResponse(SUBSCRIBED)
local uSubscription--)uApp: Update(SUBSCRIBED)
end
uEntity->>+remote uSubscription: UnregisterForNotification()
To allow the reverse flow (publication) to be properly multicast to local subscribers by the local disaptcher when it queries the local uSubscription for a list of local subscribers, remote subscriptions are always performed between uSubscription services using their own uEntity identifiers (core.usubscription
).
sequenceDiagram
box White Device1
actor uApp
participant uSubscription
end
uApp->>+uSubscription: Unsubscribe(UnsubscribeRequest)
uSubscription-->>uApp: Ok
uSubscription--)uApp: Update(UNSUBSCRIBED)
sequenceDiagram
box White Device1
actor uApp
participant local uSubscription
end
box White Device2
participant remote uSubscription
participant uEntity
end
uEntity->>+remote uSubscription: RegisterForNotification()
uApp->>+local uSubscription: Unsubscribe(UnsubscribeRequest)
alt success
local uSubscription-->>uApp: Ok
local uSubscription--)uApp: Update(UNSUBSCRIBED)
else failure
local uSubscription-->>uApp: Failure
end
opt last subscription
local uSubscription-->>remote uSubscription: Unsubscribe(UnsubscribeRequest)
alt success
remote uSubscription-->>local uSubscription: Ok
remote uSubscription--)uEntity: Update(UNSUBSCRIBED)
else failure
remote uSubscription-->>local uSubscription: Failure
end
end
uEntity->>+remote uSubscription: UnregisterForNotification()
To allow the reverse flow (publication) to be properly multicast to local subscribers by the local disaptcher when it queries the local uSubscription for a list of local subscribers, remote subscriptions are always performed between uSubscription services using their own uEntity identifiers (core.usubscription
).