Skip to content

Commit

Permalink
Rust: Data[Producer|Consumer] subchannels
Browse files Browse the repository at this point in the history
  • Loading branch information
jmillan committed Oct 21, 2023
1 parent c889e1a commit 481b654
Show file tree
Hide file tree
Showing 11 changed files with 354 additions and 126 deletions.
2 changes: 2 additions & 0 deletions node/src/tests/test-DirectTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,13 @@ test('dataProducer.send() with subchannels succeeds', async () =>
for (const message of receivedMessages1)
{
expect([ 'both', 'dc1' ].includes(message)).toBe(true);
expect([ 'dc2' ].includes(message)).toBe(false);
}

for (const message of receivedMessages2)
{
expect([ 'both', 'dc2' ].includes(message)).toBe(true);
expect([ 'dc1' ].includes(message)).toBe(false);
}
}, 5000);

Expand Down
120 changes: 26 additions & 94 deletions rust/src/fbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26331,16 +26331,7 @@ mod root {
)]
pub struct SetSubchannelsRequest {
/// The field `subchannels` in the table `SetSubchannelsRequest`
pub subchannels: ::core::option::Option<::planus::alloc::vec::Vec<u16>>,
}

#[allow(clippy::derivable_impls)]
impl ::core::default::Default for SetSubchannelsRequest {
fn default() -> Self {
Self {
subchannels: ::core::default::Default::default(),
}
}
pub subchannels: ::planus::alloc::vec::Vec<u16>,
}

