-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add continuous subscription to subscribe method at a specific frequency #734
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,8 @@ use std::collections::{HashMap, HashSet}; | |
use std::convert::TryFrom; | ||
use std::sync::atomic::{AtomicI32, Ordering}; | ||
use std::sync::Arc; | ||
use std::time::SystemTime; | ||
use std::sync::Mutex; | ||
use std::time::{Duration, SystemTime}; | ||
|
||
use crate::query::{CompiledQuery, ExecutionInput}; | ||
use crate::types::ExecutionInputImplData; | ||
|
@@ -90,7 +91,7 @@ pub enum Field { | |
MetadataUnit, | ||
} | ||
|
||
#[derive(Default)] | ||
#[derive(Default, Debug)] | ||
pub struct Database { | ||
next_id: AtomicI32, | ||
path_to_id: HashMap<String, i32>, | ||
|
@@ -101,6 +102,7 @@ pub struct Database { | |
pub struct Subscriptions { | ||
query_subscriptions: Vec<QuerySubscription>, | ||
change_subscriptions: Vec<ChangeSubscription>, | ||
continuous_subscriptions: Arc<Mutex<Vec<ContinuousSubscription>>>, | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
|
@@ -158,6 +160,14 @@ pub struct ChangeSubscription { | |
permissions: Permissions, | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct ContinuousSubscription { | ||
entries: HashMap<i32, HashSet<Field>>, | ||
sender: mpsc::Sender<EntryUpdates>, | ||
permissions: Permissions, | ||
database: Arc<RwLock<Database>>, | ||
} | ||
|
||
#[derive(Debug)] | ||
pub struct NotificationError {} | ||
|
||
|
@@ -605,6 +615,27 @@ impl Subscriptions { | |
self.change_subscriptions.push(subscription) | ||
} | ||
|
||
pub fn add_continuous_subscription( | ||
&mut self, | ||
subscription: ContinuousSubscription, | ||
frequency: u64, | ||
) { | ||
let local_subscription = subscription.clone(); | ||
self.continuous_subscriptions | ||
.lock() | ||
.unwrap() | ||
.push(subscription); | ||
|
||
tokio::spawn(async move { | ||
// Asynchronous code to be executed in the new task | ||
while !local_subscription.sender.is_closed() { | ||
let _ = local_subscription.notify(None).await; | ||
// Simulate some asynchronous work | ||
tokio::time::sleep(Duration::from_millis(1 / frequency * 1000)).await; | ||
} | ||
}); | ||
} | ||
|
||
pub async fn notify( | ||
&self, | ||
changed: Option<&HashMap<i32, HashSet<Field>>>, | ||
|
@@ -648,6 +679,7 @@ impl Subscriptions { | |
pub fn clear(&mut self) { | ||
self.query_subscriptions.clear(); | ||
self.change_subscriptions.clear(); | ||
self.continuous_subscriptions.lock().unwrap().clear(); | ||
} | ||
|
||
pub fn cleanup(&mut self) { | ||
|
@@ -667,6 +699,14 @@ impl Subscriptions { | |
true | ||
} | ||
}); | ||
self.continuous_subscriptions.lock().unwrap().retain(|sub| { | ||
if sub.sender.is_closed() { | ||
info!("Subscriber gone: removing continuous subscription"); | ||
false | ||
} else { | ||
true | ||
} | ||
}); | ||
} | ||
} | ||
|
||
|
@@ -921,6 +961,117 @@ impl QuerySubscription { | |
} | ||
} | ||
|
||
impl ContinuousSubscription { | ||
async fn notify( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see that this is very similar to the ChangeSubscription notify. Couldn't it be reused somehow? What I mean is just use different subscribe function but maybe one notify? If this is not doable let me know :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, let me see if I can use traits -> https://doc.rust-lang.org/book/ch10-02-traits.html |
||
&self, | ||
changed: Option<&HashMap<i32, HashSet<Field>>>, | ||
) -> Result<(), NotificationError> { | ||
let db = self.database.read().await; | ||
let db_read = db.authorized_read_access(&self.permissions); | ||
match changed { | ||
Some(changed) => { | ||
let mut matches = false; | ||
for (id, changed_fields) in changed { | ||
if let Some(fields) = self.entries.get(id) { | ||
if !fields.is_disjoint(changed_fields) { | ||
matches = true; | ||
break; | ||
} | ||
} | ||
} | ||
if matches { | ||
// notify | ||
let notifications = { | ||
let mut notifications = EntryUpdates::default(); | ||
|
||
for (id, changed_fields) in changed { | ||
if let Some(fields) = self.entries.get(id) { | ||
if !fields.is_disjoint(changed_fields) { | ||
match db_read.get_entry_by_id(*id) { | ||
Ok(entry) => { | ||
let mut update = EntryUpdate::default(); | ||
let mut notify_fields = HashSet::new(); | ||
// TODO: Perhaps make path optional | ||
update.path = Some(entry.metadata.path.clone()); | ||
if changed_fields.contains(&Field::Datapoint) | ||
&& fields.contains(&Field::Datapoint) | ||
{ | ||
update.datapoint = Some(entry.datapoint.clone()); | ||
notify_fields.insert(Field::Datapoint); | ||
} | ||
if changed_fields.contains(&Field::ActuatorTarget) | ||
&& fields.contains(&Field::ActuatorTarget) | ||
{ | ||
update.actuator_target = | ||
Some(entry.actuator_target.clone()); | ||
notify_fields.insert(Field::ActuatorTarget); | ||
} | ||
notifications.updates.push(ChangeNotification { | ||
update, | ||
fields: notify_fields, | ||
}); | ||
} | ||
Err(_) => { | ||
debug!("notify: could not find entry with id {}", id) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
notifications | ||
}; | ||
if notifications.updates.is_empty() { | ||
Ok(()) | ||
} else { | ||
match self.sender.send(notifications).await { | ||
Ok(()) => Ok(()), | ||
Err(_) => Err(NotificationError {}), | ||
} | ||
} | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
None => { | ||
let notifications = { | ||
let mut notifications = EntryUpdates::default(); | ||
|
||
for (id, fields) in &self.entries { | ||
match db_read.get_entry_by_id(*id) { | ||
Ok(entry) => { | ||
let mut update = EntryUpdate::default(); | ||
let mut notify_fields = HashSet::new(); | ||
// TODO: Perhaps make path optional | ||
update.path = Some(entry.metadata.path.clone()); | ||
if fields.contains(&Field::Datapoint) { | ||
update.datapoint = Some(entry.datapoint.clone()); | ||
notify_fields.insert(Field::Datapoint); | ||
} | ||
if fields.contains(&Field::ActuatorTarget) { | ||
update.actuator_target = Some(entry.actuator_target.clone()); | ||
notify_fields.insert(Field::ActuatorTarget); | ||
} | ||
notifications.updates.push(ChangeNotification { | ||
update, | ||
fields: notify_fields, | ||
}); | ||
} | ||
Err(_) => { | ||
debug!("notify: could not find entry with id {}", id) | ||
} | ||
} | ||
} | ||
notifications | ||
}; | ||
match self.sender.send(notifications).await { | ||
Ok(()) => Ok(()), | ||
Err(_) => Err(NotificationError {}), | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
pub struct DatabaseReadAccess<'a, 'b> { | ||
db: &'a Database, | ||
permissions: &'b Permissions, | ||
|
@@ -1446,31 +1597,99 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { | |
pub async fn subscribe( | ||
&self, | ||
valid_entries: HashMap<i32, HashSet<Field>>, | ||
frequency: Option<u64>, | ||
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> { | ||
if valid_entries.is_empty() { | ||
return Err(SubscriptionError::InvalidInput); | ||
} | ||
|
||
let mut entries_on_changed: HashMap<i32, HashSet<Field>> = HashMap::new(); | ||
let mut entries_continuous: HashMap<i32, HashSet<Field>> = HashMap::new(); | ||
|
||
let db_read = self.broker.database.read().await; | ||
let db_read_access = db_read.authorized_read_access(self.permissions); | ||
|
||
for (id, fields) in valid_entries { | ||
match db_read_access.get_entry_by_id(id) { | ||
Ok(entry) => { | ||
let change_type = entry.metadata.change_type.clone(); | ||
match change_type { | ||
types::ChangeType::OnChange => { | ||
entries_on_changed | ||
.entry(id) | ||
.and_modify(|existing_fields| { | ||
existing_fields.extend(fields.clone()) | ||
}) | ||
.or_insert(fields.clone()); | ||
} | ||
types::ChangeType::Continuous => { | ||
entries_continuous | ||
.entry(id) | ||
.and_modify(|existing_fields| { | ||
existing_fields.extend(fields.clone()) | ||
}) | ||
.or_insert(fields.clone()); | ||
} | ||
types::ChangeType::Static => {} | ||
} | ||
} | ||
Err(_) => { | ||
debug!("notify: could not find entry with id {}", id) | ||
} | ||
} | ||
} | ||
|
||
let (sender, receiver) = mpsc::channel(10); | ||
let subscription = ChangeSubscription { | ||
entries: valid_entries, | ||
sender, | ||
permissions: self.permissions.clone(), | ||
}; | ||
if !entries_on_changed.is_empty() { | ||
if frequency.is_some() && entries_continuous.is_empty() { | ||
return Err(SubscriptionError::InvalidInput); | ||
} | ||
let subscription = ChangeSubscription { | ||
entries: entries_on_changed, | ||
sender: sender.clone(), | ||
permissions: self.permissions.clone(), | ||
}; | ||
|
||
{ | ||
// Send everything subscribed to in an initial notification | ||
let db = self.broker.database.read().await; | ||
if subscription.notify(None, &db).await.is_err() { | ||
warn!("Failed to create initial notification"); | ||
{ | ||
// Send everything subscribed to in an initial notification | ||
let db = self.broker.database.read().await; | ||
if subscription.notify(None, &db).await.is_err() { | ||
warn!("Failed to create initial notification"); | ||
} | ||
} | ||
|
||
self.broker | ||
.subscriptions | ||
.write() | ||
.await | ||
.add_change_subscription(subscription); | ||
} | ||
|
||
self.broker | ||
.subscriptions | ||
.write() | ||
.await | ||
.add_change_subscription(subscription); | ||
if !entries_continuous.is_empty() { | ||
if frequency.is_none() { | ||
return Err(SubscriptionError::InvalidInput); | ||
} | ||
let subscription_continuous = ContinuousSubscription { | ||
entries: entries_continuous, | ||
sender, | ||
permissions: self.permissions.clone(), | ||
database: Arc::clone(&self.broker.database), | ||
}; | ||
|
||
{ | ||
// Send everything subscribed to in an initial notification | ||
//let db = self.broker.database.read().await; | ||
if subscription_continuous.notify(None).await.is_err() { | ||
warn!("Failed to create initial notification"); | ||
} | ||
} | ||
|
||
self.broker | ||
.subscriptions | ||
.write() | ||
.await | ||
.add_continuous_subscription(subscription_continuous, frequency.unwrap()); | ||
} | ||
|
||
let stream = ReceiverStream::new(receiver); | ||
Ok(stream) | ||
|
@@ -2953,7 +3172,10 @@ mod tests { | |
.expect("Register datapoint should succeed"); | ||
|
||
let mut stream = broker | ||
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) | ||
.subscribe( | ||
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]), | ||
None, | ||
) | ||
.await | ||
.expect("subscription should succeed"); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does Rust manage type conversion?
from_millis
expects a u64. You use1
and1000
, will it really be a float value? Or do you rather need something like:Duration::from_millis((1000.0/ frequency).round() as u64)
(I actually do not know what resolution/performance we can expect from the system, like if we request 1000 Hz, will we really receive signals with 1000 Hz?
I have also no experience from Rust unit-tests, here a unit test would be a good way to prove that duration becomes as expected , like that 600 Hz gives 2 ms duration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would be really interesting e.g. how fast is databroker able to notify subscribers. Does it change if we have 100 subscribers at the same time. What is the limit e.g. what Erik wrote. But I guess not that much related to this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've used your line code to fix it. I will also try to implement some unit tests to test it.
We could also open some performance report topic to measure its capabilities.