Skip to content

Commit

Permalink
Add continuous subscription to subscribe method
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Mar 6, 2024
1 parent 7d9d335 commit 6a95751
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 23 deletions.
242 changes: 223 additions & 19 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::time::SystemTime;
use std::sync::Mutex;
use std::time::{Duration, SystemTime};

use crate::query::{CompiledQuery, ExecutionInput};
use crate::types::ExecutionInputImplData;
Expand Down Expand Up @@ -90,7 +91,7 @@ pub enum Field {
MetadataUnit,
}

#[derive(Default)]
#[derive(Default, Debug)]
pub struct Database {
next_id: AtomicI32,
path_to_id: HashMap<String, i32>,
Expand All @@ -101,6 +102,7 @@ pub struct Database {
pub struct Subscriptions {
query_subscriptions: Vec<QuerySubscription>,
change_subscriptions: Vec<ChangeSubscription>,
continuous_subscriptions: Arc<Mutex<Vec<ContinuousSubscription>>>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -158,6 +160,14 @@ pub struct ChangeSubscription {
permissions: Permissions,
}

#[derive(Debug, Clone)]
pub struct ContinuousSubscription {
entries: HashMap<i32, HashSet<Field>>,
sender: mpsc::Sender<EntryUpdates>,
permissions: Permissions,
database: Arc<RwLock<Database>>,
}

#[derive(Debug)]
pub struct NotificationError {}

Expand Down Expand Up @@ -605,6 +615,23 @@ impl Subscriptions {
self.change_subscriptions.push(subscription)
}

pub fn add_continuous_subscription(&mut self, subscription: ContinuousSubscription) {
let local_subscription = subscription.clone();
self.continuous_subscriptions
.lock()
.unwrap()
.push(subscription);

tokio::spawn(async move {
// Asynchronous code to be executed in the new task
while !local_subscription.sender.is_closed() {
let _ = local_subscription.notify(None).await;
// Simulate some asynchronous work
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}

pub async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -648,6 +675,7 @@ impl Subscriptions {
pub fn clear(&mut self) {
self.query_subscriptions.clear();
self.change_subscriptions.clear();
self.continuous_subscriptions.lock().unwrap().clear();
}

pub fn cleanup(&mut self) {
Expand All @@ -667,6 +695,14 @@ impl Subscriptions {
true
}
});
self.continuous_subscriptions.lock().unwrap().retain(|sub| {
if sub.sender.is_closed() {
info!("Subscriber gone: removing continuous subscription");
false
} else {
true
}
});
}
}

Expand Down Expand Up @@ -921,6 +957,117 @@ impl QuerySubscription {
}
}

impl ContinuousSubscription {
async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
) -> Result<(), NotificationError> {
let db = self.database.read().await;
let db_read = db.authorized_read_access(&self.permissions);
match changed {
Some(changed) => {
let mut matches = false;
for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
matches = true;
break;
}
}
}
if matches {
// notify
let notifications = {
let mut notifications = EntryUpdates::default();

for (id, changed_fields) in changed {
if let Some(fields) = self.entries.get(id) {
if !fields.is_disjoint(changed_fields) {
match db_read.get_entry_by_id(*id) {
Ok(entry) => {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
// TODO: Perhaps make path optional
update.path = Some(entry.metadata.path.clone());
if changed_fields.contains(&Field::Datapoint)
&& fields.contains(&Field::Datapoint)
{
update.datapoint = Some(entry.datapoint.clone());
notify_fields.insert(Field::Datapoint);
}
if changed_fields.contains(&Field::ActuatorTarget)
&& fields.contains(&Field::ActuatorTarget)
{
update.actuator_target =
Some(entry.actuator_target.clone());
notify_fields.insert(Field::ActuatorTarget);
}
notifications.updates.push(ChangeNotification {
update,
fields: notify_fields,
});
}
Err(_) => {
debug!("notify: could not find entry with id {}", id)
}
}
}
}
}
notifications
};
if notifications.updates.is_empty() {
Ok(())
} else {
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
}
} else {
Ok(())
}
}
None => {
let notifications = {
let mut notifications = EntryUpdates::default();

for (id, fields) in &self.entries {
match db_read.get_entry_by_id(*id) {
Ok(entry) => {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
// TODO: Perhaps make path optional
update.path = Some(entry.metadata.path.clone());
if fields.contains(&Field::Datapoint) {
update.datapoint = Some(entry.datapoint.clone());
notify_fields.insert(Field::Datapoint);
}
if fields.contains(&Field::ActuatorTarget) {
update.actuator_target = Some(entry.actuator_target.clone());
notify_fields.insert(Field::ActuatorTarget);
}
notifications.updates.push(ChangeNotification {
update,
fields: notify_fields,
});
}
Err(_) => {
debug!("notify: could not find entry with id {}", id)
}
}
}
notifications
};
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
}
}
}
}
}

