From 960e0574468802d5c4db1dfa19731e9a1326e8b1 Mon Sep 17 00:00:00 2001 From: ruz4fe Date: Wed, 6 Mar 2024 15:48:36 +0100 Subject: [PATCH] Continuous subscription at a specific frequency --- kuksa_databroker/databroker/src/broker.rs | 22 +++++++++++++++---- .../databroker/src/grpc/kuksa_val_v1/val.rs | 2 +- .../databroker/src/viss/v2/server.rs | 2 +- kuksa_databroker/lib/kuksa/src/lib.rs | 15 ++++++++++--- proto/kuksa/val/v1/val.proto | 3 ++- 5 files changed, 34 insertions(+), 10 deletions(-) diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index 4d69ef61d..7505010cc 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -615,7 +615,11 @@ impl Subscriptions { self.change_subscriptions.push(subscription) } - pub fn add_continuous_subscription(&mut self, subscription: ContinuousSubscription) { + pub fn add_continuous_subscription( + &mut self, + subscription: ContinuousSubscription, + frequency: u64, + ) { let local_subscription = subscription.clone(); self.continuous_subscriptions .lock() @@ -627,7 +631,7 @@ impl Subscriptions { while !local_subscription.sender.is_closed() { let _ = local_subscription.notify(None).await; // Simulate some asynchronous work - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(1 / frequency * 1000)).await; } }); } @@ -1593,6 +1597,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn subscribe( &self, valid_entries: HashMap>, + frequency: Option, ) -> Result, SubscriptionError> { if valid_entries.is_empty() { return Err(SubscriptionError::InvalidInput); @@ -1636,6 +1641,9 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let (sender, receiver) = mpsc::channel(10); if !entries_on_changed.is_empty() { + if frequency.is_some() && entries_continuous.is_empty() { + return Err(SubscriptionError::InvalidInput); + } let subscription = ChangeSubscription { entries: entries_on_changed, sender: sender.clone(), @@ -1658,6 +1666,9 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { } if !entries_continuous.is_empty() { + if frequency.is_none() { + return Err(SubscriptionError::InvalidInput); + } let subscription_continuous = ContinuousSubscription { entries: entries_continuous, sender, @@ -1677,7 +1688,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .subscriptions .write() .await - .add_continuous_subscription(subscription_continuous); + .add_continuous_subscription(subscription_continuous, frequency.unwrap()); } let stream = ReceiverStream::new(receiver); @@ -3161,7 +3172,10 @@ mod tests { .expect("Register datapoint should succeed"); let mut stream = broker - .subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) + .subscribe( + HashMap::from([(id1, HashSet::from([Field::Datapoint]))]), + None, + ) .await .expect("subscription should succeed"); diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs index 72a8fa87a..680eb0b14 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs @@ -445,7 +445,7 @@ impl proto::val_server::Val for broker::DataBroker { } } - match broker.subscribe(entries).await { + match broker.subscribe(entries, request.frequency_hertz).await { Ok(stream) => { let stream = convert_to_proto_stream(stream); Ok(tonic::Response::new(Box::pin(stream))) diff --git a/kuksa_databroker/databroker/src/viss/v2/server.rs b/kuksa_databroker/databroker/src/viss/v2/server.rs index 01f96fc3a..eda4e910a 100644 --- a/kuksa_databroker/databroker/src/viss/v2/server.rs +++ b/kuksa_databroker/databroker/src/viss/v2/server.rs @@ -262,7 +262,7 @@ impl Viss for Server { }); }; - match broker.subscribe(entries).await { + match broker.subscribe(entries, None).await { Ok(stream) => { let subscription_id = SubscriptionId::new(); diff --git a/kuksa_databroker/lib/kuksa/src/lib.rs b/kuksa_databroker/lib/kuksa/src/lib.rs index a2beec628..f02db1847 100644 --- a/kuksa_databroker/lib/kuksa/src/lib.rs +++ b/kuksa_databroker/lib/kuksa/src/lib.rs @@ -288,7 +288,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), @@ -321,7 +324,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), @@ -346,7 +352,10 @@ impl KuksaClient { }) } - let req = proto::v1::SubscribeRequest { entries }; + let req = proto::v1::SubscribeRequest { + entries, + frequency_hertz: None, + }; match client.subscribe(req).await { Ok(response) => Ok(response.into_inner()), diff --git a/proto/kuksa/val/v1/val.proto b/proto/kuksa/val/v1/val.proto index 3059d8098..184526521 100644 --- a/proto/kuksa/val/v1/val.proto +++ b/proto/kuksa/val/v1/val.proto @@ -98,6 +98,7 @@ message SubscribeEntry { // Subscribe to changes in datapoints. message SubscribeRequest { repeated SubscribeEntry entries = 1; + optional uint64 frequency_hertz = 2; } // A subscription response @@ -112,4 +113,4 @@ message GetServerInfoRequest { message GetServerInfoResponse { string name = 1; string version = 2; -} \ No newline at end of file +}