Skip to content

Commit

Permalink
Rust: Data[Producer|Consumer] subchannels (#1191)
Browse files Browse the repository at this point in the history
Rust: Data[Producer|Consumer] subchannels
  • Loading branch information
jmillan authored Oct 23, 2023
1 parent 6e1684c commit 860014c
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 128 deletions.
2 changes: 2 additions & 0 deletions node/src/tests/test-DirectTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,13 @@ test('dataProducer.send() with subchannels succeeds', async () =>
for (const message of receivedMessages1)
{
expect([ 'both', 'dc1' ].includes(message)).toBe(true);
expect([ 'dc2' ].includes(message)).toBe(false);
}

for (const message of receivedMessages2)
{
expect([ 'both', 'dc2' ].includes(message)).toBe(true);
expect([ 'dc1' ].includes(message)).toBe(false);
}
}, 5000);

Expand Down
5 changes: 3 additions & 2 deletions rust/benches/direct_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn create_data_producer_consumer_pair(
.produce_data(DataProducerOptions::new_direct())
.await?;
let data_consumer = direct_transport
.consume_data(DataConsumerOptions::new_direct(data_producer.id()))
.consume_data(DataConsumerOptions::new_direct(data_producer.id(), None))
.await?;

Ok((data_producer, data_consumer))
Expand Down Expand Up @@ -51,7 +51,8 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let _ = sender.send(());
});

let _ = direct_data_producer.send(WebRtcMessage::Binary(Cow::from(data)));
let _ =
direct_data_producer.send(WebRtcMessage::Binary(Cow::from(data)), None, None);

let _ = receiver.recv();
})
Expand Down
120 changes: 26 additions & 94 deletions rust/src/fbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26331,16 +26331,7 @@ mod root {
)]
pub struct SetSubchannelsRequest {
/// The field `subchannels` in the table `SetSubchannelsRequest`
pub subchannels: ::core::option::Option<::planus::alloc::vec::Vec<u16>>,
}

#[allow(clippy::derivable_impls)]
impl ::core::default::Default for SetSubchannelsRequest {
fn default() -> Self {
Self {
subchannels: ::core::default::Default::default(),
}
}
pub subchannels: ::planus::alloc::vec::Vec<u16>,
}