pub struct DatabaseReadAccess<'a, 'b> {
db: &'a Database,
permissions: &'b Permissions,
Expand Down Expand Up @@ -1445,32 +1592,78 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
valid_entries: HashMap<i32, (HashSet<Field>, types::ChangeType)>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
}

let mut entries_on_changed: HashMap<i32, HashSet<Field>> = HashMap::new();
let mut entries_continuous: HashMap<i32, HashSet<Field>> = HashMap::new();

for (id, (fields, change_type)) in valid_entries {
match change_type {
types::ChangeType::OnChange => {
entries_on_changed
.entry(id)
.and_modify(|existing_fields| existing_fields.extend(fields.clone()))
.or_insert(fields.clone());
}
types::ChangeType::Continuous => {
entries_continuous
.entry(id)
.and_modify(|existing_fields| existing_fields.extend(fields.clone()))
.or_insert(fields.clone());
}
types::ChangeType::Static => {}
}
}

let (sender, receiver) = mpsc::channel(10);
let subscription = ChangeSubscription {
entries: valid_entries,
sender,
permissions: self.permissions.clone(),
};
if !entries_on_changed.is_empty() {
let subscription = ChangeSubscription {
entries: entries_on_changed,
sender: sender.clone(),
permissions: self.permissions.clone(),
};

{
// Send everything subscribed to in an initial notification
let db = self.broker.database.read().await;
if subscription.notify(None, &db).await.is_err() {
warn!("Failed to create initial notification");
{
// Send everything subscribed to in an initial notification
let db = self.broker.database.read().await;
if subscription.notify(None, &db).await.is_err() {
warn!("Failed to create initial notification");
}
}

self.broker
.subscriptions
.write()
.await
.add_change_subscription(subscription);
}

self.broker
.subscriptions
.write()
.await
.add_change_subscription(subscription);
if !entries_continuous.is_empty() {
let subscription_continuous = ContinuousSubscription {
entries: entries_continuous,
sender,
permissions: self.permissions.clone(),
database: Arc::clone(&self.broker.database),
};

{
// Send everything subscribed to in an initial notification
//let db = self.broker.database.read().await;
if subscription_continuous.notify(None).await.is_err() {
warn!("Failed to create initial notification");
}
}

self.broker
.subscriptions
.write()
.await
.add_continuous_subscription(subscription_continuous);
}

let stream = ReceiverStream::new(receiver);
Ok(stream)
Expand Down Expand Up @@ -2952,8 +3145,19 @@ mod tests {
.await
.expect("Register datapoint should succeed");

let my_hashmap: HashMap<i32, (HashSet<Field>, types::ChangeType)> = [(
id1,
(
HashSet::from([Field::Datapoint]),
types::ChangeType::OnChange,
),
)]
.iter()
.cloned()
.collect();

let mut stream = broker
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.subscribe(my_hashmap)
.await
.expect("subscription should succeed");

Expand Down
8 changes: 5 additions & 3 deletions kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::broker::ReadError;
use crate::broker::SubscriptionError;
use crate::glob;
use crate::permissions::Permissions;
use crate::types;

#[tonic::async_trait]
impl proto::val_server::Val for broker::DataBroker {
Expand Down Expand Up @@ -409,7 +410,7 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

let mut entries: HashMap<i32, HashSet<broker::Field>> = HashMap::new();
let mut entries: HashMap<i32, (HashSet<broker::Field>, types::ChangeType)> = HashMap::new();

if !valid_requests.is_empty() {
for (path, (regex, fields)) in valid_requests {
Expand All @@ -423,9 +424,10 @@ impl proto::val_server::Val for broker::DataBroker {
entries
.entry(entry.metadata().id)
.and_modify(|existing_fields| {
existing_fields.extend(fields.clone());
existing_fields.0.extend(fields.clone());
existing_fields.1 = entry.metadata().change_type.clone();
})
.or_insert(fields.clone());
.or_insert((fields.clone(), entry.metadata().change_type.clone()));

match entry.datapoint() {
Ok(_) => {}
Expand Down
11 changes: 10 additions & 1 deletion kuksa_databroker/databroker/src/viss/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::{
};

use super::{conversions, types::*};
pub use crate::types::ChangeType;

#[tonic::async_trait]
pub(crate) trait Viss: Send + Sync + 'static {
Expand Down Expand Up @@ -253,7 +254,15 @@ impl Viss for Server {
let Some(entries) = broker
.get_id_by_path(request.path.as_ref())
.await
.map(|id| HashMap::from([(id, HashSet::from([broker::Field::Datapoint]))]))
.map(|id| {
HashMap::from([(
id,
(
HashSet::from([broker::Field::Datapoint]),
ChangeType::Static,
),
)])
})
else {
return Err(SubscribeErrorResponse {
request_id,
Expand Down

0 comments on commit 6a95751

Please sign in to comment.