diff --git a/node/src/tests/test-DirectTransport.ts b/node/src/tests/test-DirectTransport.ts index 60034008fd..eadb37a0e8 100644 --- a/node/src/tests/test-DirectTransport.ts +++ b/node/src/tests/test-DirectTransport.ts @@ -385,11 +385,13 @@ test('dataProducer.send() with subchannels succeeds', async () => for (const message of receivedMessages1) { expect([ 'both', 'dc1' ].includes(message)).toBe(true); + expect([ 'dc2' ].includes(message)).toBe(false); } for (const message of receivedMessages2) { expect([ 'both', 'dc2' ].includes(message)).toBe(true); + expect([ 'dc1' ].includes(message)).toBe(false); } }, 5000); diff --git a/rust/benches/direct_data.rs b/rust/benches/direct_data.rs index 8406a7acd3..0cea4d8fe0 100644 --- a/rust/benches/direct_data.rs +++ b/rust/benches/direct_data.rs @@ -19,7 +19,7 @@ async fn create_data_producer_consumer_pair( .produce_data(DataProducerOptions::new_direct()) .await?; let data_consumer = direct_transport - .consume_data(DataConsumerOptions::new_direct(data_producer.id())) + .consume_data(DataConsumerOptions::new_direct(data_producer.id(), None)) .await?; Ok((data_producer, data_consumer)) @@ -51,7 +51,8 @@ pub fn criterion_benchmark(c: &mut Criterion) { let _ = sender.send(()); }); - let _ = direct_data_producer.send(WebRtcMessage::Binary(Cow::from(data))); + let _ = + direct_data_producer.send(WebRtcMessage::Binary(Cow::from(data)), None, None); let _ = receiver.recv(); }) diff --git a/rust/src/fbs.rs b/rust/src/fbs.rs index b923e88d4f..e883f180e0 100644 --- a/rust/src/fbs.rs +++ b/rust/src/fbs.rs @@ -26331,16 +26331,7 @@ mod root { )] pub struct SetSubchannelsRequest { /// The field `subchannels` in the table `SetSubchannelsRequest` - pub subchannels: ::core::option::Option<::planus::alloc::vec::Vec>, - } - - #[allow(clippy::derivable_impls)] - impl ::core::default::Default for SetSubchannelsRequest { - fn default() -> Self { - Self { - subchannels: ::core::default::Default::default(), - } - } + pub subchannels: ::planus::alloc::vec::Vec, } impl SetSubchannelsRequest { @@ -26353,23 +26344,17 @@ mod root { #[allow(clippy::too_many_arguments)] pub fn create( builder: &mut ::planus::Builder, - field_subchannels: impl ::planus::WriteAsOptional<::planus::Offset<[u16]>>, + field_subchannels: impl ::planus::WriteAs<::planus::Offset<[u16]>>, ) -> ::planus::Offset { let prepared_subchannels = field_subchannels.prepare(builder); let mut table_writer: ::planus::table_writer::TableWriter<6> = ::core::default::Default::default(); - if prepared_subchannels.is_some() { - table_writer.write_entry::<::planus::Offset<[u16]>>(0); - } + table_writer.write_entry::<::planus::Offset<[u16]>>(0); unsafe { table_writer.finish(builder, |object_writer| { - if let ::core::option::Option::Some(prepared_subchannels) = - prepared_subchannels - { - object_writer.write::<_, _, 4>(&prepared_subchannels); - } + object_writer.write::<_, _, 4>(&prepared_subchannels); }); } builder.current_offset() @@ -26424,17 +26409,10 @@ mod root { #[allow(clippy::type_complexity)] pub fn subchannels(self, value: T0) -> SetSubchannelsRequestBuilder<(T0,)> where - T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>, + T0: ::planus::WriteAs<::planus::Offset<[u16]>>, { SetSubchannelsRequestBuilder((value,)) } - - /// Sets the [`subchannels` field](SetSubchannelsRequest#structfield.subchannels) to null. - #[inline] - #[allow(clippy::type_complexity)] - pub fn subchannels_as_null(self) -> SetSubchannelsRequestBuilder<((),)> { - self.subchannels(()) - } } impl SetSubchannelsRequestBuilder<(T0,)> { @@ -26451,7 +26429,7 @@ mod root { } } - impl>> + impl>> ::planus::WriteAs<::planus::Offset> for SetSubchannelsRequestBuilder<(T0,)> { @@ -26466,7 +26444,7 @@ mod root { } } - impl>> + impl>> ::planus::WriteAsOptional<::planus::Offset> for SetSubchannelsRequestBuilder<(T0,)> { @@ -26482,7 +26460,7 @@ mod root { } } - impl>> + impl>> ::planus::WriteAsOffset for SetSubchannelsRequestBuilder<(T0,)> { @@ -26503,22 +26481,16 @@ mod root { impl<'a> SetSubchannelsRequestRef<'a> { /// Getter for the [`subchannels` field](SetSubchannelsRequest#structfield.subchannels). #[inline] - pub fn subchannels( - &self, - ) -> ::planus::Result<::core::option::Option<::planus::Vector<'a, u16>>> - { - self.0.access(0, "SetSubchannelsRequest", "subchannels") + pub fn subchannels(&self) -> ::planus::Result<::planus::Vector<'a, u16>> { + self.0 + .access_required(0, "SetSubchannelsRequest", "subchannels") } } impl<'a> ::core::fmt::Debug for SetSubchannelsRequestRef<'a> { fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result { let mut f = f.debug_struct("SetSubchannelsRequestRef"); - if let ::core::option::Option::Some(field_subchannels) = - self.subchannels().transpose() - { - f.field("subchannels", &field_subchannels); - } + f.field("subchannels", &self.subchannels()); f.finish() } } @@ -26529,13 +26501,7 @@ mod root { #[allow(unreachable_code)] fn try_from(value: SetSubchannelsRequestRef<'a>) -> ::planus::Result { ::core::result::Result::Ok(Self { - subchannels: if let ::core::option::Option::Some(subchannels) = - value.subchannels()? - { - ::core::option::Option::Some(subchannels.to_vec()?) - } else { - ::core::option::Option::None - }, + subchannels: value.subchannels()?.to_vec()?, }) } } @@ -26631,16 +26597,7 @@ mod root { )] pub struct SetSubchannelsResponse { /// The field `subchannels` in the table `SetSubchannelsResponse` - pub subchannels: ::core::option::Option<::planus::alloc::vec::Vec>, - } - - #[allow(clippy::derivable_impls)] - impl ::core::default::Default for SetSubchannelsResponse { - fn default() -> Self { - Self { - subchannels: ::core::default::Default::default(), - } - } + pub subchannels: ::planus::alloc::vec::Vec, } impl SetSubchannelsResponse { @@ -26653,23 +26610,17 @@ mod root { #[allow(clippy::too_many_arguments)] pub fn create( builder: &mut ::planus::Builder, - field_subchannels: impl ::planus::WriteAsOptional<::planus::Offset<[u16]>>, + field_subchannels: impl ::planus::WriteAs<::planus::Offset<[u16]>>, ) -> ::planus::Offset { let prepared_subchannels = field_subchannels.prepare(builder); let mut table_writer: ::planus::table_writer::TableWriter<6> = ::core::default::Default::default(); - if prepared_subchannels.is_some() { - table_writer.write_entry::<::planus::Offset<[u16]>>(0); - } + table_writer.write_entry::<::planus::Offset<[u16]>>(0); unsafe { table_writer.finish(builder, |object_writer| { - if let ::core::option::Option::Some(prepared_subchannels) = - prepared_subchannels - { - object_writer.write::<_, _, 4>(&prepared_subchannels); - } + object_writer.write::<_, _, 4>(&prepared_subchannels); }); } builder.current_offset() @@ -26726,17 +26677,10 @@ mod root { #[allow(clippy::type_complexity)] pub fn subchannels(self, value: T0) -> SetSubchannelsResponseBuilder<(T0,)> where - T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>, + T0: ::planus::WriteAs<::planus::Offset<[u16]>>, { SetSubchannelsResponseBuilder((value,)) } - - /// Sets the [`subchannels` field](SetSubchannelsResponse#structfield.subchannels) to null. - #[inline] - #[allow(clippy::type_complexity)] - pub fn subchannels_as_null(self) -> SetSubchannelsResponseBuilder<((),)> { - self.subchannels(()) - } } impl SetSubchannelsResponseBuilder<(T0,)> { @@ -26753,7 +26697,7 @@ mod root { } } - impl>> + impl>> ::planus::WriteAs<::planus::Offset> for SetSubchannelsResponseBuilder<(T0,)> { @@ -26768,7 +26712,7 @@ mod root { } } - impl>> + impl>> ::planus::WriteAsOptional<::planus::Offset> for SetSubchannelsResponseBuilder<(T0,)> { @@ -26784,7 +26728,7 @@ mod root { } } - impl>> + impl>> ::planus::WriteAsOffset for SetSubchannelsResponseBuilder<(T0,)> { @@ -26805,22 +26749,16 @@ mod root { impl<'a> SetSubchannelsResponseRef<'a> { /// Getter for the [`subchannels` field](SetSubchannelsResponse#structfield.subchannels). #[inline] - pub fn subchannels( - &self, - ) -> ::planus::Result<::core::option::Option<::planus::Vector<'a, u16>>> - { - self.0.access(0, "SetSubchannelsResponse", "subchannels") + pub fn subchannels(&self) -> ::planus::Result<::planus::Vector<'a, u16>> { + self.0 + .access_required(0, "SetSubchannelsResponse", "subchannels") } } impl<'a> ::core::fmt::Debug for SetSubchannelsResponseRef<'a> { fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result { let mut f = f.debug_struct("SetSubchannelsResponseRef"); - if let ::core::option::Option::Some(field_subchannels) = - self.subchannels().transpose() - { - f.field("subchannels", &field_subchannels); - } + f.field("subchannels", &self.subchannels()); f.finish() } } @@ -26831,13 +26769,7 @@ mod root { #[allow(unreachable_code)] fn try_from(value: SetSubchannelsResponseRef<'a>) -> ::planus::Result { ::core::result::Result::Ok(Self { - subchannels: if let ::core::option::Option::Some(subchannels) = - value.subchannels()? - { - ::core::option::Option::Some(subchannels.to_vec()?) - } else { - ::core::option::Option::None - }, + subchannels: value.subchannels()?.to_vec()?, }) } } diff --git a/rust/src/messages.rs b/rust/src/messages.rs index 327a82eb84..86ad756298 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -1909,6 +1909,7 @@ pub(crate) struct TransportConsumeDataRequest { pub(crate) label: String, pub(crate) protocol: String, pub(crate) paused: bool, + pub(crate) subchannels: Option>, } #[derive(Debug)] @@ -1919,6 +1920,7 @@ pub(crate) struct TransportConsumeDataResponse { pub(crate) protocol: String, pub(crate) paused: bool, pub(crate) data_producer_paused: bool, + pub(crate) subchannels: Vec, } impl Request for TransportConsumeDataRequest { @@ -1949,8 +1951,7 @@ impl Request for TransportConsumeDataRequest { Some(self.protocol) }, self.paused, - // TODO. - None::>, + self.subchannels, ); let request_body = request::Body::create_transport_consume_data_request(&mut builder, data); let request = request::Request::create( @@ -1985,6 +1986,7 @@ impl Request for TransportConsumeDataRequest { protocol: data.protocol.to_string(), paused: data.paused, data_producer_paused: data.data_producer_paused, + subchannels: data.subchannels, }) } } @@ -2830,6 +2832,8 @@ impl Request for DataProducerResumeRequest { pub(crate) struct DataProducerSendNotification { pub(crate) ppid: u32, pub(crate) payload: Vec, + pub(crate) subchannels: Option>, + pub(crate) required_subchannel: Option, } impl Notification for DataProducerSendNotification { @@ -2839,17 +2843,14 @@ impl Notification for DataProducerSendNotification { fn into_bytes(self, handler_id: Self::HandlerId) -> Vec { let mut builder = Builder::new(); - // TODO: Implement subchannels. - let subchannels: Vec = vec![]; - let required_subchannel: Option = Default::default(); let binary_data = data_producer::Binary::create(&mut builder, self.payload); let binary = data_producer::Data::create_binary(&mut builder, binary_data); let data = data_producer::SendNotification::create( &mut builder, self.ppid, binary, - subchannels, - required_subchannel, + self.subchannels, + self.required_subchannel, ); let notification_body = notification::Body::create_data_producer_send_notification(&mut builder, data); @@ -3172,6 +3173,54 @@ impl From for u32 { } } +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerSetSubchannelsRequest { + pub(crate) subchannels: Vec, +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct DataConsumerSetSubchannelsResponse { + pub(crate) subchannels: Vec, +} + +impl Request for DataConsumerSetSubchannelsRequest { + const METHOD: request::Method = request::Method::DataconsumerSetSubchannels; + type HandlerId = DataConsumerId; + type Response = DataConsumerSetSubchannelsResponse; + + fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec { + let mut builder = Builder::new(); + + let data = data_consumer::SetSubchannelsRequest::create(&mut builder, self.subchannels); + let request_body = + request::Body::create_data_consumer_set_subchannels_request(&mut builder, data); + + let request = request::Request::create( + &mut builder, + id, + Self::METHOD, + handler_id.to_string(), + Some(request_body), + ); + let message_body = message::Body::create_request(&mut builder, request); + let message = message::Message::create(&mut builder, message::Type::Request, message_body); + + builder.finish(message, None).to_vec() + } + + fn convert_response( + response: Option, + ) -> Result> { + let Some(response::Body::DataConsumerSetSubchannelsResponse(data)) = response else { + panic!("Wrong message from worker: {response:?}"); + }; + + Ok(DataConsumerSetSubchannelsResponse { + subchannels: data.subchannels, + }) + } +} + #[derive(Debug)] pub(crate) struct RtpObserverCloseRequest { pub(crate) rtp_observer_id: RtpObserverId, diff --git a/rust/src/router.rs b/rust/src/router.rs index 4c1925e603..c2d09a0228 100644 --- a/rust/src/router.rs +++ b/rust/src/router.rs @@ -658,22 +658,6 @@ impl Router { } }; - /* - let data = self - .inner - .channel - .request( - self.inner.id, - RouterCreateWebRtcTransportRequest { - data: RouterCreateWebrtcTransportData::from_options( - transport_id, - &webrtc_transport_options, - ), - }, - ) - .await?; - */ - let transport = WebRtcTransport::new( transport_id, Arc::clone(&self.inner.executor), diff --git a/rust/src/router/data_consumer.rs b/rust/src/router/data_consumer.rs index 401eec52da..7d8b2bc828 100644 --- a/rust/src/router/data_consumer.rs +++ b/rust/src/router/data_consumer.rs @@ -8,6 +8,7 @@ use crate::messages::{ DataConsumerCloseRequest, DataConsumerDumpRequest, DataConsumerGetBufferedAmountRequest, DataConsumerGetStatsRequest, DataConsumerPauseRequest, DataConsumerResumeRequest, DataConsumerSendRequest, DataConsumerSetBufferedAmountLowThresholdRequest, + DataConsumerSetSubchannelsRequest, }; use crate::sctp_parameters::SctpStreamParameters; use crate::transport::Transport; @@ -56,6 +57,10 @@ pub struct DataConsumerOptions { pub(super) max_retransmits: Option, /// Whether the DataConsumer must start in paused mode. Default false. pub paused: bool, + /// Subchannels this DataConsumer initially subscribes to. + /// Only used in case this DataConsumer receives messages from a local DataProducer + /// that specifies subchannel(s) when calling send(). + pub subchannels: Option>, /// Custom application data. pub app_data: AppData, } @@ -70,6 +75,7 @@ impl DataConsumerOptions { ordered: None, max_packet_life_time: None, max_retransmits: None, + subchannels: None, paused: false, app_data: AppData::default(), } @@ -77,13 +83,14 @@ impl DataConsumerOptions { /// For [`DirectTransport`](crate::direct_transport::DirectTransport). #[must_use] - pub fn new_direct(data_producer_id: DataProducerId) -> Self { + pub fn new_direct(data_producer_id: DataProducerId, subchannels: Option>) -> Self { Self { data_producer_id, ordered: Some(true), max_packet_life_time: None, max_retransmits: None, paused: false, + subchannels, app_data: AppData::default(), } } @@ -97,6 +104,7 @@ impl DataConsumerOptions { max_packet_life_time: None, max_retransmits: None, paused: false, + subchannels: None, app_data: AppData::default(), } } @@ -114,6 +122,7 @@ impl DataConsumerOptions { max_packet_life_time: Some(max_packet_life_time), max_retransmits: None, paused: false, + subchannels: None, app_data: AppData::default(), } } @@ -130,6 +139,7 @@ impl DataConsumerOptions { max_packet_life_time: None, max_retransmits: Some(max_retransmits), paused: false, + subchannels: None, app_data: AppData::default(), } } @@ -148,6 +158,7 @@ pub struct DataConsumerDump { pub sctp_stream_parameters: Option, pub buffered_amount_low_threshold: u32, pub paused: bool, + pub subchannels: Vec, pub data_producer_paused: bool, } @@ -168,6 +179,7 @@ impl DataConsumerDump { .map(|parameters| SctpStreamParameters::from_fbs(*parameters)), buffered_amount_low_threshold: dump.buffered_amount_low_threshold, paused: dump.paused, + subchannels: dump.subchannels, data_producer_paused: dump.data_producer_paused, }) } @@ -298,6 +310,7 @@ struct Inner { data_producer_id: DataProducerId, direct: bool, paused: Arc>, + subchannels: Arc>>, data_producer_paused: Arc>, executor: Arc>, channel: Channel, @@ -450,6 +463,7 @@ impl DataConsumer { executor: Arc>, channel: Channel, data_producer_paused: bool, + subchannels: Vec, app_data: AppData, transport: Arc, direct: bool, @@ -461,6 +475,7 @@ impl DataConsumer { let paused = Arc::new(Mutex::new(paused)); #[allow(clippy::mutex_atomic)] let data_producer_paused = Arc::new(Mutex::new(data_producer_paused)); + let subchannels = Arc::new(Mutex::new(subchannels)); let inner_weak = Arc::>>>::default(); let subscription_handler = { @@ -567,6 +582,7 @@ impl DataConsumer { executor, channel, handlers, + subchannels, app_data, transport, weak_data_producer: data_producer.downgrade(), @@ -638,6 +654,12 @@ impl DataConsumer { &self.inner().protocol } + /// The data consumer subchannels. + #[must_use] + pub fn subchannels(&self) -> Vec { + self.inner().subchannels.lock().clone() + } + /// Custom application data. #[must_use] pub fn app_data(&self) -> &AppData { @@ -897,6 +919,22 @@ impl DirectDataConsumer { ) .await } + + /// Sets subchannels to the worker DataConsumer. + pub async fn set_subchannels(&self, subchannels: Vec) -> Result<(), RequestError> { + let response = self + .inner + .channel + .request( + self.inner.id, + DataConsumerSetSubchannelsRequest { subchannels }, + ) + .await?; + + *self.inner.subchannels.lock() = response.subchannels; + + Ok(()) + } } /// [`WeakDataConsumer`] doesn't own data consumer instance on mediasoup-worker and will not prevent diff --git a/rust/src/router/data_producer.rs b/rust/src/router/data_producer.rs index 409b372400..baef69427e 100644 --- a/rust/src/router/data_producer.rs +++ b/rust/src/router/data_producer.rs @@ -543,7 +543,12 @@ impl DataProducer { impl DirectDataProducer { /// Sends direct messages from the Rust to the worker. - pub fn send(&self, message: WebRtcMessage<'_>) -> Result<(), NotificationError> { + pub fn send( + &self, + message: WebRtcMessage<'_>, + subchannels: Option>, + required_subchannel: Option, + ) -> Result<(), NotificationError> { let (ppid, _payload) = message.into_ppid_and_payload(); self.inner.channel.notify( @@ -551,6 +556,8 @@ impl DirectDataProducer { DataProducerSendNotification { ppid, payload: _payload.into_owned(), + subchannels, + required_subchannel, }, ) } diff --git a/rust/src/router/transport.rs b/rust/src/router/transport.rs index 7e4a35b0e0..cbab16ca27 100644 --- a/rust/src/router/transport.rs +++ b/rust/src/router/transport.rs @@ -765,6 +765,7 @@ pub(super) trait TransportImpl: TransportGeneric { max_packet_life_time, max_retransmits, paused, + subchannels, app_data, } = data_consumer_options; @@ -829,6 +830,7 @@ pub(super) trait TransportImpl: TransportGeneric { sctp_stream_parameters, label: data_producer.label().clone(), protocol: data_producer.protocol().clone(), + subchannels, paused, }, ) @@ -846,6 +848,7 @@ pub(super) trait TransportImpl: TransportGeneric { Arc::clone(self.executor()), self.channel().clone(), response.data_producer_paused, + response.subchannels, app_data, Arc::new(self.clone()), transport_type == TransportType::Direct, diff --git a/rust/tests/integration/data_consumer.rs b/rust/tests/integration/data_consumer.rs index 30b3688279..3d0894fcc7 100644 --- a/rust/tests/integration/data_consumer.rs +++ b/rust/tests/integration/data_consumer.rs @@ -129,6 +129,7 @@ fn consume_data_succeeds() { 4000, ); + options.subchannels = Some(vec![0, 1, 1, 1, 2, 65535, 100]); options.app_data = AppData::new(CustomAppData { baz: "LOL" }); options @@ -152,6 +153,11 @@ fn consume_data_succeeds() { } assert_eq!(data_consumer.label().as_str(), "foo"); assert_eq!(data_consumer.protocol().as_str(), "bar"); + + let mut sorted_subchannels = data_consumer.subchannels(); + sorted_subchannels.sort(); + + assert_eq!(sorted_subchannels, [0, 1, 2, 100, 65535]); assert_eq!( data_consumer .app_data() @@ -339,7 +345,7 @@ fn consume_data_on_direct_transport_succeeds() { let data_consumer = transport3 .consume_data({ - let mut options = DataConsumerOptions::new_direct(data_producer.id()); + let mut options = DataConsumerOptions::new_direct(data_producer.id(), None); options.app_data = AppData::new(CustomAppData2 { hehe: "HEHE" }); @@ -386,7 +392,7 @@ fn dump_on_direct_transport_succeeds() { let data_consumer = transport3 .consume_data({ - let mut options = DataConsumerOptions::new_direct(data_producer.id()); + let mut options = DataConsumerOptions::new_direct(data_producer.id(), None); options.app_data = AppData::new(CustomAppData2 { hehe: "HEHE" }); @@ -421,7 +427,7 @@ fn get_stats_on_direct_transport_succeeds() { let data_consumer = transport3 .consume_data({ - let mut options = DataConsumerOptions::new_direct(data_producer.id()); + let mut options = DataConsumerOptions::new_direct(data_producer.id(), None); options.app_data = AppData::new(CustomAppData2 { hehe: "HEHE" }); diff --git a/rust/tests/integration/direct_transport.rs b/rust/tests/integration/direct_transport.rs index 290136f241..380f3e1b3e 100644 --- a/rust/tests/integration/direct_transport.rs +++ b/rust/tests/integration/direct_transport.rs @@ -175,7 +175,7 @@ fn send_succeeds() { .expect("Failed to produce data"); let data_consumer = transport - .consume_data(DataConsumerOptions::new_direct(data_producer.id())) + .consume_data(DataConsumerOptions::new_direct(data_producer.id(), None)) .await .expect("Failed to consume data"); @@ -286,7 +286,7 @@ fn send_succeeds() { }; direct_data_producer - .send(message) + .send(message, None, None) .expect("Failed to send message"); if id == num_messages { @@ -342,6 +342,212 @@ fn send_succeeds() { }); } +#[test] +fn send_with_subchannels_succeeds() { + future::block_on(async move { + let (_worker, _router, transport) = init().await; + + let data_producer = transport + .produce_data({ + let mut options = DataProducerOptions::new_direct(); + + options.label = "foo".to_string(); + options.protocol = "bar".to_string(); + options.app_data = AppData::new(CustomAppData { foo: "bar" }); + + options + }) + .await + .expect("Failed to produce data"); + + let data_consumer_1 = transport + .consume_data(DataConsumerOptions::new_direct( + data_producer.id(), + Some(vec![1, 11, 666]), + )) + .await + .expect("Failed to consume data"); + + let data_consumer_2 = transport + .consume_data(DataConsumerOptions::new_direct( + data_producer.id(), + Some(vec![2, 22, 666]), + )) + .await + .expect("Failed to consume data"); + + let expected_received_num_messages_1 = 7; + let expected_received_num_messages_2 = 5; + let received_messages_1: Arc>> = Arc::new(Mutex::new(vec![])); + let received_messages_2: Arc>> = Arc::new(Mutex::new(vec![])); + + let (received_messages_tx_1, received_messages_rx_1) = async_oneshot::oneshot::<()>(); + let _handler_1 = data_consumer_1.on_message({ + let received_messages_tx_1 = Mutex::new(Some(received_messages_tx_1)); + let received_messages_1 = Arc::clone(&received_messages_1); + + move |message| { + let text: String = match message { + WebRtcMessage::String(string) => string.parse().unwrap(), + _ => { + panic!("Unexpected empty messages!"); + } + }; + + received_messages_1.lock().push(text); + + if received_messages_1.lock().len() == expected_received_num_messages_1 { + let _ = received_messages_tx_1.lock().take().unwrap().send(()); + } + } + }); + + let (received_messages_tx_2, received_messages_rx_2) = async_oneshot::oneshot::<()>(); + let _handler_2 = data_consumer_2.on_message({ + let received_messages_tx_2 = Mutex::new(Some(received_messages_tx_2)); + let received_messages_2 = Arc::clone(&received_messages_2); + + move |message| { + let text: String = match message { + WebRtcMessage::String(string) => string.parse().unwrap(), + _ => { + panic!("Unexpected empty messages!"); + } + }; + + received_messages_2.lock().push(text); + + if received_messages_2.lock().len() == expected_received_num_messages_2 { + let _ = received_messages_tx_2.lock().take().unwrap().send(()); + } + } + }); + + let direct_data_producer = match &data_producer { + DataProducer::Direct(direct_data_producer) => direct_data_producer, + _ => { + panic!("Expected direct data producer") + } + }; + + let direct_data_consumer_2 = match &data_consumer_2 { + DataConsumer::Direct(direct_data_consumer) => direct_data_consumer, + _ => { + panic!("Expected direct data consumer") + } + }; + + let both: &'static str = "both"; + let none: &'static str = "none"; + let dc1: &'static str = "dc1"; + let dc2: &'static str = "dc2"; + + // Must be received by dataConsumer1 and dataConsumer2. + direct_data_producer + .send(WebRtcMessage::String(both.to_string()), None, None) + .expect("Failed to send message"); + + // Must be received by dataConsumer1 and dataConsumer2. + direct_data_producer + .send( + WebRtcMessage::String(both.to_string()), + Some(vec![1, 2]), + None, + ) + .expect("Failed to send message"); + + // Must be received by dataConsumer1 and dataConsumer2. + direct_data_producer + .send( + WebRtcMessage::String(both.to_string()), + Some(vec![11, 22, 33]), + Some(666), + ) + .expect("Failed to send message"); + + // Must not be received by neither dataConsumer1 nor dataConsumer2. + direct_data_producer + .send( + WebRtcMessage::String(none.to_string()), + Some(vec![3]), + Some(666), + ) + .expect("Failed to send message"); + + // Must not be received by neither dataConsumer1 nor dataConsumer2. + direct_data_producer + .send( + WebRtcMessage::String(none.to_string()), + Some(vec![666]), + Some(3), + ) + .expect("Failed to send message"); + + // Must be received by dataConsumer1. + direct_data_producer + .send(WebRtcMessage::String(dc1.to_string()), Some(vec![1]), None) + .expect("Failed to send message"); + + // Must be received by dataConsumer1. + direct_data_producer + .send(WebRtcMessage::String(dc1.to_string()), Some(vec![11]), None) + .expect("Failed to send message"); + + // Must be received by dataConsumer1. + direct_data_producer + .send( + WebRtcMessage::String(dc1.to_string()), + Some(vec![666]), + Some(11), + ) + .expect("Failed to send message"); + + // Must be received by dataConsumer2. + direct_data_producer + .send( + WebRtcMessage::String(dc2.to_string()), + Some(vec![666]), + Some(2), + ) + .expect("Failed to send message"); + + let mut subchannels = data_consumer_2.subchannels(); + subchannels.push(1); + + direct_data_consumer_2 + .set_subchannels(subchannels) + .await + .expect("Failed to set subchannels"); + + // Must be received by dataConsumer2. + direct_data_producer + .send( + WebRtcMessage::String(both.to_string()), + Some(vec![1]), + Some(666), + ) + .expect("Failed to send message"); + + received_messages_rx_1 + .await + .expect("Failed tor receive all messages"); + + received_messages_rx_2 + .await + .expect("Failed tor receive all messages"); + + let received_messages_1 = received_messages_1.lock().clone(); + assert!(received_messages_1.contains(&both.to_string())); + assert!(received_messages_1.contains(&dc1.to_string())); + assert!(!received_messages_1.contains(&dc2.to_string())); + + let received_messages_2 = received_messages_2.lock().clone(); + assert!(received_messages_2.contains(&both.to_string())); + assert!(!received_messages_2.contains(&dc1.to_string())); + assert!(received_messages_2.contains(&dc2.to_string())); + }); +} + #[test] fn close_event() { future::block_on(async move { diff --git a/worker/.clang-tidy b/worker/.clang-tidy index f3ade0aca9..ec3efa87e8 100644 --- a/worker/.clang-tidy +++ b/worker/.clang-tidy @@ -2,6 +2,7 @@ Checks: "*,\ -altera*,\ -boost-use-to-string,\ + -bugprone-easily-swappable-parameters,\ -cert-*,\ -clang-analyzer-optin.osx.*,\ -clang-analyzer-osx.*,\ diff --git a/worker/fbs/dataConsumer.fbs b/worker/fbs/dataConsumer.fbs index 8adcfdc70f..a458b2cdd3 100644 --- a/worker/fbs/dataConsumer.fbs +++ b/worker/fbs/dataConsumer.fbs @@ -53,11 +53,11 @@ table SendRequest { } table SetSubchannelsRequest { - subchannels: [uint16]; + subchannels: [uint16] (required); } table SetSubchannelsResponse { - subchannels: [uint16]; + subchannels: [uint16] (required); } // Notifications from Worker.