Skip to content

Commit

Permalink
Rust: properly store incoming notifications
Browse files Browse the repository at this point in the history
Store the whole binary message and retrieve the notification reference before
calling the handler.
  • Loading branch information
jmillan committed Oct 10, 2023
1 parent 096ee1e commit 59854e9
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 13 deletions.
1 change: 0 additions & 1 deletion rust/src/router/consumer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ async fn init() -> (Router, WebRtcTransport, WebRtcTransport) {
}

#[test]
#[ignore]
fn producer_close_event() {
future::block_on(async move {
let (_router, transport_1, transport_2) = init().await;
Expand Down
1 change: 0 additions & 1 deletion rust/src/router/data_consumer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ async fn init() -> (Router, DataProducer) {
}

#[test]
#[ignore]
fn data_producer_close_event() {
future::block_on(async move {
let (router, data_producer) = init().await;
Expand Down
2 changes: 0 additions & 2 deletions rust/src/router/pipe_transport/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ async fn init() -> (Router, Router, WebRtcTransport, WebRtcTransport) {
}

#[test]
#[ignore]
fn producer_close_is_transmitted_to_pipe_consumer() {
future::block_on(async move {
let (router1, router2, transport1, transport2) = init().await;
Expand Down Expand Up @@ -162,7 +161,6 @@ fn producer_close_is_transmitted_to_pipe_consumer() {
}

#[test]
#[ignore]
fn data_producer_close_is_transmitted_to_pipe_data_consumer() {
future::block_on(async move {
let (router1, router2, transport1, transport2) = init().await;
Expand Down
28 changes: 20 additions & 8 deletions rust/src/worker/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use thiserror::Error;
use uuid::Uuid;

#[derive(Debug, Deserialize)]
#[serde(untagged)]
Expand Down Expand Up @@ -66,10 +67,17 @@ impl Drop for BufferMessagesGuard {
let mut buffered_notifications_for = self.buffered_notifications_for.lock();
if let Some(notifications) = buffered_notifications_for.remove(&self.target_id) {
if let Some(event_handlers) = self.event_handlers_weak.upgrade() {
for notification in notifications {
let notification =
notification::NotificationRef::read_as_root(&notification).unwrap();
event_handlers.call_callbacks_with_single_value(&self.target_id, notification);
for bytes in notifications {
let message_ref = message::MessageRef::read_as_root(&bytes).unwrap();

let message::BodyRef::Notification(notification_ref) =
message_ref.data().unwrap()
else {
panic!("Wrong notification stored: {message_ref:?}");
};

event_handlers
.call_callbacks_with_single_value(&self.target_id, notification_ref);
}
}
}
Expand Down Expand Up @@ -233,16 +241,20 @@ impl Channel {

match deserialize_message(message) {
ChannelReceiveMessage::Notification(notification) => {
let target_id: SubscriptionTarget = SubscriptionTarget::String(
notification.handler_id().unwrap().to_string(),
);
let target_id = notification.handler_id().unwrap();
// Target id can be either the worker PID or a UUID.
let target_id = match target_id.parse::<u64>() {
Ok(_) => SubscriptionTarget::String(target_id.to_string()),
Err(_) => SubscriptionTarget::Uuid(Uuid::parse_str(target_id).unwrap()),
};

if !non_buffered_notifications.contains(&target_id) {
let mut buffer_notifications_for = buffered_notifications_for.lock();
// Check if we need to buffer notifications for this
// target_id
if let Some(list) = buffer_notifications_for.get_mut(&target_id) {
list.push(Vec::from(message));
// Store the whole message removing the size prefix.
list.push(Vec::from(&message[4..]));
return;
}

Expand Down
1 change: 0 additions & 1 deletion rust/tests/integration/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,6 @@ fn set_unset_priority_succeeds() {
}

#[test]
#[ignore]
fn producer_pause_resume_events() {
future::block_on(async move {
let (_executor_guard, _worker, _router, transport_1, transport_2) = init().await;
Expand Down

0 comments on commit 59854e9

Please sign in to comment.