Skip to content

Commit

Permalink
Keep original subscribe signature
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Mar 6, 2024
1 parent 6a95751 commit 10592bb
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 41 deletions.
56 changes: 30 additions & 26 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

pub async fn subscribe(
&self,
valid_entries: HashMap<i32, (HashSet<Field>, types::ChangeType)>,
valid_entries: HashMap<i32, HashSet<Field>>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
Expand All @@ -1601,21 +1601,36 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
let mut entries_on_changed: HashMap<i32, HashSet<Field>> = HashMap::new();
let mut entries_continuous: HashMap<i32, HashSet<Field>> = HashMap::new();

for (id, (fields, change_type)) in valid_entries {
match change_type {
types::ChangeType::OnChange => {
entries_on_changed
.entry(id)
.and_modify(|existing_fields| existing_fields.extend(fields.clone()))
.or_insert(fields.clone());
let db_read = self.broker.database.read().await;
let db_read_access = db_read.authorized_read_access(self.permissions);

for (id, fields) in valid_entries {
match db_read_access.get_entry_by_id(id) {
Ok(entry) => {
let change_type = entry.metadata.change_type.clone();
match change_type {
types::ChangeType::OnChange => {
entries_on_changed
.entry(id)
.and_modify(|existing_fields| {
existing_fields.extend(fields.clone())
})
.or_insert(fields.clone());
}
types::ChangeType::Continuous => {
entries_continuous
.entry(id)
.and_modify(|existing_fields| {
existing_fields.extend(fields.clone())
})
.or_insert(fields.clone());
}
types::ChangeType::Static => {}
}
}
types::ChangeType::Continuous => {
entries_continuous
.entry(id)
.and_modify(|existing_fields| existing_fields.extend(fields.clone()))
.or_insert(fields.clone());
Err(_) => {
debug!("notify: could not find entry with id {}", id)
}
types::ChangeType::Static => {}
}
}

Expand Down Expand Up @@ -3145,19 +3160,8 @@ mod tests {
.await
.expect("Register datapoint should succeed");

let my_hashmap: HashMap<i32, (HashSet<Field>, types::ChangeType)> = [(
id1,
(
HashSet::from([Field::Datapoint]),
types::ChangeType::OnChange,
),
)]
.iter()
.cloned()
.collect();

let mut stream = broker
.subscribe(my_hashmap)
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.await
.expect("subscription should succeed");

Expand Down
8 changes: 3 additions & 5 deletions kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::broker::ReadError;
use crate::broker::SubscriptionError;
use crate::glob;
use crate::permissions::Permissions;
use crate::types;

#[tonic::async_trait]
impl proto::val_server::Val for broker::DataBroker {
Expand Down Expand Up @@ -410,7 +409,7 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

let mut entries: HashMap<i32, (HashSet<broker::Field>, types::ChangeType)> = HashMap::new();
let mut entries: HashMap<i32, HashSet<broker::Field>> = HashMap::new();

if !valid_requests.is_empty() {
for (path, (regex, fields)) in valid_requests {
Expand All @@ -424,10 +423,9 @@ impl proto::val_server::Val for broker::DataBroker {
entries
.entry(entry.metadata().id)
.and_modify(|existing_fields| {
existing_fields.0.extend(fields.clone());
existing_fields.1 = entry.metadata().change_type.clone();
existing_fields.extend(fields.clone());
})
.or_insert((fields.clone(), entry.metadata().change_type.clone()));
.or_insert(fields.clone());

match entry.datapoint() {
Ok(_) => {}
Expand Down
11 changes: 1 addition & 10 deletions kuksa_databroker/databroker/src/viss/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::{
};

use super::{conversions, types::*};
pub use crate::types::ChangeType;

#[tonic::async_trait]
pub(crate) trait Viss: Send + Sync + 'static {
Expand Down Expand Up @@ -254,15 +253,7 @@ impl Viss for Server {
let Some(entries) = broker
.get_id_by_path(request.path.as_ref())
.await
.map(|id| {
HashMap::from([(
id,
(
HashSet::from([broker::Field::Datapoint]),
ChangeType::Static,
),
)])
})
.map(|id| HashMap::from([(id, HashSet::from([broker::Field::Datapoint]))]))
else {
return Err(SubscribeErrorResponse {
request_id,
Expand Down

0 comments on commit 10592bb

Please sign in to comment.