From 3dd69b5367aaa7fcac04a6f361c6a7d081701ada Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Thu, 26 Oct 2023 12:52:25 +0200 Subject: [PATCH] Rust: FBS do not use heap to process responses (#1194) * Rust: FBS do not use heap to process responses Avoid heap allocations. Provides 10% of performance improvement[*]. This is a test bed for rewritting the rest of `from_fbs` to `from_fbs_ref`. NOTE: Once all `from_fbs` are moved to `from_fbs_ref`, rename the methods back to `from_fbs` [*]: benched producer::dump() --- rust/benches/producer.rs | 168 ++++++++++++++++++----- rust/src/messages.rs | 257 +++++++++++++++++++++--------------- rust/src/ortc.rs | 46 ++++--- rust/src/prelude.rs | 3 +- rust/src/router/consumer.rs | 149 +++++++++++---------- rust/src/router/producer.rs | 60 +++++---- rust/src/rtp_parameters.rs | 137 ++++++++++--------- rust/src/worker/channel.rs | 42 ++++-- 8 files changed, 522 insertions(+), 340 deletions(-) diff --git a/rust/benches/producer.rs b/rust/benches/producer.rs index 18493d9c93..b573896980 100644 --- a/rust/benches/producer.rs +++ b/rust/benches/producer.rs @@ -120,6 +120,77 @@ fn audio_producer_options() -> ProducerOptions { ) } +fn video_producer_options() -> ProducerOptions { + ProducerOptions::new( + MediaKind::Video, + RtpParameters { + mid: Some(fastrand::u32(100_000_000..999_999_999).to_string()), + codecs: vec![ + RtpCodecParameters::Video { + mime_type: MimeTypeVideo::H264, + payload_type: 112, + clock_rate: NonZeroU32::new(90000).unwrap(), + parameters: RtpCodecParametersParameters::from([ + ("packetization-mode", 1_u32.into()), + ("profile-level-id", "4d0032".into()), + ]), + rtcp_feedback: vec![ + RtcpFeedback::Nack, + RtcpFeedback::NackPli, + RtcpFeedback::GoogRemb, + ], + }, + RtpCodecParameters::Video { + mime_type: MimeTypeVideo::Rtx, + payload_type: 113, + clock_rate: NonZeroU32::new(90000).unwrap(), + parameters: RtpCodecParametersParameters::from([("apt", 112u32.into())]), + rtcp_feedback: vec![], + }, + ], + header_extensions: vec![ + RtpHeaderExtensionParameters { + uri: RtpHeaderExtensionUri::Mid, + id: 10, + encrypt: false, + }, + RtpHeaderExtensionParameters { + uri: RtpHeaderExtensionUri::VideoOrientation, + id: 13, + encrypt: false, + }, + ], + encodings: vec![ + RtpEncodingParameters { + ssrc: Some(22222222), + rtx: Some(RtpEncodingParametersRtx { ssrc: 22222223 }), + scalability_mode: "L1T3".parse().unwrap(), + ..RtpEncodingParameters::default() + }, + RtpEncodingParameters { + ssrc: Some(22222224), + rtx: Some(RtpEncodingParametersRtx { ssrc: 22222225 }), + ..RtpEncodingParameters::default() + }, + RtpEncodingParameters { + ssrc: Some(22222226), + rtx: Some(RtpEncodingParametersRtx { ssrc: 22222227 }), + ..RtpEncodingParameters::default() + }, + RtpEncodingParameters { + ssrc: Some(22222228), + rtx: Some(RtpEncodingParametersRtx { ssrc: 22222229 }), + ..RtpEncodingParameters::default() + }, + ], + rtcp: RtcpParameters { + cname: Some("video-1".to_string()), + ..RtcpParameters::default() + }, + }, + ) +} + pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("producer"); @@ -127,36 +198,73 @@ pub fn criterion_benchmark(c: &mut Criterion) { let (_worker, _router, transport_1, _transport_2) = futures_lite::future::block_on(async { init().await }); - let producer = futures_lite::future::block_on(async { - let (_worker, _router, transport_1, _transport_2) = init().await; - transport_1 - .produce(audio_producer_options()) - .await - .expect("Failed to produce audio") - }); - - group.bench_function("create", |b| { - b.iter(|| { - let _ = futures_lite::future::block_on(async { - transport_1 - .produce(audio_producer_options()) - .await - .expect("Failed to produce audio") - }); - }) - }); - - group.bench_function("dump", |b| { - b.iter(|| { - let _ = futures_lite::future::block_on(async { producer.dump().await }); - }) - }); - - group.bench_function("stats", |b| { - b.iter(|| { - let _ = futures_lite::future::block_on(async { producer.get_stats().await }); - }) - }); + { + let audio_producer = futures_lite::future::block_on(async { + let (_worker, _router, transport_1, _transport_2) = init().await; + transport_1 + .produce(audio_producer_options()) + .await + .expect("Failed to produce audio") + }); + + group.bench_function("create/audio", |b| { + b.iter(|| { + let _ = futures_lite::future::block_on(async { + transport_1 + .produce(audio_producer_options()) + .await + .expect("Failed to produce audio") + }); + }) + }); + + group.bench_function("dump/audio", |b| { + b.iter(|| { + let _ = futures_lite::future::block_on(async { audio_producer.dump().await }); + }) + }); + + group.bench_function("stats/audio", |b| { + b.iter(|| { + let _ = + futures_lite::future::block_on(async { audio_producer.get_stats().await }); + }) + }); + } + + { + let video_producer = futures_lite::future::block_on(async { + let (_worker, _router, transport_1, _transport_2) = init().await; + transport_1 + .produce(video_producer_options()) + .await + .expect("Failed to produce video") + }); + + group.bench_function("create/video", |b| { + b.iter(|| { + let _ = futures_lite::future::block_on(async { + transport_1 + .produce(video_producer_options()) + .await + .expect("Failed to produce video") + }); + }) + }); + + group.bench_function("dump/video", |b| { + b.iter(|| { + let _ = futures_lite::future::block_on(async { video_producer.dump().await }); + }) + }); + + group.bench_function("stats/video", |b| { + b.iter(|| { + let _ = + futures_lite::future::block_on(async { video_producer.get_stats().await }); + }) + }); + } } group.finish(); diff --git a/rust/src/messages.rs b/rust/src/messages.rs index 86ad756298..5e6f3b808f 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -13,12 +13,14 @@ use crate::direct_transport::DirectTransportOptions; use crate::fbs::{ active_speaker_observer, audio_level_observer, consumer, data_consumer, data_producer, direct_transport, message, notification, pipe_transport, plain_transport, producer, request, - response, router, rtp_observer, transport, web_rtc_transport, worker, + response, router, rtp_observer, transport, web_rtc_server, web_rtc_transport, worker, }; use crate::ortc::RtpMapping; use crate::pipe_transport::PipeTransportOptions; use crate::plain_transport::PlainTransportOptions; use crate::producer::{ProducerId, ProducerTraceEventType, ProducerType}; +use crate::router::consumer::ConsumerDump; +use crate::router::producer::ProducerDump; use crate::router::{RouterDump, RouterId}; use crate::rtp_observer::RtpObserverId; use crate::rtp_parameters::{MediaKind, RtpEncodingParameters, RtpParameters}; @@ -60,8 +62,9 @@ where } /// Convert generic response into specific type of this request. - fn convert_response(response: Option) - -> Result>; + fn convert_response( + response: Option>, + ) -> Result>; } pub(crate) trait Notification: Debug { @@ -98,7 +101,7 @@ impl Request for WorkerCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -129,12 +132,14 @@ impl Request for WorkerDumpRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::WorkerDumpResponse(data)) = response else { + let Some(response::BodyRef::WorkerDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = worker::DumpResponse::try_from(data)?; + Ok(WorkerDump { router_ids: data .router_ids @@ -204,7 +209,7 @@ impl Request for WorkerUpdateSettingsRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -244,7 +249,7 @@ impl Request for WorkerCreateWebRtcServerRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -283,7 +288,7 @@ impl Request for WebRtcServerCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -314,12 +319,14 @@ impl Request for WebRtcServerDumpRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::WebRtcServerDumpResponse(data)) = response else { + let Some(response::BodyRef::WebRtcServerDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = web_rtc_server::DumpResponse::try_from(data)?; + Ok(WebRtcServerDump { id: data.id.parse()?, udp_sockets: data @@ -394,7 +401,7 @@ impl Request for WorkerCreateRouterRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -429,7 +436,7 @@ impl Request for RouterCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -460,12 +467,14 @@ impl Request for RouterDumpRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::RouterDumpResponse(data)) = response else { + let Some(response::BodyRef::RouterDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = router::DumpResponse::try_from(data)?; + Ok(RouterDump { id: data.id.parse()?, map_consumer_id_producer_id: data @@ -603,7 +612,7 @@ impl Request for RouterCreateDirectTransportRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -769,12 +778,14 @@ impl Request for RouterCreateWebRtcTransportRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::WebRtcTransportDumpResponse(data)) = response else { + let Some(response::BodyRef::WebRtcTransportDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = web_rtc_transport::DumpResponse::try_from(data)?; + Ok(WebRtcTransportData { ice_role: IceRole::from_fbs(data.ice_role), ice_parameters: IceParameters::from_fbs(*data.ice_parameters), @@ -844,12 +855,14 @@ impl Request for RouterCreateWebRtcTransportWithServerRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::WebRtcTransportDumpResponse(data)) = response else { + let Some(response::BodyRef::WebRtcTransportDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = web_rtc_transport::DumpResponse::try_from(data)?; + Ok(WebRtcTransportData { ice_role: IceRole::from_fbs(data.ice_role), ice_parameters: IceParameters::from_fbs(*data.ice_parameters), @@ -974,12 +987,14 @@ impl Request for RouterCreatePlainTransportRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::PlainTransportDumpResponse(data)) = response else { + let Some(response::BodyRef::PlainTransportDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = plain_transport::DumpResponse::try_from(data)?; + Ok(PlainTransportData { tuple: Mutex::new(TransportTuple::from_fbs(data.tuple.as_ref())), rtcp_tuple: Mutex::new( @@ -1098,12 +1113,14 @@ impl Request for RouterCreatePipeTransportRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::PipeTransportDumpResponse(data)) = response else { + let Some(response::BodyRef::PipeTransportDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = pipe_transport::DumpResponse::try_from(data)?; + Ok(PipeTransportData { tuple: Mutex::new(TransportTuple::from_fbs(data.tuple.as_ref())), sctp_parameters: data @@ -1197,7 +1214,7 @@ impl Request for RouterCreateAudioLevelObserverRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -1262,7 +1279,7 @@ impl Request for RouterCreateActiveSpeakerObserverRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -1293,10 +1310,10 @@ impl Request for TransportDumpRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { match response { - Some(data) => Ok(data), + Some(data) => Ok(data.try_into().unwrap()), _ => { panic!("Wrong message from worker: {response:?}"); } @@ -1328,10 +1345,10 @@ impl Request for TransportGetStatsRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { match response { - Some(data) => Ok(data), + Some(data) => Ok(data.try_into().unwrap()), _ => { panic!("Wrong message from worker: {response:?}"); } @@ -1369,7 +1386,7 @@ impl Request for TransportCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -1410,12 +1427,14 @@ impl Request for WebRtcTransportConnectRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::WebRtcTransportConnectResponse(data)) = response else { + let Some(response::BodyRef::WebRtcTransportConnectResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = web_rtc_transport::ConnectResponse::try_from(data)?; + Ok(WebRtcTransportConnectResponse { dtls_local_role: DtlsRole::from_fbs(data.dtls_local_role), }) @@ -1462,12 +1481,14 @@ impl Request for PipeTransportConnectRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::PipeTransportConnectResponse(data)) = response else { + let Some(response::BodyRef::PipeTransportConnectResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = pipe_transport::ConnectResponse::try_from(data)?; + Ok(PipeTransportConnectResponse { tuple: TransportTuple::from_fbs(data.tuple.as_ref()), }) @@ -1519,12 +1540,14 @@ impl Request for TransportConnectPlainRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::PlainTransportConnectResponse(data)) = response else { + let Some(response::BodyRef::PlainTransportConnectResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = plain_transport::ConnectResponse::try_from(data)?; + Ok(PlainTransportConnectResponse { tuple: TransportTuple::from_fbs(data.tuple.as_ref()), rtcp_tuple: data @@ -1567,7 +1590,7 @@ impl Request for TransportSetMaxIncomingBitrateRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -1603,7 +1626,7 @@ impl Request for TransportSetMaxOutgoingBitrateRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -1639,7 +1662,7 @@ impl Request for TransportSetMinOutgoingBitrateRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -1669,12 +1692,14 @@ impl Request for TransportRestartIceRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::TransportRestartIceResponse(data)) = response else { + let Some(response::BodyRef::TransportRestartIceResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = transport::RestartIceResponse::try_from(data)?; + Ok(IceParameters::from_fbs(web_rtc_transport::IceParameters { username_fragment: data.username_fragment, password: data.password, @@ -1729,12 +1754,14 @@ impl Request for TransportProduceRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::TransportProduceResponse(data)) = response else { + let Some(response::BodyRef::TransportProduceResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = transport::ProduceResponse::try_from(data)?; + Ok(TransportProduceResponse { r#type: ProducerType::from_fbs(data.type_), }) @@ -1799,12 +1826,14 @@ impl Request for TransportConsumeRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::TransportConsumeResponse(data)) = response else { + let Some(response::BodyRef::TransportConsumeResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = transport::ConsumeResponse::try_from(data)?; + Ok(TransportConsumeResponse { paused: data.paused, producer_paused: data.producer_paused, @@ -1879,12 +1908,14 @@ impl Request for TransportProduceDataRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::DataProducerDumpResponse(data)) = response else { + let Some(response::BodyRef::DataProducerDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = data_producer::DumpResponse::try_from(data)?; + Ok(TransportProduceDataResponse { r#type: match data.type_ { data_producer::Type::Sctp => DataProducerType::Sctp, @@ -1968,12 +1999,14 @@ impl Request for TransportConsumeDataRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::DataConsumerDumpResponse(data)) = response else { + let Some(response::BodyRef::DataConsumerDumpResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = data_consumer::DumpResponse::try_from(data)?; + Ok(TransportConsumeDataResponse { r#type: match data.type_ { data_producer::Type::Sctp => DataConsumerType::Sctp, @@ -2027,7 +2060,7 @@ impl Request for TransportEnableTraceEventRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2098,7 +2131,7 @@ impl Request for ProducerCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2110,7 +2143,7 @@ pub(crate) struct ProducerDumpRequest {} impl Request for ProducerDumpRequest { const METHOD: request::Method = request::Method::ProducerDump; type HandlerId = ProducerId; - type Response = response::Body; + type Response = ProducerDump; fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec { let mut builder = Builder::new(); @@ -2129,14 +2162,13 @@ impl Request for ProducerDumpRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - match response { - Some(data) => Ok(data), - _ => { - panic!("Wrong message from worker: {response:?}"); - } - } + let Some(response::BodyRef::ProducerDumpResponse(data)) = response else { + panic!("Wrong message from worker: {response:?}"); + }; + + ProducerDump::from_fbs_ref(data) } } @@ -2165,10 +2197,10 @@ impl Request for ProducerGetStatsRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { match response { - Some(data) => Ok(data), + Some(data) => Ok(data.try_into().unwrap()), _ => { panic!("Wrong message from worker: {response:?}"); } @@ -2201,7 +2233,7 @@ impl Request for ProducerPauseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2232,7 +2264,7 @@ impl Request for ProducerResumeRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2274,7 +2306,7 @@ impl Request for ProducerEnableTraceEventRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2345,7 +2377,7 @@ impl Request for ConsumerCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2357,7 +2389,7 @@ pub(crate) struct ConsumerDumpRequest {} impl Request for ConsumerDumpRequest { const METHOD: request::Method = request::Method::ConsumerDump; type HandlerId = ConsumerId; - type Response = response::Body; + type Response = ConsumerDump; fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec { let mut builder = Builder::new(); @@ -2376,14 +2408,13 @@ impl Request for ConsumerDumpRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - match response { - Some(data) => Ok(data), - _ => { - panic!("Wrong message from worker: {response:?}"); - } - } + let Some(response::BodyRef::ConsumerDumpResponse(data)) = response else { + panic!("Wrong message from worker: {response:?}"); + }; + + ConsumerDump::from_fbs_ref(data) } } @@ -2412,10 +2443,10 @@ impl Request for ConsumerGetStatsRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { match response { - Some(data) => Ok(data), + Some(data) => Ok(data.try_into().unwrap()), _ => { panic!("Wrong message from worker: {response:?}"); } @@ -2448,7 +2479,7 @@ impl Request for ConsumerPauseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2479,7 +2510,7 @@ impl Request for ConsumerResumeRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2519,12 +2550,14 @@ impl Request for ConsumerSetPreferredLayersRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::ConsumerSetPreferredLayersResponse(data)) = response else { + let Some(response::BodyRef::ConsumerSetPreferredLayersResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = consumer::SetPreferredLayersResponse::try_from(data)?; + match data.preferred_layers { Some(preferred_layers) => Ok(Some(ConsumerLayers::from_fbs(*preferred_layers))), None => Ok(None), @@ -2567,12 +2600,14 @@ impl Request for ConsumerSetPriorityRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::ConsumerSetPriorityResponse(data)) = response else { + let Some(response::BodyRef::ConsumerSetPriorityResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = consumer::SetPriorityResponse::try_from(data)?; + Ok(ConsumerSetPriorityResponse { priority: data.priority, }) @@ -2603,7 +2638,7 @@ impl Request for ConsumerRequestKeyFrameRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2645,7 +2680,7 @@ impl Request for ConsumerEnableTraceEventRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2688,7 +2723,7 @@ impl Request for DataProducerCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2719,10 +2754,10 @@ impl Request for DataProducerDumpRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { match response { - Some(data) => Ok(data), + Some(data) => Ok(data.try_into().unwrap()), _ => { panic!("Wrong message from worker: {response:?}"); } @@ -2755,10 +2790,10 @@ impl Request for DataProducerGetStatsRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { match response { - Some(data) => Ok(data), + Some(data) => Ok(data.try_into().unwrap()), _ => { panic!("Wrong message from worker: {response:?}"); } @@ -2791,7 +2826,7 @@ impl Request for DataProducerPauseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2822,7 +2857,7 @@ impl Request for DataProducerResumeRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2902,7 +2937,7 @@ impl Request for DataConsumerCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -2933,10 +2968,10 @@ impl Request for DataConsumerDumpRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { match response { - Some(data) => Ok(data), + Some(data) => Ok(data.try_into().unwrap()), _ => { panic!("Wrong message from worker: {response:?}"); } @@ -2969,10 +3004,10 @@ impl Request for DataConsumerGetStatsRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { match response { - Some(data) => Ok(data), + Some(data) => Ok(data.try_into().unwrap()), _ => { panic!("Wrong message from worker: {response:?}"); } @@ -3005,7 +3040,7 @@ impl Request for DataConsumerPauseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -3036,7 +3071,7 @@ impl Request for DataConsumerResumeRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -3072,12 +3107,14 @@ impl Request for DataConsumerGetBufferedAmountRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::DataConsumerGetBufferedAmountResponse(data)) = response else { + let Some(response::BodyRef::DataConsumerGetBufferedAmountResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = data_consumer::GetBufferedAmountResponse::try_from(data)?; + Ok(DataConsumerGetBufferedAmountResponse { buffered_amount: data.buffered_amount, }) @@ -3121,7 +3158,7 @@ impl Request for DataConsumerSetBufferedAmountLowThresholdRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -3161,7 +3198,7 @@ impl Request for DataConsumerSendRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -3209,12 +3246,14 @@ impl Request for DataConsumerSetSubchannelsRequest { } fn convert_response( - response: Option, + response: Option>, ) -> Result> { - let Some(response::Body::DataConsumerSetSubchannelsResponse(data)) = response else { + let Some(response::BodyRef::DataConsumerSetSubchannelsResponse(data)) = response else { panic!("Wrong message from worker: {response:?}"); }; + let data = data_consumer::SetSubchannelsResponse::try_from(data)?; + Ok(DataConsumerSetSubchannelsResponse { subchannels: data.subchannels, }) @@ -3252,7 +3291,7 @@ impl Request for RtpObserverCloseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -3282,7 +3321,7 @@ impl Request for RtpObserverPauseRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -3312,7 +3351,7 @@ impl Request for RtpObserverResumeRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -3351,7 +3390,7 @@ impl Request for RtpObserverAddProducerRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } @@ -3390,7 +3429,7 @@ impl Request for RtpObserverRemoveProducerRequest { } fn convert_response( - _response: Option, + _response: Option>, ) -> Result> { Ok(()) } diff --git a/rust/src/ortc.rs b/rust/src/ortc.rs index 03f6726cb3..f176e477f6 100644 --- a/rust/src/ortc.rs +++ b/rust/src/ortc.rs @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::BTreeMap; use std::convert::TryFrom; +use std::error::Error; use std::mem; use std::num::{NonZeroU32, NonZeroU8}; use std::ops::Deref; @@ -77,32 +78,37 @@ impl RtpMapping { } } - pub(crate) fn from_fbs(mapping: rtp_parameters::RtpMapping) -> Self { - Self { + pub(crate) fn from_fbs_ref( + mapping: rtp_parameters::RtpMappingRef<'_>, + ) -> Result> { + Ok(Self { codecs: mapping - .codecs + .codecs()? .iter() - .map(|mapping| RtpMappingCodec { - payload_type: mapping.payload_type, - mapped_payload_type: mapping.mapped_payload_type, + .map(|mapping| { + Ok(RtpMappingCodec { + payload_type: mapping?.payload_type()?, + mapped_payload_type: mapping?.mapped_payload_type()?, + }) }) - .collect(), + .collect::, Box>>()?, encodings: mapping - .encodings + .encodings()? .iter() - .map(|mapping| RtpMappingEncoding { - rid: mapping.rid.clone().map(|rid| rid.to_string()), - ssrc: mapping.ssrc, - scalability_mode: mapping - .scalability_mode - .clone() - .unwrap_or(String::from("S1T1")) - .parse() - .unwrap(), - mapped_ssrc: mapping.mapped_ssrc, + .map(|mapping| { + Ok(RtpMappingEncoding { + rid: mapping?.rid()?.map(|rid| rid.to_string()), + ssrc: mapping?.ssrc()?, + scalability_mode: mapping? + .scalability_mode()? + .map(|maybe_scalability_mode| maybe_scalability_mode.parse()) + .transpose()? + .unwrap_or_default(), + mapped_ssrc: mapping?.mapped_ssrc()?, + }) }) - .collect(), - } + .collect::, Box>>()?, + }) } } diff --git a/rust/src/prelude.rs b/rust/src/prelude.rs index 9195a393b4..63a010fd5c 100644 --- a/rust/src/prelude.rs +++ b/rust/src/prelude.rs @@ -65,7 +65,8 @@ pub use crate::data_structures::{ pub use crate::rtp_parameters::{ MediaKind, MimeTypeAudio, MimeTypeVideo, RtcpFeedback, RtcpParameters, RtpCapabilities, RtpCapabilitiesFinalized, RtpCodecCapability, RtpCodecParameters, RtpCodecParametersParameters, - RtpHeaderExtensionParameters, RtpHeaderExtensionUri, RtpParameters, + RtpEncodingParameters, RtpEncodingParametersRtx, RtpHeaderExtensionParameters, + RtpHeaderExtensionUri, RtpParameters, }; pub use crate::sctp_parameters::SctpStreamParameters; pub use crate::srtp_parameters::SrtpCryptoSuite; diff --git a/rust/src/router/consumer.rs b/rust/src/router/consumer.rs index eaff8b425e..8280bc8614 100644 --- a/rust/src/router/consumer.rs +++ b/rust/src/router/consumer.rs @@ -161,24 +161,24 @@ pub struct RtpStreamParams { } impl RtpStreamParams { - pub(crate) fn from_fbs(params: &rtp_stream::Params) -> Self { - Self { - clock_rate: params.clock_rate, - cname: params.cname.clone(), - encoding_idx: params.encoding_idx, - mime_type: params.mime_type.clone().parse().unwrap(), - payload_type: params.payload_type, - spatial_layers: params.spatial_layers, - ssrc: params.ssrc, - temporal_layers: params.temporal_layers, - use_dtx: params.use_dtx, - use_in_band_fec: params.use_in_band_fec, - use_nack: params.use_nack, - use_pli: params.use_pli, - rid: params.rid.clone(), - rtx_ssrc: params.rtx_ssrc, - rtx_payload_type: params.rtx_payload_type, - } + pub(crate) fn from_fbs_ref(params: rtp_stream::ParamsRef<'_>) -> Result> { + Ok(Self { + clock_rate: params.clock_rate()?, + cname: params.cname()?.to_string(), + encoding_idx: params.encoding_idx()?, + mime_type: params.mime_type()?.parse()?, + payload_type: params.payload_type()?, + spatial_layers: params.spatial_layers()?, + ssrc: params.ssrc()?, + temporal_layers: params.temporal_layers()?, + use_dtx: params.use_dtx()?, + use_in_band_fec: params.use_in_band_fec()?, + use_nack: params.use_nack()?, + use_pli: params.use_pli()?, + rid: params.rid()?.map(|rid| rid.to_string()), + rtx_ssrc: params.rtx_ssrc()?, + rtx_payload_type: params.rtx_payload_type()?, + }) } } @@ -195,15 +195,15 @@ pub struct RtxStreamParams { } impl RtxStreamParams { - pub(crate) fn from_fbs(params: &rtx_stream::Params) -> Self { - Self { - clock_rate: params.clock_rate, - cname: params.cname.clone(), - mime_type: params.mime_type.clone().parse().unwrap(), - payload_type: params.payload_type, - ssrc: params.ssrc, - rrid: params.rrid.clone(), - } + pub(crate) fn from_fbs_ref(params: rtx_stream::ParamsRef<'_>) -> Result> { + Ok(Self { + clock_rate: params.clock_rate()?, + cname: params.cname()?.to_string(), + mime_type: params.mime_type()?.parse()?, + payload_type: params.payload_type()?, + ssrc: params.ssrc()?, + rrid: params.rrid()?.map(|rrid| rrid.to_string()), + }) } } @@ -216,11 +216,11 @@ pub struct RtpStream { } impl RtpStream { - pub(crate) fn from_fbs(dump: rtp_stream::Dump) -> Self { - Self { - params: RtpStreamParams::from_fbs(&dump.params), - score: dump.score, - } + pub(crate) fn from_fbs_ref(dump: rtp_stream::DumpRef<'_>) -> Result> { + Ok(Self { + params: RtpStreamParams::from_fbs_ref(dump.params()?)?, + score: dump.score()?, + }) } } @@ -263,42 +263,48 @@ pub struct ConsumerDump { } impl ConsumerDump { - pub(crate) fn from_fbs(dump: consumer::DumpResponse) -> Result> { - let dump = dump.data; + pub(crate) fn from_fbs_ref( + dump: consumer::DumpResponseRef<'_>, + ) -> Result> { + let dump = dump.data(); Ok(Self { - id: dump.base.id.parse()?, - kind: MediaKind::from_fbs(dump.base.kind), - paused: dump.base.paused, - priority: dump.base.priority, - producer_id: dump.base.producer_id.parse()?, - producer_paused: dump.base.producer_paused, - rtp_parameters: RtpParameters::from_fbs(*dump.base.rtp_parameters).unwrap(), - supported_codec_payload_types: dump.base.supported_codec_payload_types, - trace_event_types: dump - .base - .trace_event_types + id: dump?.base()?.id()?.parse()?, + kind: MediaKind::from_fbs(dump?.base()?.kind()?), + paused: dump?.base()?.paused()?, + priority: dump?.base()?.priority()?, + producer_id: dump?.base()?.producer_id()?.parse()?, + producer_paused: dump?.base()?.producer_paused()?, + rtp_parameters: RtpParameters::from_fbs_ref(dump?.base()?.rtp_parameters()?)?, + supported_codec_payload_types: Vec::from( + dump?.base()?.supported_codec_payload_types()?, + ), + trace_event_types: dump? + .base()? + .trace_event_types()? .iter() - .map(ConsumerTraceEventType::from_fbs) - .collect(), - r#type: ConsumerType::from_fbs(dump.base.type_), - consumable_rtp_encodings: dump - .base - .consumable_rtp_encodings - .into_iter() - .map(RtpEncodingParameters::from_fbs) - .collect(), - rtp_streams: dump - .rtp_streams - .into_iter() - .map(RtpStream::from_fbs) - .collect(), - preferred_spatial_layer: dump.preferred_spatial_layer, - target_spatial_layer: dump.target_spatial_layer, - current_spatial_layer: dump.current_spatial_layer, - preferred_temporal_layer: dump.preferred_temporal_layer, - target_temporal_layer: dump.target_temporal_layer, - current_temporal_layer: dump.current_temporal_layer, + .map(|trace_event_type| Ok(ConsumerTraceEventType::from_fbs(trace_event_type?))) + .collect::>>()?, + r#type: ConsumerType::from_fbs(dump?.base()?.type_()?), + consumable_rtp_encodings: dump? + .base()? + .consumable_rtp_encodings()? + .iter() + .map(|encoding_parameters| { + RtpEncodingParameters::from_fbs_ref(encoding_parameters?) + }) + .collect::>>()?, + rtp_streams: dump? + .rtp_streams()? + .iter() + .map(|stream| RtpStream::from_fbs_ref(stream?)) + .collect::>>()?, + preferred_spatial_layer: dump?.preferred_spatial_layer()?, + target_spatial_layer: dump?.target_spatial_layer()?, + current_spatial_layer: dump?.current_spatial_layer()?, + preferred_temporal_layer: dump?.preferred_temporal_layer()?, + target_temporal_layer: dump?.target_temporal_layer()?, + current_temporal_layer: dump?.current_temporal_layer()?, }) } } @@ -563,7 +569,7 @@ impl ConsumerTraceEventType { } } - pub(crate) fn from_fbs(event_type: &consumer::TraceEventType) -> Self { + pub(crate) fn from_fbs(event_type: consumer::TraceEventType) -> Self { match event_type { consumer::TraceEventType::Rtp => ConsumerTraceEventType::Rtp, consumer::TraceEventType::Keyframe => ConsumerTraceEventType::KeyFrame, @@ -986,17 +992,10 @@ impl Consumer { pub async fn dump(&self) -> Result { debug!("dump()"); - let response = self - .inner + self.inner .channel .request(self.id(), ConsumerDumpRequest {}) - .await?; - - if let response::Body::ConsumerDumpResponse(data) = response { - Ok(ConsumerDump::from_fbs(*data).expect("Error parsing dump response")) - } else { - panic!("Wrong message from worker"); - } + .await } /// Returns current RTC statistics of the consumer. diff --git a/rust/src/router/producer.rs b/rust/src/router/producer.rs index 4bfde5a99c..5b0e1b68fd 100644 --- a/rust/src/router/producer.rs +++ b/rust/src/router/producer.rs @@ -106,14 +106,18 @@ pub struct RtpStreamRecv { } impl RtpStreamRecv { - pub(crate) fn from_fbs(dump: &rtp_stream::Dump) -> Self { - Self { - params: RtpStreamParams::from_fbs(&dump.params), - score: dump.score, - rtx_stream: dump.rtx_stream.clone().map(|stream| RtxStream { - params: RtxStreamParams::from_fbs(&stream.params), - }), - } + pub(crate) fn from_fbs_ref(dump: rtp_stream::DumpRef<'_>) -> Result> { + Ok(Self { + params: RtpStreamParams::from_fbs_ref(dump.params()?)?, + score: dump.score()?, + rtx_stream: if let Some(rtx_stream) = dump.rtx_stream()? { + Some(RtxStream { + params: RtxStreamParams::from_fbs_ref(rtx_stream.params()?)?, + }) + } else { + None + }, + }) } } @@ -133,24 +137,29 @@ pub struct ProducerDump { } impl ProducerDump { - pub(crate) fn from_fbs(dump: producer::DumpResponse) -> Result> { + pub(crate) fn from_fbs_ref( + dump: producer::DumpResponseRef<'_>, + ) -> Result> { Ok(Self { - id: dump.id.parse()?, - kind: MediaKind::from_fbs(dump.kind), - paused: dump.paused, - rtp_mapping: RtpMapping::from_fbs(*dump.rtp_mapping), - rtp_parameters: RtpParameters::from_fbs(*dump.rtp_parameters).unwrap(), + id: dump.id()?.parse()?, + kind: MediaKind::from_fbs(dump.kind()?), + paused: dump.paused()?, + rtp_mapping: RtpMapping::from_fbs_ref(dump.rtp_mapping()?)?, + rtp_parameters: RtpParameters::from_fbs_ref(dump.rtp_parameters()?)?, rtp_streams: dump - .rtp_streams + .rtp_streams()? .iter() - .map(RtpStreamRecv::from_fbs) - .collect(), + .map(|rtp_stream| RtpStreamRecv::from_fbs_ref(rtp_stream?)) + .collect::>>()?, trace_event_types: dump - .trace_event_types + .trace_event_types()? .iter() - .map(ProducerTraceEventType::from_fbs) + .map(|trace_event_type| { + ProducerTraceEventType::from_fbs(&trace_event_type.unwrap()) + }) .collect(), - r#type: ProducerType::from_fbs(dump.type_), + + r#type: ProducerType::from_fbs(dump.type_()?), }) } } @@ -845,17 +854,10 @@ impl Producer { pub async fn dump(&self) -> Result { debug!("dump()"); - let response = self - .inner() + self.inner() .channel .request(self.id(), ProducerDumpRequest {}) - .await?; - - if let response::Body::ProducerDumpResponse(data) = response { - Ok(ProducerDump::from_fbs(*data).expect("Error parsing dump response")) - } else { - panic!("Wrong message from worker"); - } + .await } /// Returns current RTC statistics of the producer. diff --git a/rust/src/rtp_parameters.rs b/rust/src/rtp_parameters.rs index a4bfa84ae7..e063baa242 100644 --- a/rust/src/rtp_parameters.rs +++ b/rust/src/rtp_parameters.rs @@ -787,61 +787,63 @@ pub struct RtpParameters { } impl RtpParameters { - pub(crate) fn from_fbs( - rtp_parameters: rtp_parameters::RtpParameters, + pub(crate) fn from_fbs_ref( + rtp_parameters: rtp_parameters::RtpParametersRef<'_>, ) -> Result> { Ok(Self { - mid: rtp_parameters.mid, + mid: rtp_parameters.mid()?.map(|mid| mid.to_string()), codecs: rtp_parameters - .codecs + .codecs()? .into_iter() .map(|codec| { - let parameters = codec - .parameters - .unwrap_or_default() + let parameters = codec? + .parameters()? + .unwrap_or(planus::Vector::new_empty()) .into_iter() .map(|parameters| { Ok(( - Cow::Owned(parameters.name), - match parameters.value { - rtp_parameters::Value::Boolean(_) - | rtp_parameters::Value::Double(_) - | rtp_parameters::Value::Integer32Array(_) => { + Cow::Owned(parameters?.name()?.to_string()), + match parameters?.value()? { + rtp_parameters::ValueRef::Boolean(_) + | rtp_parameters::ValueRef::Double(_) + | rtp_parameters::ValueRef::Integer32Array(_) => { // TODO: Above value variant should not exist in the // first place panic!("Invalid parameter") } - rtp_parameters::Value::Integer32(n) => { + rtp_parameters::ValueRef::Integer32(n) => { RtpCodecParametersParametersValue::Number( - n.value.try_into()?, + n.value()?.try_into()?, ) } - rtp_parameters::Value::String(s) => { - RtpCodecParametersParametersValue::String(s.value.into()) + rtp_parameters::ValueRef::String(s) => { + RtpCodecParametersParametersValue::String( + s.value()?.to_string().into(), + ) } }, )) }) .collect::>>()?; - let rtcp_feedback = codec - .rtcp_feedback - .unwrap_or_default() + let rtcp_feedback = codec? + .rtcp_feedback()? + .unwrap_or(planus::Vector::new_empty()) .into_iter() .map(|rtcp_feedback| { - RtcpFeedback::from_type_parameter( - &rtcp_feedback.type_, - &rtcp_feedback.parameter.unwrap_or_default(), - ) + Ok(RtcpFeedback::from_type_parameter( + rtcp_feedback?.type_()?, + rtcp_feedback?.parameter()?.unwrap_or_default(), + )?) }) - .collect::>()?; + .collect::>>()?; - Ok(match MimeType::from_str(&codec.mime_type)? { + Ok(match MimeType::from_str(codec?.mime_type()?)? { MimeType::Audio(mime_type) => RtpCodecParameters::Audio { mime_type, - payload_type: codec.payload_type, - clock_rate: codec.clock_rate.try_into()?, - channels: codec - .channels + payload_type: codec?.payload_type()?, + clock_rate: codec?.clock_rate()?.try_into()?, + channels: codec? + .channels()? .ok_or("Audio must have channels specified")? .try_into()?, parameters, @@ -849,8 +851,8 @@ impl RtpParameters { }, MimeType::Video(mime_type) => RtpCodecParameters::Video { mime_type, - payload_type: codec.payload_type, - clock_rate: codec.clock_rate.try_into()?, + payload_type: codec?.payload_type()?, + clock_rate: codec?.clock_rate()?.try_into()?, parameters, rtcp_feedback, }, @@ -858,44 +860,47 @@ impl RtpParameters { }) .collect::>>()?, header_extensions: rtp_parameters - .header_extensions + .header_extensions()? .into_iter() .map(|header_extension_parameters| { Ok(RtpHeaderExtensionParameters { - uri: RtpHeaderExtensionUri::from_fbs(header_extension_parameters.uri), - id: u16::from(header_extension_parameters.id), - encrypt: header_extension_parameters.encrypt, + uri: RtpHeaderExtensionUri::from_fbs(header_extension_parameters?.uri()?), + id: u16::from(header_extension_parameters?.id()?), + encrypt: header_extension_parameters?.encrypt()?, }) }) .collect::>>()?, encodings: rtp_parameters - .encodings + .encodings()? .into_iter() .map(|encoding| { Ok(RtpEncodingParameters { - ssrc: encoding.ssrc, - rid: encoding.rid, - codec_payload_type: encoding.codec_payload_type, - rtx: encoding - .rtx - .map(|rtx| RtpEncodingParametersRtx { ssrc: rtx.ssrc }), + ssrc: encoding?.ssrc()?, + rid: encoding?.rid()?.map(|rid| rid.to_string()), + codec_payload_type: encoding?.codec_payload_type()?, + rtx: encoding?.rtx()?.map(|rtx| RtpEncodingParametersRtx { + ssrc: rtx.ssrc().unwrap(), + }), dtx: { - match encoding.dtx { + match encoding?.dtx()? { true => Some(true), false => None, } }, - scalability_mode: encoding - .scalability_mode - .unwrap_or(String::from("S1T1")) + scalability_mode: encoding? + .scalability_mode()? + .unwrap_or(String::from("S1T1").as_str()) .parse()?, - max_bitrate: encoding.max_bitrate, + max_bitrate: encoding?.max_bitrate()?, }) }) .collect::>>()?, rtcp: RtcpParameters { - cname: rtp_parameters.rtcp.cname, - reduced_size: rtp_parameters.rtcp.reduced_size, + cname: rtp_parameters + .rtcp()? + .cname()? + .map(|cname| cname.to_string()), + reduced_size: rtp_parameters.rtcp()?.reduced_size()?, }, }) } @@ -1303,27 +1308,31 @@ impl RtpEncodingParameters { } } - pub(crate) fn from_fbs(encoding_parameters: rtp_parameters::RtpEncodingParameters) -> Self { - Self { - ssrc: encoding_parameters.ssrc, - rid: encoding_parameters.rid.clone(), - codec_payload_type: encoding_parameters.codec_payload_type, - rtx: encoding_parameters - .rtx - .map(|rtx| RtpEncodingParametersRtx { ssrc: rtx.ssrc }), + pub(crate) fn from_fbs_ref( + encoding_parameters: rtp_parameters::RtpEncodingParametersRef<'_>, + ) -> Result> { + Ok(Self { + ssrc: encoding_parameters.ssrc()?, + rid: encoding_parameters.rid()?.map(|rid| rid.to_string()), + codec_payload_type: encoding_parameters.codec_payload_type()?, + rtx: if let Some(rtx) = encoding_parameters.rtx()? { + Some(RtpEncodingParametersRtx { ssrc: rtx.ssrc()? }) + } else { + None + }, dtx: { - match encoding_parameters.dtx { + match encoding_parameters.dtx()? { true => Some(true), false => None, } }, scalability_mode: encoding_parameters - .scalability_mode - .unwrap_or(String::from("S1T1")) - .parse() - .unwrap(), - max_bitrate: encoding_parameters.max_bitrate, - } + .scalability_mode()? + .map(|maybe_scalability_mode| maybe_scalability_mode.parse()) + .transpose()? + .unwrap_or_default(), + max_bitrate: encoding_parameters.max_bitrate()?, + }) } } diff --git a/rust/src/worker/channel.rs b/rust/src/worker/channel.rs index a0dd9f8b08..ea24ebe8c3 100644 --- a/rust/src/worker/channel.rs +++ b/rust/src/worker/channel.rs @@ -91,8 +91,14 @@ enum ChannelReceiveMessage<'a> { Event(InternalMessage), } +// Remove the first 4 bytes which represent the buffer size. +// NOTE: The prefix is only needed for NodeJS. +fn unprefix_message(bytes: &[u8]) -> &[u8] { + &bytes[4..] +} + fn deserialize_message(bytes: &[u8]) -> ChannelReceiveMessage<'_> { - let message_ref = message::MessageRef::read_as_root(&bytes[4..]).unwrap(); + let message_ref = message::MessageRef::read_as_root(bytes).unwrap(); match message_ref.data().unwrap() { message::BodyRef::Log(data) => match data.data().unwrap().chars().next() { @@ -126,7 +132,7 @@ struct ResponseError { reason: String, } -type FBSResponseResult = Result, ResponseError>; +type FBSResponseResult = Result>, ResponseError>; struct RequestDropGuard<'a> { id: u32, @@ -239,6 +245,8 @@ impl Channel { move |message| { trace!("received raw message: {}", String::from_utf8_lossy(message)); + let message = unprefix_message(message); + match deserialize_message(message) { ChannelReceiveMessage::Notification(notification) => { let target_id = notification.handler_id().unwrap(); @@ -254,7 +262,7 @@ impl Channel { // target_id if let Some(list) = buffer_notifications_for.get_mut(&target_id) { // Store the whole message removing the size prefix. - list.push(Vec::from(&message[4..])); + list.push(Vec::from(message)); return; } @@ -279,12 +287,8 @@ impl Channel { else { match response.body().expect("failed accessing response body") { // Response has body. - Some(body_ref) => { - let _ = sender.send(Ok(Some( - body_ref - .try_into() - .expect("failed to retrieve response body"), - ))); + Some(_) => { + let _ = sender.send(Ok(Some(Vec::from(message)))); } // Response does not have body. None => { @@ -421,11 +425,25 @@ impl Channel { }; match response_result { - Ok(data) => { + Ok(bytes) => { debug!("request succeeded [method:{:?}, id:{}]", R::METHOD, id); - trace!("{data:?}"); - Ok(R::convert_response(data).map_err(RequestError::ResponseConversion)?) + match bytes { + Some(bytes) => { + let message_ref = message::MessageRef::read_as_root(&bytes).unwrap(); + + let message::BodyRef::Response(response_ref) = message_ref.data().unwrap() + else { + panic!("Wrong response stored: {message_ref:?}"); + }; + + Ok(R::convert_response(response_ref.body().unwrap()) + .map_err(RequestError::ResponseConversion)?) + } + None => { + Ok(R::convert_response(None).map_err(RequestError::ResponseConversion)?) + } + } } Err(ResponseError { reason }) => { debug!(