impl SetSubchannelsRequest {
Expand All @@ -26353,23 +26344,17 @@ mod root {
#[allow(clippy::too_many_arguments)]
pub fn create(
builder: &mut ::planus::Builder,
field_subchannels: impl ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
field_subchannels: impl ::planus::WriteAs<::planus::Offset<[u16]>>,
) -> ::planus::Offset<Self> {
let prepared_subchannels = field_subchannels.prepare(builder);

let mut table_writer: ::planus::table_writer::TableWriter<6> =
::core::default::Default::default();
if prepared_subchannels.is_some() {
table_writer.write_entry::<::planus::Offset<[u16]>>(0);
}
table_writer.write_entry::<::planus::Offset<[u16]>>(0);

unsafe {
table_writer.finish(builder, |object_writer| {
if let ::core::option::Option::Some(prepared_subchannels) =
prepared_subchannels
{
object_writer.write::<_, _, 4>(&prepared_subchannels);
}
object_writer.write::<_, _, 4>(&prepared_subchannels);
});
}
builder.current_offset()
Expand Down Expand Up @@ -26424,17 +26409,10 @@ mod root {
#[allow(clippy::type_complexity)]
pub fn subchannels<T0>(self, value: T0) -> SetSubchannelsRequestBuilder<(T0,)>
where
T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
T0: ::planus::WriteAs<::planus::Offset<[u16]>>,
{
SetSubchannelsRequestBuilder((value,))
}

/// Sets the [`subchannels` field](SetSubchannelsRequest#structfield.subchannels) to null.
#[inline]
#[allow(clippy::type_complexity)]
pub fn subchannels_as_null(self) -> SetSubchannelsRequestBuilder<((),)> {
self.subchannels(())
}
}

impl<T0> SetSubchannelsRequestBuilder<(T0,)> {
Expand All @@ -26451,7 +26429,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAs<::planus::Offset<SetSubchannelsRequest>>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26466,7 +26444,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOptional<::planus::Offset<SetSubchannelsRequest>>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26482,7 +26460,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOffset<SetSubchannelsRequest>
for SetSubchannelsRequestBuilder<(T0,)>
{
Expand All @@ -26503,22 +26481,16 @@ mod root {
impl<'a> SetSubchannelsRequestRef<'a> {
/// Getter for the [`subchannels` field](SetSubchannelsRequest#structfield.subchannels).
#[inline]
pub fn subchannels(
&self,
) -> ::planus::Result<::core::option::Option<::planus::Vector<'a, u16>>>
{
self.0.access(0, "SetSubchannelsRequest", "subchannels")
pub fn subchannels(&self) -> ::planus::Result<::planus::Vector<'a, u16>> {
self.0
.access_required(0, "SetSubchannelsRequest", "subchannels")
}
}

impl<'a> ::core::fmt::Debug for SetSubchannelsRequestRef<'a> {
fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result {
let mut f = f.debug_struct("SetSubchannelsRequestRef");
if let ::core::option::Option::Some(field_subchannels) =
self.subchannels().transpose()
{
f.field("subchannels", &field_subchannels);
}
f.field("subchannels", &self.subchannels());
f.finish()
}
}
Expand All @@ -26529,13 +26501,7 @@ mod root {
#[allow(unreachable_code)]
fn try_from(value: SetSubchannelsRequestRef<'a>) -> ::planus::Result<Self> {
::core::result::Result::Ok(Self {
subchannels: if let ::core::option::Option::Some(subchannels) =
value.subchannels()?
{
::core::option::Option::Some(subchannels.to_vec()?)
} else {
::core::option::Option::None
},
subchannels: value.subchannels()?.to_vec()?,
})
}
}
Expand Down Expand Up @@ -26631,16 +26597,7 @@ mod root {
)]
pub struct SetSubchannelsResponse {
/// The field `subchannels` in the table `SetSubchannelsResponse`
pub subchannels: ::core::option::Option<::planus::alloc::vec::Vec<u16>>,
}

#[allow(clippy::derivable_impls)]
impl ::core::default::Default for SetSubchannelsResponse {
fn default() -> Self {
Self {
subchannels: ::core::default::Default::default(),
}
}
pub subchannels: ::planus::alloc::vec::Vec<u16>,
}

impl SetSubchannelsResponse {
Expand All @@ -26653,23 +26610,17 @@ mod root {
#[allow(clippy::too_many_arguments)]
pub fn create(
builder: &mut ::planus::Builder,
field_subchannels: impl ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
field_subchannels: impl ::planus::WriteAs<::planus::Offset<[u16]>>,
) -> ::planus::Offset<Self> {
let prepared_subchannels = field_subchannels.prepare(builder);

let mut table_writer: ::planus::table_writer::TableWriter<6> =
::core::default::Default::default();
if prepared_subchannels.is_some() {
table_writer.write_entry::<::planus::Offset<[u16]>>(0);
}
table_writer.write_entry::<::planus::Offset<[u16]>>(0);

unsafe {
table_writer.finish(builder, |object_writer| {
if let ::core::option::Option::Some(prepared_subchannels) =
prepared_subchannels
{
object_writer.write::<_, _, 4>(&prepared_subchannels);
}
object_writer.write::<_, _, 4>(&prepared_subchannels);
});
}
builder.current_offset()
Expand Down Expand Up @@ -26726,17 +26677,10 @@ mod root {
#[allow(clippy::type_complexity)]
pub fn subchannels<T0>(self, value: T0) -> SetSubchannelsResponseBuilder<(T0,)>
where
T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>,
T0: ::planus::WriteAs<::planus::Offset<[u16]>>,
{
SetSubchannelsResponseBuilder((value,))
}

/// Sets the [`subchannels` field](SetSubchannelsResponse#structfield.subchannels) to null.
#[inline]
#[allow(clippy::type_complexity)]
pub fn subchannels_as_null(self) -> SetSubchannelsResponseBuilder<((),)> {
self.subchannels(())
}
}

impl<T0> SetSubchannelsResponseBuilder<(T0,)> {
Expand All @@ -26753,7 +26697,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAs<::planus::Offset<SetSubchannelsResponse>>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26768,7 +26712,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOptional<::planus::Offset<SetSubchannelsResponse>>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26784,7 +26728,7 @@ mod root {
}
}

impl<T0: ::planus::WriteAsOptional<::planus::Offset<[u16]>>>
impl<T0: ::planus::WriteAs<::planus::Offset<[u16]>>>
::planus::WriteAsOffset<SetSubchannelsResponse>
for SetSubchannelsResponseBuilder<(T0,)>
{
Expand All @@ -26805,22 +26749,16 @@ mod root {
impl<'a> SetSubchannelsResponseRef<'a> {
/// Getter for the [`subchannels` field](SetSubchannelsResponse#structfield.subchannels).
#[inline]
pub fn subchannels(
&self,
) -> ::planus::Result<::core::option::Option<::planus::Vector<'a, u16>>>
{
self.0.access(0, "SetSubchannelsResponse", "subchannels")
pub fn subchannels(&self) -> ::planus::Result<::planus::Vector<'a, u16>> {
self.0
.access_required(0, "SetSubchannelsResponse", "subchannels")
}
}

impl<'a> ::core::fmt::Debug for SetSubchannelsResponseRef<'a> {
fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result {
let mut f = f.debug_struct("SetSubchannelsResponseRef");
if let ::core::option::Option::Some(field_subchannels) =
self.subchannels().transpose()
{
f.field("subchannels", &field_subchannels);
}
f.field("subchannels", &self.subchannels());
f.finish()
}
}
Expand All @@ -26831,13 +26769,7 @@ mod root {
#[allow(unreachable_code)]
fn try_from(value: SetSubchannelsResponseRef<'a>) -> ::planus::Result<Self> {
::core::result::Result::Ok(Self {
subchannels: if let ::core::option::Option::Some(subchannels) =
value.subchannels()?
{
::core::option::Option::Some(subchannels.to_vec()?)
} else {
::core::option::Option::None
},
subchannels: value.subchannels()?.to_vec()?,
})
}
}
Expand Down
63 changes: 56 additions & 7 deletions rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1909,6 +1909,7 @@ pub(crate) struct TransportConsumeDataRequest {
pub(crate) label: String,
pub(crate) protocol: String,
pub(crate) paused: bool,
pub(crate) subchannels: Option<Vec<u16>>,
}

#[derive(Debug)]
Expand All @@ -1919,6 +1920,7 @@ pub(crate) struct TransportConsumeDataResponse {
pub(crate) protocol: String,
pub(crate) paused: bool,
pub(crate) data_producer_paused: bool,
pub(crate) subchannels: Vec<u16>,
}

impl Request for TransportConsumeDataRequest {
Expand Down Expand Up @@ -1949,8 +1951,7 @@ impl Request for TransportConsumeDataRequest {
Some(self.protocol)
},
self.paused,
// TODO.
None::<Vec<u16>>,
self.subchannels,
);
let request_body = request::Body::create_transport_consume_data_request(&mut builder, data);
let request = request::Request::create(
Expand Down Expand Up @@ -1985,6 +1986,7 @@ impl Request for TransportConsumeDataRequest {
protocol: data.protocol.to_string(),
paused: data.paused,
data_producer_paused: data.data_producer_paused,
subchannels: data.subchannels,
})
}
}
Expand Down Expand Up @@ -2830,6 +2832,8 @@ impl Request for DataProducerResumeRequest {
pub(crate) struct DataProducerSendNotification {
pub(crate) ppid: u32,
pub(crate) payload: Vec<u8>,
pub(crate) subchannels: Option<Vec<u16>>,
pub(crate) required_subchannel: Option<u16>,
}

impl Notification for DataProducerSendNotification {
Expand All @@ -2839,17 +2843,14 @@ impl Notification for DataProducerSendNotification {
fn into_bytes(self, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

// TODO: Implement subchannels.
let subchannels: Vec<u16> = vec![];
let required_subchannel: Option<u16> = Default::default();
let binary_data = data_producer::Binary::create(&mut builder, self.payload);
let binary = data_producer::Data::create_binary(&mut builder, binary_data);
let data = data_producer::SendNotification::create(
&mut builder,
self.ppid,
binary,
subchannels,
required_subchannel,
self.subchannels,
self.required_subchannel,
);
let notification_body =
notification::Body::create_data_producer_send_notification(&mut builder, data);
Expand Down Expand Up @@ -3172,6 +3173,54 @@ impl From<DataConsumerSendRequest> for u32 {
}
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerSetSubchannelsRequest {
pub(crate) subchannels: Vec<u16>,
}

#[derive(Debug, Clone, Serialize)]
pub(crate) struct DataConsumerSetSubchannelsResponse {
pub(crate) subchannels: Vec<u16>,
}

impl Request for DataConsumerSetSubchannelsRequest {
const METHOD: request::Method = request::Method::DataconsumerSetSubchannels;
type HandlerId = DataConsumerId;
type Response = DataConsumerSetSubchannelsResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();

let data = data_consumer::SetSubchannelsRequest::create(&mut builder, self.subchannels);
let request_body =
request::Body::create_data_consumer_set_subchannels_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message::Type::Request, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::Body>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::Body::DataConsumerSetSubchannelsResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

Ok(DataConsumerSetSubchannelsResponse {
subchannels: data.subchannels,
})
}
}

#[derive(Debug)]
pub(crate) struct RtpObserverCloseRequest {
pub(crate) rtp_observer_id: RtpObserverId,
Expand Down
Loading

0 comments on commit 481b654

Please sign in to comment.