impl SetSubchannelsRequest {
Expand All @@ -26353,23 +26344,17 @@ mod root {
#[allow(clippy::too_many_arguments)]
pub fn create(
builder: &mut ::planus::Builder,
field_subchannels: impl ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
field_subchannels: impl ::planus::WriteAs<::planus::Offset<[u16]>>,
) -> ::planus::Offset<Self> {
let prepared_subchannels = field_subchannels.prepare(builder);

let mut table_writer: ::planus::table_writer::TableWriter<6> =
::core::default::Default::default();
if prepared_subchannels.is_some() {
table_writer.write_entry::<::planus::Offset<[u16]>>(0);
}
table_writer.write_entry::<::planus::Offset<[u16]>>(0);

unsafe {
table_writer.finish(builder, |object_writer| {
if let ::core::option::Option::Some(prepared_subchannels) =
prepared_subchannels
{
object_writer.write::<_, _, 4>(&prepared_subchannels);
}
object_writer.write::<_, _, 4>(&prepared_subchannels);
});
}
builder.current_offset()
Expand Down Expand Up @@ -26424,17 +26409,10 @@ mod root {
#[allow(clippy::type_complexity)]
pub fn subchannels<T0>(self, value: T0) -> SetSubchannelsRequestBuilder<(T0,)>
where
T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
T0: ::planus::WriteAs<::planus::Offset<[u16]>>,
{
SetSubchannelsRequestBuilder((value,))
}

/// Sets the [`subchannels` field](SetSubchannelsRequest#structfield.subchannels) to null.
#[inline]
#[allow(clippy::type_complexity)]
pub fn subchannels_as_null(self) -> SetSubchannelsRequestBuilder<((),)> {
self.subchannels(())
}
}

impl<T0> SetSubchannelsRequestBuilder<(T0,)> {
Expand All @@ -26451,7 +26429,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAs<::planus::Offset<SetSubchannelsRequest>>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26466,7 +26444,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOptional<::planus::Offset<SetSubchannelsRequest>>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26482,7 +26460,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOffset<SetSubchannelsRequest>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26503,22 +26481,16 @@ mod root {
impl<'a> SetSubchannelsRequestRef<'a> {
/// Getter for the [`subchannels` field](SetSubchannelsRequest#structfield.subchannels).
#[inline]
pub fn subchannels(
&self,
) -> ::planus::Result<::core::option::Option<::planus::Vector<'a, u16>>>
{
self.0.access(0, "SetSubchannelsRequest", "subchannels")
pub fn subchannels(&self) -> ::planus::Result<::planus::Vector<'a, u16>> {
self.0
.access_required(0, "SetSubchannelsRequest", "subchannels")
}
}

impl<'a> ::core::fmt::Debug for SetSubchannelsRequestRef<'a> {
fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result {
let mut f = f.debug_struct("SetSubchannelsRequestRef");
if let ::core::option::Option::Some(field_subchannels) =
self.subchannels().transpose()
{
f.field("subchannels", &field_subchannels);
}
f.field("subchannels", &self.subchannels());
f.finish()
}
}
Expand All @@ -26529,13 +26501,7 @@ mod root {
#[allow(unreachable_code)]
fn try_from(value: SetSubchannelsRequestRef<'a>) -> ::planus::Result<Self> {
::core::result::Result::Ok(Self {
subchannels: if let ::core::option::Option::Some(subchannels) =
value.subchannels()?
{
::core::option::Option::Some(subchannels.to_vec()?)
} else {
::core::option::Option::None
},
subchannels: value.subchannels()?.to_vec()?,
})
}
}
Expand Down Expand Up @@ -26631,16 +26597,7 @@ mod root {
)]
pub struct SetSubchannelsResponse {
/// The field `subchannels` in the table `SetSubchannelsResponse`
pub subchannels: ::core::option::Option<::planus::alloc::vec::Vec<u16>>,
}

#[allow(clippy::derivable_impls)]
impl ::core::default::Default for SetSubchannelsResponse {
fn default() -> Self {
Self {
subchannels: ::core::default::Default::default(),
}
}
pub subchannels: ::planus::alloc::vec::Vec<u16>,
}

impl SetSubchannelsResponse {
Expand All @@ -26653,23 +26610,17 @@ mod root {
#[allow(clippy::too_many_arguments)]
pub fn create(
builder: &mut ::planus::Builder,
field_subchannels: impl ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
field_subchannels: impl ::planus::WriteAs<::planus::Offset<[u16]>>,
) -> ::planus::Offset<Self> {
let prepared_subchannels = field_subchannels.prepare(builder);

let mut table_writer: ::planus::table_writer::TableWriter<6> =
::core::default::Default::default();
if prepared_subchannels.is_some() {
table_writer.write_entry::<::planus::Offset<[u16]>>(0);
}
table_writer.write_entry::<::planus::Offset<[u16]>>(0);

unsafe {
table_writer.finish(builder, |object_writer| {
if let ::core::option::Option::Some(prepared_subchannels) =
prepared_subchannels
{
object_writer.write::<_, _, 4>(&prepared_subchannels);
}
object_writer.write::<_, _, 4>(&prepared_subchannels);
});
}
builder.current_offset()
Expand Down Expand Up @@ -26726,17 +26677,10 @@ mod root {
#[allow(clippy::type_complexity)]
pub fn subchannels<T0>(self, value: T0) -> SetSubchannelsResponseBuilder<(T0,)>
where
T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
T0: ::planus::WriteAs<::planus::Offset<[u16]>>,
{
SetSubchannelsResponseBuilder((value,))
}

/// Sets the [`subchannels` field](SetSubchannelsResponse#structfield.subchannels) to null.
#[inline]
#[allow(clippy::type_complexity)]
pub fn subchannels_as_null(self) -> SetSubchannelsResponseBuilder<((),)> {
self.subchannels(())
}
}

impl<T0> SetSubchannelsResponseBuilder<(T0,)> {
Expand All @@ -26753,7 +26697,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAs<::planus::Offset<SetSubchannelsResponse>>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26768,7 +26712,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOptional<::planus::Offset<SetSubchannelsResponse>>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26784,7 +26728,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOffset<SetSubchannelsResponse>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26805,22 +26749,16 @@ mod root {
impl<'a> SetSubchannelsResponseRef<'a> {
/// Getter for the [`subchannels` field](SetSubchannelsResponse#structfield.subchannels).
#[inline]
pub fn subchannels(
&self,
) -> ::planus::Result<::core::option::Option<::planus::Vector<'a, u16>>>
{
self.0.access(0, "SetSubchannelsResponse", "subchannels")
pub fn subchannels(&self) -> ::planus::Result<::planus::Vector<'a, u16>> {
self.0
.access_required(0, "SetSubchannelsResponse", "subchannels")
}
}

impl<'a> ::core::fmt::Debug for SetSubchannelsResponseRef<'a> {
fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result {
let mut f = f.debug_struct("SetSubchannelsResponseRef");
if let ::core::option::Option::Some(field_subchannels) =
self.subchannels().transpose()
{
f.field("subchannels", &field_subchannels);
}
f.field("subchannels", &self.subchannels());
f.finish()
}
}
Expand All @@ -26831,13 +26769,7 @@ mod root {
#[allow(unreachable_code)]
fn try_from(value: SetSubchannelsResponseRef<'a>) -> ::planus::Result<Self> {
::core::result::Result::Ok(Self {
subchannels: if let ::core::option::Option::Some(subchannels) =
value.subchannels()?
{
::core::option::Option::Some(subchannels.to_vec()?)
} else {
::core::option::Option::None
},
subchannels: value.subchannels()?.to_vec()?,
})
}
}
Expand Down
Loading

0 comments on commit 860014c

Please sign in to comment.