From 096ee1ed76cb5e289a2d225831745812aee283d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Luis=20Mill=C3=A1n?= Date: Mon, 9 Oct 2023 11:03:10 +0200 Subject: [PATCH] Rust: remove temporal FBS suffixes (#1171) They were temporal until every message was ported to flatbuffers. --- rust/src/messages.rs | 152 +++++++++------------ rust/src/router.rs | 16 +-- rust/src/router/active_speaker_observer.rs | 12 +- rust/src/router/audio_level_observer.rs | 12 +- rust/src/router/consumer.rs | 22 +-- rust/src/router/data_consumer.rs | 18 +-- rust/src/router/data_producer.rs | 12 +- rust/src/router/direct_transport.rs | 6 +- rust/src/router/pipe_transport.rs | 6 +- rust/src/router/plain_transport.rs | 6 +- rust/src/router/producer.rs | 16 +-- rust/src/router/transport.rs | 20 +-- rust/src/router/webrtc_transport.rs | 8 +- rust/src/webrtc_server.rs | 4 +- rust/src/worker.rs | 15 +- rust/src/worker/channel.rs | 82 ++++++----- rust/src/worker/common.rs | 18 +-- 17 files changed, 200 insertions(+), 225 deletions(-) diff --git a/rust/src/messages.rs b/rust/src/messages.rs index 441c5c6d43..5ae6bd603b 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -35,7 +35,6 @@ use crate::webrtc_transport::{ use crate::worker::{ChannelMessageHandlers, WorkerDump, WorkerUpdateSettings}; use parking_lot::Mutex; use planus::Builder; -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::error::Error; use std::fmt::{Debug, Display}; @@ -43,23 +42,6 @@ use std::net::IpAddr; use std::num::NonZeroU16; pub(crate) trait Request -where - Self: Debug + Serialize, -{ - type HandlerId: Display; - type Response: DeserializeOwned; - - /// Request method to call on worker. - fn as_method(&self) -> &'static str; - - /// Default response to return in case of soft error, such as channel already closed, entity - /// doesn't exist on worker during closing. - fn default_for_soft_error() -> Option { - None - } -} - -pub(crate) trait RequestFbs where Self: Debug, { @@ -82,7 +64,7 @@ where -> Result>; } -pub(crate) trait NotificationFbs: Debug { +pub(crate) trait Notification: Debug { /// Notification event to call on worker. const EVENT: notification::Event; type HandlerId: Display; @@ -94,7 +76,7 @@ pub(crate) trait NotificationFbs: Debug { #[derive(Debug)] pub(crate) struct WorkerCloseRequest {} -impl RequestFbs for WorkerCloseRequest { +impl Request for WorkerCloseRequest { const METHOD: request::Method = request::Method::WorkerClose; type HandlerId = &'static str; type Response = (); @@ -125,7 +107,7 @@ impl RequestFbs for WorkerCloseRequest { #[derive(Debug)] pub(crate) struct WorkerDumpRequest {} -impl RequestFbs for WorkerDumpRequest { +impl Request for WorkerDumpRequest { const METHOD: request::Method = request::Method::WorkerDump; type HandlerId = &'static str; type Response = WorkerDump; @@ -187,7 +169,7 @@ pub(crate) struct WorkerUpdateSettingsRequest { pub(crate) data: WorkerUpdateSettings, } -impl RequestFbs for WorkerUpdateSettingsRequest { +impl Request for WorkerUpdateSettingsRequest { const METHOD: request::Method = request::Method::WorkerUpdateSettings; type HandlerId = &'static str; type Response = (); @@ -234,7 +216,7 @@ pub(crate) struct WorkerCreateWebRtcServerRequest { pub(crate) listen_infos: WebRtcServerListenInfos, } -impl RequestFbs for WorkerCreateWebRtcServerRequest { +impl Request for WorkerCreateWebRtcServerRequest { const METHOD: request::Method = request::Method::WorkerCreateWebrtcserver; type HandlerId = &'static str; type Response = (); @@ -273,7 +255,7 @@ pub(crate) struct WebRtcServerCloseRequest { pub(crate) webrtc_server_id: WebRtcServerId, } -impl RequestFbs for WebRtcServerCloseRequest { +impl Request for WebRtcServerCloseRequest { const METHOD: request::Method = request::Method::WorkerWebrtcserverClose; type HandlerId = &'static str; type Response = (); @@ -310,7 +292,7 @@ impl RequestFbs for WebRtcServerCloseRequest { #[derive(Debug)] pub(crate) struct WebRtcServerDumpRequest {} -impl RequestFbs for WebRtcServerDumpRequest { +impl Request for WebRtcServerDumpRequest { const METHOD: request::Method = request::Method::WebrtcserverDump; type HandlerId = WebRtcServerId; type Response = WebRtcServerDump; @@ -389,7 +371,7 @@ pub(crate) struct WorkerCreateRouterRequest { pub(crate) router_id: RouterId, } -impl RequestFbs for WorkerCreateRouterRequest { +impl Request for WorkerCreateRouterRequest { const METHOD: request::Method = request::Method::WorkerCreateRouter; type HandlerId = &'static str; type Response = (); @@ -423,7 +405,7 @@ pub(crate) struct RouterCloseRequest { pub(crate) router_id: RouterId, } -impl RequestFbs for RouterCloseRequest { +impl Request for RouterCloseRequest { const METHOD: request::Method = request::Method::WorkerCloseRouter; type HandlerId = &'static str; type Response = (); @@ -456,7 +438,7 @@ impl RequestFbs for RouterCloseRequest { #[derive(Debug)] pub(crate) struct RouterDumpRequest {} -impl RequestFbs for RouterDumpRequest { +impl Request for RouterDumpRequest { const METHOD: request::Method = request::Method::RouterDump; type HandlerId = RouterId; type Response = RouterDump; @@ -593,7 +575,7 @@ pub(crate) struct RouterCreateDirectTransportRequest { pub(crate) data: RouterCreateDirectTransportData, } -impl RequestFbs for RouterCreateDirectTransportRequest { +impl Request for RouterCreateDirectTransportRequest { const METHOD: request::Method = request::Method::RouterCreateDirecttransport; type HandlerId = RouterId; type Response = (); @@ -742,7 +724,7 @@ pub(crate) struct RouterCreateWebRtcTransportRequest { pub(crate) data: RouterCreateWebrtcTransportData, } -impl RequestFbs for RouterCreateWebRtcTransportRequest { +impl Request for RouterCreateWebRtcTransportRequest { const METHOD: request::Method = request::Method::RouterCreateWebrtctransport; type HandlerId = RouterId; type Response = WebRtcTransportData; @@ -872,7 +854,7 @@ pub(crate) struct RouterCreatePlainTransportRequest { pub(crate) data: RouterCreatePlainTransportData, } -impl RequestFbs for RouterCreatePlainTransportRequest { +impl Request for RouterCreatePlainTransportRequest { const METHOD: request::Method = request::Method::RouterCreatePlaintransport; type HandlerId = RouterId; type Response = PlainTransportData; @@ -996,7 +978,7 @@ pub(crate) struct RouterCreatePipeTransportRequest { pub(crate) data: RouterCreatePipeTransportData, } -impl RequestFbs for RouterCreatePipeTransportRequest { +impl Request for RouterCreatePipeTransportRequest { const METHOD: request::Method = request::Method::RouterCreatePipetransport; type HandlerId = RouterId; type Response = PipeTransportData; @@ -1087,7 +1069,7 @@ impl RouterCreateAudioLevelObserverData { } } -impl RequestFbs for RouterCreateAudioLevelObserverRequest { +impl Request for RouterCreateAudioLevelObserverRequest { const METHOD: request::Method = request::Method::RouterCreateAudiolevelobserver; type HandlerId = RouterId; type Response = (); @@ -1154,7 +1136,7 @@ impl RouterCreateActiveSpeakerObserverData { } } -impl RequestFbs for RouterCreateActiveSpeakerObserverRequest { +impl Request for RouterCreateActiveSpeakerObserverRequest { const METHOD: request::Method = request::Method::RouterCreateActivespeakerobserver; type HandlerId = RouterId; type Response = (); @@ -1197,7 +1179,7 @@ impl RequestFbs for RouterCreateActiveSpeakerObserverRequest { #[derive(Debug)] pub(crate) struct TransportDumpRequest {} -impl RequestFbs for TransportDumpRequest { +impl Request for TransportDumpRequest { const METHOD: request::Method = request::Method::TransportDump; type HandlerId = TransportId; type Response = response::Body; @@ -1232,7 +1214,7 @@ impl RequestFbs for TransportDumpRequest { #[derive(Debug)] pub(crate) struct TransportGetStatsRequest {} -impl RequestFbs for TransportGetStatsRequest { +impl Request for TransportGetStatsRequest { const METHOD: request::Method = request::Method::TransportGetStats; type HandlerId = TransportId; type Response = response::Body; @@ -1270,7 +1252,7 @@ pub(crate) struct TransportCloseRequest { pub(crate) transport_id: TransportId, } -impl RequestFbs for TransportCloseRequest { +impl Request for TransportCloseRequest { const METHOD: request::Method = request::Method::RouterCloseTransport; type HandlerId = RouterId; type Response = (); @@ -1311,7 +1293,7 @@ pub(crate) struct WebRtcTransportConnectRequest { pub(crate) dtls_parameters: DtlsParameters, } -impl RequestFbs for WebRtcTransportConnectRequest { +impl Request for WebRtcTransportConnectRequest { const METHOD: request::Method = request::Method::WebrtctransportConnect; type HandlerId = TransportId; type Response = WebRtcTransportConnectResponse; @@ -1360,7 +1342,7 @@ pub(crate) struct PipeTransportConnectRequest { pub(crate) srtp_parameters: Option, } -impl RequestFbs for PipeTransportConnectRequest { +impl Request for PipeTransportConnectRequest { const METHOD: request::Method = request::Method::PipetransportConnect; type HandlerId = TransportId; type Response = PipeTransportConnectResponse; @@ -1415,7 +1397,7 @@ pub(crate) struct TransportConnectPlainRequest { pub(crate) srtp_parameters: Option, } -impl RequestFbs for TransportConnectPlainRequest { +impl Request for TransportConnectPlainRequest { const METHOD: request::Method = request::Method::PlaintransportConnect; type HandlerId = TransportId; type Response = PlainTransportConnectResponse; @@ -1468,7 +1450,7 @@ pub(crate) struct TransportSetMaxIncomingBitrateRequest { pub(crate) bitrate: u32, } -impl RequestFbs for TransportSetMaxIncomingBitrateRequest { +impl Request for TransportSetMaxIncomingBitrateRequest { const METHOD: request::Method = request::Method::TransportSetMaxIncomingBitrate; type HandlerId = TransportId; type Response = (); @@ -1504,7 +1486,7 @@ pub(crate) struct TransportSetMaxOutgoingBitrateRequest { pub(crate) bitrate: u32, } -impl RequestFbs for TransportSetMaxOutgoingBitrateRequest { +impl Request for TransportSetMaxOutgoingBitrateRequest { const METHOD: request::Method = request::Method::TransportSetMaxOutgoingBitrate; type HandlerId = TransportId; type Response = (); @@ -1540,7 +1522,7 @@ pub(crate) struct TransportSetMinOutgoingBitrateRequest { pub(crate) bitrate: u32, } -impl RequestFbs for TransportSetMinOutgoingBitrateRequest { +impl Request for TransportSetMinOutgoingBitrateRequest { const METHOD: request::Method = request::Method::TransportSetMinOutgoingBitrate; type HandlerId = TransportId; type Response = (); @@ -1574,7 +1556,7 @@ impl RequestFbs for TransportSetMinOutgoingBitrateRequest { #[derive(Debug)] pub(crate) struct TransportRestartIceRequest {} -impl RequestFbs for TransportRestartIceRequest { +impl Request for TransportRestartIceRequest { const METHOD: request::Method = request::Method::TransportRestartIce; type HandlerId = TransportId; type Response = IceParameters; @@ -1624,7 +1606,7 @@ pub(crate) struct TransportProduceResponse { pub(crate) r#type: ProducerType, } -impl RequestFbs for TransportProduceRequest { +impl Request for TransportProduceRequest { const METHOD: request::Method = request::Method::TransportProduce; type HandlerId = TransportId; type Response = TransportProduceResponse; @@ -1688,7 +1670,7 @@ pub(crate) struct TransportConsumeResponse { pub(crate) preferred_layers: Option, } -impl RequestFbs for TransportConsumeRequest { +impl Request for TransportConsumeRequest { const METHOD: request::Method = request::Method::TransportConsume; type HandlerId = TransportId; type Response = TransportConsumeResponse; @@ -1762,7 +1744,7 @@ pub(crate) struct TransportProduceDataResponse { pub(crate) paused: bool, } -impl RequestFbs for TransportProduceDataRequest { +impl Request for TransportProduceDataRequest { const METHOD: request::Method = request::Method::TransportProduceData; type HandlerId = TransportId; type Response = TransportProduceDataResponse; @@ -1847,7 +1829,7 @@ pub(crate) struct TransportConsumeDataResponse { pub(crate) data_producer_paused: bool, } -impl RequestFbs for TransportConsumeDataRequest { +impl Request for TransportConsumeDataRequest { const METHOD: request::Method = request::Method::TransportConsumeData; type HandlerId = TransportId; type Response = TransportConsumeDataResponse; @@ -1920,7 +1902,7 @@ pub(crate) struct TransportEnableTraceEventRequest { pub(crate) types: Vec, } -impl RequestFbs for TransportEnableTraceEventRequest { +impl Request for TransportEnableTraceEventRequest { const METHOD: request::Method = request::Method::TransportEnableTraceEvent; type HandlerId = TransportId; type Response = (); @@ -1967,7 +1949,7 @@ pub(crate) struct TransportSendRtcpNotification { pub(crate) rtcp_packet: Vec, } -impl NotificationFbs for TransportSendRtcpNotification { +impl Notification for TransportSendRtcpNotification { const EVENT: notification::Event = notification::Event::TransportSendRtcp; type HandlerId = TransportId; @@ -1996,7 +1978,7 @@ pub(crate) struct ProducerCloseRequest { pub(crate) producer_id: ProducerId, } -impl RequestFbs for ProducerCloseRequest { +impl Request for ProducerCloseRequest { const METHOD: request::Method = request::Method::TransportCloseProducer; type HandlerId = TransportId; type Response = (); @@ -2031,7 +2013,7 @@ impl RequestFbs for ProducerCloseRequest { #[derive(Debug)] pub(crate) struct ProducerDumpRequest {} -impl RequestFbs for ProducerDumpRequest { +impl Request for ProducerDumpRequest { const METHOD: request::Method = request::Method::ProducerDump; type HandlerId = ProducerId; type Response = response::Body; @@ -2067,7 +2049,7 @@ impl RequestFbs for ProducerDumpRequest { #[derive(Debug)] pub(crate) struct ProducerGetStatsRequest {} -impl RequestFbs for ProducerGetStatsRequest { +impl Request for ProducerGetStatsRequest { const METHOD: request::Method = request::Method::ProducerGetStats; type HandlerId = ProducerId; type Response = response::Body; @@ -2103,7 +2085,7 @@ impl RequestFbs for ProducerGetStatsRequest { #[derive(Debug, Serialize)] pub(crate) struct ProducerPauseRequest {} -impl RequestFbs for ProducerPauseRequest { +impl Request for ProducerPauseRequest { const METHOD: request::Method = request::Method::ProducerPause; type HandlerId = ProducerId; type Response = (); @@ -2134,7 +2116,7 @@ impl RequestFbs for ProducerPauseRequest { #[derive(Debug, Serialize)] pub(crate) struct ProducerResumeRequest {} -impl RequestFbs for ProducerResumeRequest { +impl Request for ProducerResumeRequest { const METHOD: request::Method = request::Method::ProducerResume; type HandlerId = ProducerId; type Response = (); @@ -2167,7 +2149,7 @@ pub(crate) struct ProducerEnableTraceEventRequest { pub(crate) types: Vec, } -impl RequestFbs for ProducerEnableTraceEventRequest { +impl Request for ProducerEnableTraceEventRequest { const METHOD: request::Method = request::Method::ProducerEnableTraceEvent; type HandlerId = ProducerId; type Response = (); @@ -2214,7 +2196,7 @@ pub(crate) struct ProducerSendNotification { pub(crate) rtp_packet: Vec, } -impl NotificationFbs for ProducerSendNotification { +impl Notification for ProducerSendNotification { const EVENT: notification::Event = notification::Event::ProducerSend; type HandlerId = ProducerId; @@ -2243,7 +2225,7 @@ pub(crate) struct ConsumerCloseRequest { pub(crate) consumer_id: ConsumerId, } -impl RequestFbs for ConsumerCloseRequest { +impl Request for ConsumerCloseRequest { const METHOD: request::Method = request::Method::TransportCloseConsumer; type HandlerId = TransportId; type Response = (); @@ -2278,7 +2260,7 @@ impl RequestFbs for ConsumerCloseRequest { #[derive(Debug)] pub(crate) struct ConsumerDumpRequest {} -impl RequestFbs for ConsumerDumpRequest { +impl Request for ConsumerDumpRequest { const METHOD: request::Method = request::Method::ConsumerDump; type HandlerId = ConsumerId; type Response = response::Body; @@ -2314,7 +2296,7 @@ impl RequestFbs for ConsumerDumpRequest { #[derive(Debug)] pub(crate) struct ConsumerGetStatsRequest {} -impl RequestFbs for ConsumerGetStatsRequest { +impl Request for ConsumerGetStatsRequest { const METHOD: request::Method = request::Method::ConsumerGetStats; type HandlerId = ConsumerId; type Response = response::Body; @@ -2350,7 +2332,7 @@ impl RequestFbs for ConsumerGetStatsRequest { #[derive(Debug)] pub(crate) struct ConsumerPauseRequest {} -impl RequestFbs for ConsumerPauseRequest { +impl Request for ConsumerPauseRequest { const METHOD: request::Method = request::Method::ConsumerPause; type HandlerId = ConsumerId; type Response = (); @@ -2381,7 +2363,7 @@ impl RequestFbs for ConsumerPauseRequest { #[derive(Debug, Serialize)] pub(crate) struct ConsumerResumeRequest {} -impl RequestFbs for ConsumerResumeRequest { +impl Request for ConsumerResumeRequest { const METHOD: request::Method = request::Method::ConsumerResume; type HandlerId = ConsumerId; type Response = (); @@ -2414,7 +2396,7 @@ pub(crate) struct ConsumerSetPreferredLayersRequest { pub(crate) data: ConsumerLayers, } -impl RequestFbs for ConsumerSetPreferredLayersRequest { +impl Request for ConsumerSetPreferredLayersRequest { const METHOD: request::Method = request::Method::ConsumerSetPreferredLayers; type HandlerId = ConsumerId; type Response = Option; @@ -2466,7 +2448,7 @@ pub(crate) struct ConsumerSetPriorityResponse { pub(crate) priority: u8, } -impl RequestFbs for ConsumerSetPriorityRequest { +impl Request for ConsumerSetPriorityRequest { const METHOD: request::Method = request::Method::ConsumerSetPriority; type HandlerId = ConsumerId; type Response = ConsumerSetPriorityResponse; @@ -2506,7 +2488,7 @@ impl RequestFbs for ConsumerSetPriorityRequest { #[derive(Debug)] pub(crate) struct ConsumerRequestKeyFrameRequest {} -impl RequestFbs for ConsumerRequestKeyFrameRequest { +impl Request for ConsumerRequestKeyFrameRequest { const METHOD: request::Method = request::Method::ConsumerRequestKeyFrame; type HandlerId = ConsumerId; type Response = (); @@ -2538,7 +2520,7 @@ pub(crate) struct ConsumerEnableTraceEventRequest { pub(crate) types: Vec, } -impl RequestFbs for ConsumerEnableTraceEventRequest { +impl Request for ConsumerEnableTraceEventRequest { const METHOD: request::Method = request::Method::ConsumerEnableTraceEvent; type HandlerId = ConsumerId; type Response = (); @@ -2584,7 +2566,7 @@ pub(crate) struct DataProducerCloseRequest { pub(crate) data_producer_id: DataProducerId, } -impl RequestFbs for DataProducerCloseRequest { +impl Request for DataProducerCloseRequest { const METHOD: request::Method = request::Method::TransportCloseDataproducer; type HandlerId = TransportId; type Response = (); @@ -2621,7 +2603,7 @@ impl RequestFbs for DataProducerCloseRequest { #[derive(Debug)] pub(crate) struct DataProducerDumpRequest {} -impl RequestFbs for DataProducerDumpRequest { +impl Request for DataProducerDumpRequest { const METHOD: request::Method = request::Method::DataproducerDump; type HandlerId = DataProducerId; type Response = response::Body; @@ -2657,7 +2639,7 @@ impl RequestFbs for DataProducerDumpRequest { #[derive(Debug)] pub(crate) struct DataProducerGetStatsRequest {} -impl RequestFbs for DataProducerGetStatsRequest { +impl Request for DataProducerGetStatsRequest { const METHOD: request::Method = request::Method::DataproducerGetStats; type HandlerId = DataProducerId; type Response = response::Body; @@ -2693,7 +2675,7 @@ impl RequestFbs for DataProducerGetStatsRequest { #[derive(Debug, Serialize)] pub(crate) struct DataProducerPauseRequest {} -impl RequestFbs for DataProducerPauseRequest { +impl Request for DataProducerPauseRequest { const METHOD: request::Method = request::Method::DataproducerPause; type HandlerId = DataProducerId; type Response = (); @@ -2724,7 +2706,7 @@ impl RequestFbs for DataProducerPauseRequest { #[derive(Debug, Serialize)] pub(crate) struct DataProducerResumeRequest {} -impl RequestFbs for DataProducerResumeRequest { +impl Request for DataProducerResumeRequest { const METHOD: request::Method = request::Method::DataproducerResume; type HandlerId = DataProducerId; type Response = (); @@ -2758,7 +2740,7 @@ pub(crate) struct DataProducerSendNotification { pub(crate) payload: Vec, } -impl NotificationFbs for DataProducerSendNotification { +impl Notification for DataProducerSendNotification { const EVENT: notification::Event = notification::Event::DataproducerSend; type HandlerId = DataProducerId; @@ -2798,7 +2780,7 @@ pub(crate) struct DataConsumerCloseRequest { pub(crate) data_consumer_id: DataConsumerId, } -impl RequestFbs for DataConsumerCloseRequest { +impl Request for DataConsumerCloseRequest { const METHOD: request::Method = request::Method::TransportCloseDataconsumer; type HandlerId = TransportId; type Response = (); @@ -2835,7 +2817,7 @@ impl RequestFbs for DataConsumerCloseRequest { #[derive(Debug)] pub(crate) struct DataConsumerDumpRequest {} -impl RequestFbs for DataConsumerDumpRequest { +impl Request for DataConsumerDumpRequest { const METHOD: request::Method = request::Method::DataconsumerDump; type HandlerId = DataConsumerId; type Response = response::Body; @@ -2871,7 +2853,7 @@ impl RequestFbs for DataConsumerDumpRequest { #[derive(Debug)] pub(crate) struct DataConsumerGetStatsRequest {} -impl RequestFbs for DataConsumerGetStatsRequest { +impl Request for DataConsumerGetStatsRequest { const METHOD: request::Method = request::Method::DataconsumerGetStats; type HandlerId = DataConsumerId; type Response = response::Body; @@ -2907,7 +2889,7 @@ impl RequestFbs for DataConsumerGetStatsRequest { #[derive(Debug, Serialize)] pub(crate) struct DataConsumerPauseRequest {} -impl RequestFbs for DataConsumerPauseRequest { +impl Request for DataConsumerPauseRequest { const METHOD: request::Method = request::Method::DataconsumerPause; type HandlerId = DataConsumerId; type Response = (); @@ -2938,7 +2920,7 @@ impl RequestFbs for DataConsumerPauseRequest { #[derive(Debug, Serialize)] pub(crate) struct DataConsumerResumeRequest {} -impl RequestFbs for DataConsumerResumeRequest { +impl Request for DataConsumerResumeRequest { const METHOD: request::Method = request::Method::DataconsumerResume; type HandlerId = DataConsumerId; type Response = (); @@ -2974,7 +2956,7 @@ pub(crate) struct DataConsumerGetBufferedAmountResponse { pub(crate) buffered_amount: u32, } -impl RequestFbs for DataConsumerGetBufferedAmountRequest { +impl Request for DataConsumerGetBufferedAmountRequest { const METHOD: request::Method = request::Method::DataconsumerGetBufferedAmount; type HandlerId = DataConsumerId; type Response = DataConsumerGetBufferedAmountResponse; @@ -3013,7 +2995,7 @@ pub(crate) struct DataConsumerSetBufferedAmountLowThresholdRequest { pub(crate) threshold: u32, } -impl RequestFbs for DataConsumerSetBufferedAmountLowThresholdRequest { +impl Request for DataConsumerSetBufferedAmountLowThresholdRequest { const METHOD: request::Method = request::Method::DataconsumerSetBufferedAmountLowThreshold; type HandlerId = DataConsumerId; type Response = (); @@ -3058,7 +3040,7 @@ pub(crate) struct DataConsumerSendRequest { pub(crate) payload: Vec, } -impl RequestFbs for DataConsumerSendRequest { +impl Request for DataConsumerSendRequest { const METHOD: request::Method = request::Method::DataconsumerSend; type HandlerId = DataConsumerId; type Response = (); @@ -3102,7 +3084,7 @@ pub(crate) struct RtpObserverCloseRequest { pub(crate) rtp_observer_id: RtpObserverId, } -impl RequestFbs for RtpObserverCloseRequest { +impl Request for RtpObserverCloseRequest { const METHOD: request::Method = request::Method::RouterCloseRtpobserver; type HandlerId = RouterId; type Response = (); @@ -3137,7 +3119,7 @@ impl RequestFbs for RtpObserverCloseRequest { #[derive(Debug)] pub(crate) struct RtpObserverPauseRequest {} -impl RequestFbs for RtpObserverPauseRequest { +impl Request for RtpObserverPauseRequest { const METHOD: request::Method = request::Method::RtpobserverPause; type HandlerId = RtpObserverId; type Response = (); @@ -3167,7 +3149,7 @@ impl RequestFbs for RtpObserverPauseRequest { #[derive(Debug)] pub(crate) struct RtpObserverResumeRequest {} -impl RequestFbs for RtpObserverResumeRequest { +impl Request for RtpObserverResumeRequest { const METHOD: request::Method = request::Method::RtpobserverResume; type HandlerId = RtpObserverId; type Response = (); @@ -3200,7 +3182,7 @@ pub(crate) struct RtpObserverAddProducerRequest { pub(crate) producer_id: ProducerId, } -impl RequestFbs for RtpObserverAddProducerRequest { +impl Request for RtpObserverAddProducerRequest { const METHOD: request::Method = request::Method::RtpobserverAddProducer; type HandlerId = RtpObserverId; type Response = (); @@ -3239,7 +3221,7 @@ pub(crate) struct RtpObserverRemoveProducerRequest { pub(crate) producer_id: ProducerId, } -impl RequestFbs for RtpObserverRemoveProducerRequest { +impl Request for RtpObserverRemoveProducerRequest { const METHOD: request::Method = request::Method::RtpobserverRemoveProducer; type HandlerId = RtpObserverId; type Response = (); diff --git a/rust/src/router.rs b/rust/src/router.rs index 65e67b40ff..21ac35e13e 100644 --- a/rust/src/router.rs +++ b/rust/src/router.rs @@ -403,7 +403,7 @@ impl Inner { let request = RouterCloseRequest { router_id: self.id }; self.executor .spawn(async move { - if let Err(error) = channel.request_fbs("", request).await { + if let Err(error) = channel.request("", request).await { error!("router closing failed on drop: {}", error); } }) @@ -534,7 +534,7 @@ impl Router { self.inner .channel - .request_fbs(self.inner.id, RouterDumpRequest {}) + .request(self.inner.id, RouterDumpRequest {}) .await } @@ -563,7 +563,7 @@ impl Router { self.inner .channel - .request_fbs( + .request( self.inner.id, RouterCreateDirectTransportRequest { data: RouterCreateDirectTransportData::from_options( @@ -629,7 +629,7 @@ impl Router { let data = self .inner .channel - .request_fbs( + .request( self.inner.id, RouterCreateWebRtcTransportRequest { data: RouterCreateWebrtcTransportData::from_options( @@ -698,7 +698,7 @@ impl Router { let data = self .inner .channel - .request_fbs( + .request( self.inner.id, RouterCreatePipeTransportRequest { data: RouterCreatePipeTransportData::from_options( @@ -763,7 +763,7 @@ impl Router { let data = self .inner .channel - .request_fbs( + .request( self.inner.id, RouterCreatePlainTransportRequest { data: RouterCreatePlainTransportData::from_options( @@ -829,7 +829,7 @@ impl Router { self.inner .channel - .request_fbs( + .request( self.inner.id, RouterCreateAudioLevelObserverRequest { data: RouterCreateAudioLevelObserverData::from_options( @@ -889,7 +889,7 @@ impl Router { self.inner .channel - .request_fbs( + .request( self.inner.id, RouterCreateActiveSpeakerObserverRequest { data: RouterCreateActiveSpeakerObserverData::from_options( diff --git a/rust/src/router/active_speaker_observer.rs b/rust/src/router/active_speaker_observer.rs index b1676b4bd6..e7dd9a5207 100644 --- a/rust/src/router/active_speaker_observer.rs +++ b/rust/src/router/active_speaker_observer.rs @@ -138,7 +138,7 @@ impl Inner { self.executor .spawn(async move { - if let Err(error) = channel.request_fbs(router_id, request).await { + if let Err(error) = channel.request(router_id, request).await { error!("active speaker observer closing failed on drop: {}", error); } }) @@ -199,7 +199,7 @@ impl RtpObserver for ActiveSpeakerObserver { self.inner .channel - .request_fbs(self.id(), RtpObserverPauseRequest {}) + .request(self.id(), RtpObserverPauseRequest {}) .await?; let was_paused = self.inner.paused.swap(true, Ordering::SeqCst); @@ -216,7 +216,7 @@ impl RtpObserver for ActiveSpeakerObserver { self.inner .channel - .request_fbs(self.id(), RtpObserverResumeRequest {}) + .request(self.id(), RtpObserverResumeRequest {}) .await?; let was_paused = self.inner.paused.swap(false, Ordering::SeqCst); @@ -240,7 +240,7 @@ impl RtpObserver for ActiveSpeakerObserver { }; self.inner .channel - .request_fbs(self.id(), RtpObserverAddProducerRequest { producer_id }) + .request(self.id(), RtpObserverAddProducerRequest { producer_id }) .await?; self.inner.handlers.add_producer.call_simple(&producer); @@ -257,7 +257,7 @@ impl RtpObserver for ActiveSpeakerObserver { }; self.inner .channel - .request_fbs(self.id(), RtpObserverRemoveProducerRequest { producer_id }) + .request(self.id(), RtpObserverRemoveProducerRequest { producer_id }) .await?; self.inner.handlers.remove_producer.call_simple(&producer); @@ -317,7 +317,7 @@ impl ActiveSpeakerObserver { let router = router.clone(); let handlers = Arc::clone(&handlers); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::DominantSpeaker(dominant_speaker) => { diff --git a/rust/src/router/audio_level_observer.rs b/rust/src/router/audio_level_observer.rs index abfc09b706..0e174c9b39 100644 --- a/rust/src/router/audio_level_observer.rs +++ b/rust/src/router/audio_level_observer.rs @@ -160,7 +160,7 @@ impl Inner { self.executor .spawn(async move { - if let Err(error) = channel.request_fbs(router_id, request).await { + if let Err(error) = channel.request(router_id, request).await { error!("audio level observer closing failed on drop: {}", error); } }) @@ -221,7 +221,7 @@ impl RtpObserver for AudioLevelObserver { self.inner .channel - .request_fbs(self.id(), RtpObserverPauseRequest {}) + .request(self.id(), RtpObserverPauseRequest {}) .await?; let was_paused = self.inner.paused.swap(true, Ordering::SeqCst); @@ -238,7 +238,7 @@ impl RtpObserver for AudioLevelObserver { self.inner .channel - .request_fbs(self.id(), RtpObserverResumeRequest {}) + .request(self.id(), RtpObserverResumeRequest {}) .await?; let was_paused = self.inner.paused.swap(false, Ordering::SeqCst); @@ -262,7 +262,7 @@ impl RtpObserver for AudioLevelObserver { }; self.inner .channel - .request_fbs(self.id(), RtpObserverAddProducerRequest { producer_id }) + .request(self.id(), RtpObserverAddProducerRequest { producer_id }) .await?; self.inner.handlers.add_producer.call_simple(&producer); @@ -279,7 +279,7 @@ impl RtpObserver for AudioLevelObserver { }; self.inner .channel - .request_fbs(self.id(), RtpObserverRemoveProducerRequest { producer_id }) + .request(self.id(), RtpObserverRemoveProducerRequest { producer_id }) .await?; self.inner.handlers.remove_producer.call_simple(&producer); @@ -339,7 +339,7 @@ impl AudioLevelObserver { let router = router.clone(); let handlers = Arc::clone(&handlers); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::Volumes(volumes) => { diff --git a/rust/src/router/consumer.rs b/rust/src/router/consumer.rs index 199d6b6878..6110998303 100644 --- a/rust/src/router/consumer.rs +++ b/rust/src/router/consumer.rs @@ -707,7 +707,7 @@ impl Inner { self.executor .spawn(async move { if weak_producer.upgrade().is_some() { - if let Err(error) = channel.request_fbs(transport_id, request).await { + if let Err(error) = channel.request(transport_id, request).await { error!("consumer closing failed on drop: {}", error); } } @@ -783,7 +783,7 @@ impl Consumer { let current_layers = Arc::clone(¤t_layers); let inner_weak = Arc::clone(&inner_weak); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::ProducerClose => { @@ -993,7 +993,7 @@ impl Consumer { let response = self .inner .channel - .request_fbs(self.id(), ConsumerDumpRequest {}) + .request(self.id(), ConsumerDumpRequest {}) .await?; if let response::Body::ConsumerDumpResponse(data) = response { @@ -1013,7 +1013,7 @@ impl Consumer { let response = self .inner .channel - .request_fbs(self.id(), ConsumerGetStatsRequest {}) + .request(self.id(), ConsumerGetStatsRequest {}) .await; if let Ok(response::Body::ConsumerGetStatsResponse(data)) = response { @@ -1042,7 +1042,7 @@ impl Consumer { self.inner .channel - .request_fbs(self.id(), ConsumerPauseRequest {}) + .request(self.id(), ConsumerPauseRequest {}) .await?; let mut paused = self.inner.paused.lock(); @@ -1062,7 +1062,7 @@ impl Consumer { self.inner .channel - .request_fbs(self.id(), ConsumerResumeRequest {}) + .request(self.id(), ConsumerResumeRequest {}) .await?; let mut paused = self.inner.paused.lock(); @@ -1087,7 +1087,7 @@ impl Consumer { let consumer_layers = self .inner .channel - .request_fbs( + .request( self.id(), ConsumerSetPreferredLayersRequest { data: consumer_layers, @@ -1109,7 +1109,7 @@ impl Consumer { let result = self .inner .channel - .request_fbs(self.id(), ConsumerSetPriorityRequest { priority }) + .request(self.id(), ConsumerSetPriorityRequest { priority }) .await?; *self.inner.priority.lock() = result.priority; @@ -1126,7 +1126,7 @@ impl Consumer { let result = self .inner .channel - .request_fbs(self.id(), ConsumerSetPriorityRequest { priority }) + .request(self.id(), ConsumerSetPriorityRequest { priority }) .await?; *self.inner.priority.lock() = result.priority; @@ -1140,7 +1140,7 @@ impl Consumer { self.inner .channel - .request_fbs(self.id(), ConsumerRequestKeyFrameRequest {}) + .request(self.id(), ConsumerRequestKeyFrameRequest {}) .await } @@ -1153,7 +1153,7 @@ impl Consumer { self.inner .channel - .request_fbs(self.id(), ConsumerEnableTraceEventRequest { types }) + .request(self.id(), ConsumerEnableTraceEventRequest { types }) .await } diff --git a/rust/src/router/data_consumer.rs b/rust/src/router/data_consumer.rs index 5f72e4209a..401eec52da 100644 --- a/rust/src/router/data_consumer.rs +++ b/rust/src/router/data_consumer.rs @@ -338,7 +338,7 @@ impl Inner { self.executor .spawn(async move { if weak_data_producer.upgrade().is_some() { - if let Err(error) = channel.request_fbs(transport_id, request).await { + if let Err(error) = channel.request(transport_id, request).await { error!("consumer closing failed on drop: {}", error); } } @@ -470,7 +470,7 @@ impl DataConsumer { let data_producer_paused = Arc::clone(&data_producer_paused); let inner_weak = Arc::clone(&inner_weak); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::DataProducerClose => { @@ -658,7 +658,7 @@ impl DataConsumer { let response = self .inner() .channel - .request_fbs(self.id(), DataConsumerDumpRequest {}) + .request(self.id(), DataConsumerDumpRequest {}) .await?; if let response::Body::DataConsumerDumpResponse(data) = response { @@ -678,7 +678,7 @@ impl DataConsumer { let response = self .inner() .channel - .request_fbs(self.id(), DataConsumerGetStatsRequest {}) + .request(self.id(), DataConsumerGetStatsRequest {}) .await?; if let response::Body::DataConsumerGetStatsResponse(data) = response { @@ -694,7 +694,7 @@ impl DataConsumer { self.inner() .channel - .request_fbs(self.id(), DataConsumerPauseRequest {}) + .request(self.id(), DataConsumerPauseRequest {}) .await?; let mut paused = self.inner().paused.lock(); @@ -714,7 +714,7 @@ impl DataConsumer { self.inner() .channel - .request_fbs(self.id(), DataConsumerResumeRequest {}) + .request(self.id(), DataConsumerResumeRequest {}) .await?; let mut paused = self.inner().paused.lock(); @@ -741,7 +741,7 @@ impl DataConsumer { let response = self .inner() .channel - .request_fbs(self.id(), DataConsumerGetBufferedAmountRequest {}) + .request(self.id(), DataConsumerGetBufferedAmountRequest {}) .await?; Ok(response.buffered_amount) @@ -760,7 +760,7 @@ impl DataConsumer { self.inner() .channel - .request_fbs( + .request( self.id(), DataConsumerSetBufferedAmountLowThresholdRequest { threshold }, ) @@ -888,7 +888,7 @@ impl DirectDataConsumer { self.inner .channel - .request_fbs( + .request( self.inner.id, DataConsumerSendRequest { ppid, diff --git a/rust/src/router/data_producer.rs b/rust/src/router/data_producer.rs index 2fe0baa54e..409b372400 100644 --- a/rust/src/router/data_producer.rs +++ b/rust/src/router/data_producer.rs @@ -209,7 +209,7 @@ impl Inner { }; self.executor .spawn(async move { - if let Err(error) = channel.request_fbs(transport_id, request).await { + if let Err(error) = channel.request(transport_id, request).await { error!("data producer closing failed on drop: {}", error); } }) @@ -420,7 +420,7 @@ impl DataProducer { let response = self .inner() .channel - .request_fbs(self.id(), DataProducerDumpRequest {}) + .request(self.id(), DataProducerDumpRequest {}) .await?; if let response::Body::DataProducerDumpResponse(data) = response { @@ -440,7 +440,7 @@ impl DataProducer { let response = self .inner() .channel - .request_fbs(self.id(), DataProducerGetStatsRequest {}) + .request(self.id(), DataProducerGetStatsRequest {}) .await?; if let response::Body::DataProducerGetStatsResponse(data) = response { @@ -458,7 +458,7 @@ impl DataProducer { self.inner() .channel - .request_fbs(self.id(), DataProducerPauseRequest {}) + .request(self.id(), DataProducerPauseRequest {}) .await?; let was_paused = self.inner().paused.swap(true, Ordering::SeqCst); @@ -478,7 +478,7 @@ impl DataProducer { self.inner() .channel - .request_fbs(self.id(), DataProducerResumeRequest {}) + .request(self.id(), DataProducerResumeRequest {}) .await?; let was_paused = self.inner().paused.swap(false, Ordering::SeqCst); @@ -546,7 +546,7 @@ impl DirectDataProducer { pub fn send(&self, message: WebRtcMessage<'_>) -> Result<(), NotificationError> { let (ppid, _payload) = message.into_ppid_and_payload(); - self.inner.channel.notify_fbs( + self.inner.channel.notify( self.inner.id, DataProducerSendNotification { ppid, diff --git a/rust/src/router/direct_transport.rs b/rust/src/router/direct_transport.rs index 8ccaf5bed0..0098c28bef 100644 --- a/rust/src/router/direct_transport.rs +++ b/rust/src/router/direct_transport.rs @@ -307,7 +307,7 @@ impl Inner { self.executor .spawn(async move { - if let Err(error) = channel.request_fbs(router_id, request).await { + if let Err(error) = channel.request(router_id, request).await { error!("transport closing failed on drop: {}", error); } }) @@ -567,7 +567,7 @@ impl DirectTransport { let subscription_handler = { let handlers = Arc::clone(&handlers); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::Trace(trace_event_data) => { @@ -629,7 +629,7 @@ impl DirectTransport { pub fn send_rtcp(&self, rtcp_packet: Vec) -> Result<(), NotificationError> { self.inner .channel - .notify_fbs(self.id(), TransportSendRtcpNotification { rtcp_packet }) + .notify(self.id(), TransportSendRtcpNotification { rtcp_packet }) } /// Callback is called when the direct transport receives a RTCP packet from its router. diff --git a/rust/src/router/pipe_transport.rs b/rust/src/router/pipe_transport.rs index 45adb1eb10..3461a47cc4 100644 --- a/rust/src/router/pipe_transport.rs +++ b/rust/src/router/pipe_transport.rs @@ -366,7 +366,7 @@ impl Inner { self.executor .spawn(async move { - if let Err(error) = channel.request_fbs(router_id, request).await { + if let Err(error) = channel.request(router_id, request).await { error!("transport closing failed on drop: {}", error); } }) @@ -616,7 +616,7 @@ impl PipeTransport { let handlers = Arc::clone(&handlers); let data = Arc::clone(&data); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::SctpStateChange { sctp_state } => { @@ -691,7 +691,7 @@ impl PipeTransport { let response = self .inner .channel - .request_fbs( + .request( self.id(), PipeTransportConnectRequest { ip: remote_parameters.ip, diff --git a/rust/src/router/plain_transport.rs b/rust/src/router/plain_transport.rs index e10859be72..d29d2adfcd 100644 --- a/rust/src/router/plain_transport.rs +++ b/rust/src/router/plain_transport.rs @@ -439,7 +439,7 @@ impl Inner { self.executor .spawn(async move { - if let Err(error) = channel.request_fbs(router_id, request).await { + if let Err(error) = channel.request(router_id, request).await { error!("transport closing failed on drop: {}", error); } }) @@ -682,7 +682,7 @@ impl PlainTransport { let handlers = Arc::clone(&handlers); let data = Arc::clone(&data); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::Tuple { tuple } => { @@ -862,7 +862,7 @@ impl PlainTransport { let response = self .inner .channel - .request_fbs( + .request( self.inner.id, TransportConnectPlainRequest { ip: remote_parameters.ip, diff --git a/rust/src/router/producer.rs b/rust/src/router/producer.rs index d897fff776..d447703f14 100644 --- a/rust/src/router/producer.rs +++ b/rust/src/router/producer.rs @@ -601,7 +601,7 @@ impl Inner { self.executor .spawn(async move { - if let Err(error) = channel.request_fbs(transport_id, request).await { + if let Err(error) = channel.request(transport_id, request).await { error!("producer closing failed on drop: {}", error); } }) @@ -722,7 +722,7 @@ impl Producer { let handlers = Arc::clone(&handlers); let score = Arc::clone(&score); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::Score(scores) => { @@ -852,7 +852,7 @@ impl Producer { let response = self .inner() .channel - .request_fbs(self.id(), ProducerDumpRequest {}) + .request(self.id(), ProducerDumpRequest {}) .await?; if let response::Body::ProducerDumpResponse(data) = response { @@ -872,7 +872,7 @@ impl Producer { let response = self .inner() .channel - .request_fbs(self.id(), ProducerGetStatsRequest {}) + .request(self.id(), ProducerGetStatsRequest {}) .await; if let Ok(response::Body::ProducerGetStatsResponse(data)) = response { @@ -890,7 +890,7 @@ impl Producer { self.inner() .channel - .request_fbs(self.id(), ProducerPauseRequest {}) + .request(self.id(), ProducerPauseRequest {}) .await?; let was_paused = self.inner().paused.swap(true, Ordering::SeqCst); @@ -910,7 +910,7 @@ impl Producer { self.inner() .channel - .request_fbs(self.id(), ProducerResumeRequest {}) + .request(self.id(), ProducerResumeRequest {}) .await?; let was_paused = self.inner().paused.swap(false, Ordering::SeqCst); @@ -931,7 +931,7 @@ impl Producer { self.inner() .channel - .request_fbs(self.id(), ProducerEnableTraceEventRequest { types }) + .request(self.id(), ProducerEnableTraceEventRequest { types }) .await } @@ -1028,7 +1028,7 @@ impl DirectProducer { pub fn send(&self, rtp_packet: Vec) -> Result<(), NotificationError> { self.inner .channel - .notify_fbs(self.inner.id, ProducerSendNotification { rtp_packet }) + .notify(self.inner.id, ProducerSendNotification { rtp_packet }) } } diff --git a/rust/src/router/transport.rs b/rust/src/router/transport.rs index 46eab83c5c..7e4a35b0e0 100644 --- a/rust/src/router/transport.rs +++ b/rust/src/router/transport.rs @@ -443,25 +443,25 @@ pub(super) trait TransportImpl: TransportGeneric { async fn dump_impl(&self) -> Result { self.channel() - .request_fbs(self.id(), TransportDumpRequest {}) + .request(self.id(), TransportDumpRequest {}) .await } async fn get_stats_impl(&self) -> Result { self.channel() - .request_fbs(self.id(), TransportGetStatsRequest {}) + .request(self.id(), TransportGetStatsRequest {}) .await } async fn set_max_incoming_bitrate_impl(&self, bitrate: u32) -> Result<(), RequestError> { self.channel() - .request_fbs(self.id(), TransportSetMaxIncomingBitrateRequest { bitrate }) + .request(self.id(), TransportSetMaxIncomingBitrateRequest { bitrate }) .await } async fn set_max_outgoing_bitrate_impl(&self, bitrate: u32) -> Result<(), RequestError> { self.channel() - .request_fbs(self.id(), TransportSetMaxOutgoingBitrateRequest { bitrate }) + .request(self.id(), TransportSetMaxOutgoingBitrateRequest { bitrate }) .await } @@ -470,13 +470,13 @@ pub(super) trait TransportImpl: TransportGeneric { types: Vec, ) -> Result<(), RequestError> { self.channel() - .request_fbs(self.id(), TransportEnableTraceEventRequest { types }) + .request(self.id(), TransportEnableTraceEventRequest { types }) .await } async fn set_min_outgoing_bitrate_impl(&self, bitrate: u32) -> Result<(), RequestError> { self.channel() - .request_fbs(self.id(), TransportSetMinOutgoingBitrateRequest { bitrate }) + .request(self.id(), TransportSetMinOutgoingBitrateRequest { bitrate }) .await } @@ -548,7 +548,7 @@ pub(super) trait TransportImpl: TransportGeneric { let response = self .channel() - .request_fbs( + .request( self.id(), TransportProduceRequest { producer_id, @@ -646,7 +646,7 @@ pub(super) trait TransportImpl: TransportGeneric { let response = self .channel() - .request_fbs( + .request( self.id(), TransportConsumeRequest { consumer_id, @@ -724,7 +724,7 @@ pub(super) trait TransportImpl: TransportGeneric { let response = self .channel() - .request_fbs( + .request( self.id(), TransportProduceDataRequest { data_producer_id, @@ -820,7 +820,7 @@ pub(super) trait TransportImpl: TransportGeneric { let response = self .channel() - .request_fbs( + .request( self.id(), TransportConsumeDataRequest { data_consumer_id, diff --git a/rust/src/router/webrtc_transport.rs b/rust/src/router/webrtc_transport.rs index f66623cc18..46836995ec 100644 --- a/rust/src/router/webrtc_transport.rs +++ b/rust/src/router/webrtc_transport.rs @@ -525,7 +525,7 @@ impl Inner { self.executor .spawn(async move { - if let Err(error) = channel.request_fbs(router_id, request).await { + if let Err(error) = channel.request(router_id, request).await { error!("transport closing failed on drop: {}", error); } }) @@ -779,7 +779,7 @@ impl WebRtcTransport { let handlers = Arc::clone(&handlers); let data = Arc::clone(&data); - channel.subscribe_to_fbs_notifications(id.into(), move |notification| { + channel.subscribe_to_notifications(id.into(), move |notification| { match Notification::from_fbs(notification) { Ok(notification) => match notification { Notification::IceStateChange { ice_state } => { @@ -931,7 +931,7 @@ impl WebRtcTransport { let response = self .inner .channel - .request_fbs( + .request( self.id(), WebRtcTransportConnectRequest { dtls_parameters: remote_parameters.dtls_parameters, @@ -1045,7 +1045,7 @@ impl WebRtcTransport { self.inner .channel - .request_fbs(self.id(), TransportRestartIceRequest {}) + .request(self.id(), TransportRestartIceRequest {}) .await } diff --git a/rust/src/webrtc_server.rs b/rust/src/webrtc_server.rs index f4fb0da2bb..fe323d30ba 100644 --- a/rust/src/webrtc_server.rs +++ b/rust/src/webrtc_server.rs @@ -184,7 +184,7 @@ impl Inner { }; self.executor .spawn(async move { - if let Err(error) = channel.request_fbs("", request).await { + if let Err(error) = channel.request("", request).await { error!("WebRTC server closing failed on drop: {}", error); } }) @@ -286,7 +286,7 @@ impl WebRtcServer { self.inner .channel - .request_fbs(self.id(), WebRtcServerDumpRequest {}) + .request(self.id(), WebRtcServerDumpRequest {}) .await } diff --git a/rust/src/worker.rs b/rust/src/worker.rs index e28b0cd6fc..32c473488f 100644 --- a/rust/src/worker.rs +++ b/rust/src/worker.rs @@ -488,7 +488,7 @@ impl Inner { let (sender, receiver) = async_oneshot::oneshot(); let id = self.id; let sender = Mutex::new(Some(sender)); - let _handler = self.channel.subscribe_to_fbs_notifications( + let _handler = self.channel.subscribe_to_notifications( SubscriptionTarget::String(std::process::id().to_string()), move |notification| { let result = match notification.event().unwrap() { @@ -553,7 +553,7 @@ impl Inner { self.executor .spawn(async move { - let _ = channel.request_fbs("", WorkerCloseRequest {}).await; + let _ = channel.request("", WorkerCloseRequest {}).await; // Drop channels in here after response from worker drop(channel); @@ -622,10 +622,7 @@ impl Worker { pub async fn dump(&self) -> Result { debug!("dump()"); - self.inner - .channel - .request_fbs("", WorkerDumpRequest {}) - .await + self.inner.channel.request("", WorkerDumpRequest {}).await } /// Updates the worker settings in runtime. Just a subset of the worker settings can be updated. @@ -635,7 +632,7 @@ impl Worker { match self .inner .channel - .request_fbs("", WorkerUpdateSettingsRequest { data }) + .request("", WorkerUpdateSettingsRequest { data }) .await { Ok(_) => Ok(()), @@ -666,7 +663,7 @@ impl Worker { self.inner .channel - .request_fbs( + .request( "", WorkerCreateWebRtcServerRequest { webrtc_server_id, @@ -715,7 +712,7 @@ impl Worker { self.inner .channel - .request_fbs("", WorkerCreateRouterRequest { router_id }) + .request("", WorkerCreateRouterRequest { router_id }) .await .map_err(CreateRouterError::Request)?; diff --git a/rust/src/worker/channel.rs b/rust/src/worker/channel.rs index 0a70088f13..b04c1e0cdb 100644 --- a/rust/src/worker/channel.rs +++ b/rust/src/worker/channel.rs @@ -1,6 +1,6 @@ use crate::fbs::{message, notification, request, response}; -use crate::messages::{NotificationFbs, RequestFbs}; -use crate::worker::common::{FBSEventHandlers, FBSWeakEventHandlers, SubscriptionTarget}; +use crate::messages::{Notification, Request}; +use crate::worker::common::{EventHandlers, SubscriptionTarget, WeakEventHandlers}; use crate::worker::utils; use crate::worker::utils::{PreparedChannelRead, PreparedChannelWrite}; use crate::worker::{RequestError, SubscriptionHandler}; @@ -56,17 +56,16 @@ pub enum NotificationParseError { #[allow(clippy::type_complexity)] pub(crate) struct BufferMessagesGuard { target_id: SubscriptionTarget, - fbs_buffered_notifications_for: Arc>>>>, - fbs_event_handlers_weak: FBSWeakEventHandlers< - Arc) + Send + Sync + 'static>, - >, + buffered_notifications_for: Arc>>>>, + event_handlers_weak: + WeakEventHandlers) + Send + Sync + 'static>>, } impl Drop for BufferMessagesGuard { fn drop(&mut self) { - let mut fbs_buffered_notifications_for = self.fbs_buffered_notifications_for.lock(); - if let Some(notifications) = fbs_buffered_notifications_for.remove(&self.target_id) { - if let Some(event_handlers) = self.fbs_event_handlers_weak.upgrade() { + let mut buffered_notifications_for = self.buffered_notifications_for.lock(); + if let Some(notifications) = buffered_notifications_for.remove(&self.target_id) { + if let Some(event_handlers) = self.event_handlers_weak.upgrade() { for notification in notifications { let notification = notification::NotificationRef::read_as_root(¬ification).unwrap(); @@ -137,7 +136,7 @@ impl<'a> Drop for RequestDropGuard<'a> { // Drop pending message from memory self.message.take(); // Remove request handler from the container - if let Some(requests_container) = self.channel.inner.fbs_requests_container_weak.upgrade() { + if let Some(requests_container) = self.channel.inner.requests_container_weak.upgrade() { requests_container.lock().handlers.remove(&self.id); } } @@ -165,11 +164,10 @@ struct OutgoingMessageBuffer { struct Inner { outgoing_message_buffer: Arc>, internal_message_receiver: async_channel::Receiver, - fbs_requests_container_weak: Weak>, - fbs_buffered_notifications_for: Arc>>>>, - fbs_event_handlers_weak: FBSWeakEventHandlers< - Arc) + Send + Sync + 'static>, - >, + requests_container_weak: Weak>, + buffered_notifications_for: Arc>>>>, + event_handlers_weak: + WeakEventHandlers) + Send + Sync + 'static>>, worker_closed: Arc, closed: AtomicBool, } @@ -193,12 +191,12 @@ impl Channel { handle: None, messages: VecDeque::with_capacity(10), })); - let fbs_requests_container = Arc::>::default(); - let fbs_requests_container_weak = Arc::downgrade(&fbs_requests_container); - let fbs_buffered_notifications_for = + let requests_container = Arc::>::default(); + let requests_container_weak = Arc::downgrade(&requests_container); + let buffered_notifications_for = Arc::>>>>::default(); - let fbs_event_handlers = FBSEventHandlers::new(); - let fbs_event_handlers_weak = fbs_event_handlers.downgrade(); + let event_handlers = EventHandlers::new(); + let event_handlers_weak = event_handlers.downgrade(); let prepared_channel_read = utils::prepare_channel_read_fn({ let outgoing_message_buffer = Arc::clone(&outgoing_message_buffer); @@ -223,7 +221,7 @@ impl Channel { let (internal_message_sender, internal_message_receiver) = async_channel::unbounded(); let prepared_channel_write = utils::prepare_channel_write_fn({ - let fbs_buffered_notifications_for = Arc::clone(&fbs_buffered_notifications_for); + let buffered_notifications_for = Arc::clone(&buffered_notifications_for); // This this contain cache of targets that are known to not have buffering, so // that we can avoid Mutex locking overhead for them let mut non_buffered_notifications = LruCache::::new( @@ -240,11 +238,10 @@ impl Channel { ); if !non_buffered_notifications.contains(&target_id) { - let mut fbs_buffer_notifications_for = - fbs_buffered_notifications_for.lock(); + let mut buffer_notifications_for = buffered_notifications_for.lock(); // Check if we need to buffer notifications for this // target_id - if let Some(list) = fbs_buffer_notifications_for.get_mut(&target_id) { + if let Some(list) = buffer_notifications_for.get_mut(&target_id) { list.push(Vec::from(message)); return; } @@ -252,11 +249,10 @@ impl Channel { // Remember we don't need to buffer these non_buffered_notifications.put(target_id.clone(), ()); } - fbs_event_handlers - .call_callbacks_with_single_value(&target_id, notification); + event_handlers.call_callbacks_with_single_value(&target_id, notification); } ChannelReceiveMessage::Response(response) => { - let sender = fbs_requests_container + let sender = requests_container .lock() .handlers .remove(&response.id().unwrap()); @@ -301,9 +297,9 @@ impl Channel { let inner = Arc::new(Inner { outgoing_message_buffer, internal_message_receiver, - fbs_requests_container_weak, - fbs_buffered_notifications_for, - fbs_event_handlers_weak, + requests_container_weak, + buffered_notifications_for, + event_handlers_weak, worker_closed, closed: AtomicBool::new(false), }); @@ -322,33 +318,33 @@ impl Channel { /// This allows to enable buffering for messages for specific target while the target itself is /// being created. This allows to avoid missing notifications due to race conditions. pub(crate) fn buffer_messages_for(&self, target_id: SubscriptionTarget) -> BufferMessagesGuard { - let fbs_buffered_notifications_for = Arc::clone(&self.inner.fbs_buffered_notifications_for); - let fbs_event_handlers_weak = self.inner.fbs_event_handlers_weak.clone(); - fbs_buffered_notifications_for + let buffered_notifications_for = Arc::clone(&self.inner.buffered_notifications_for); + let event_handlers_weak = self.inner.event_handlers_weak.clone(); + buffered_notifications_for .lock() .entry(target_id.clone()) .or_default(); BufferMessagesGuard { target_id, - fbs_buffered_notifications_for, - fbs_event_handlers_weak, + buffered_notifications_for, + event_handlers_weak, } } - pub(crate) async fn request_fbs( + pub(crate) async fn request( &self, handler_id: HandlerId, request: R, ) -> Result where - R: RequestFbs + 'static, + R: Request + 'static, HandlerId: Display, { let id; let (result_sender, result_receiver) = async_oneshot::oneshot(); { - let requests_container = match self.inner.fbs_requests_container_weak.upgrade() { + let requests_container = match self.inner.requests_container_weak.upgrade() { Some(requests_container_lock) => requests_container_lock, None => { return Err(RequestError::ChannelClosed); @@ -439,13 +435,13 @@ impl Channel { } } - pub(crate) fn notify_fbs( + pub(crate) fn notify( &self, handler_id: HandlerId, notification: N, ) -> Result<(), NotificationError> where - N: NotificationFbs, + N: Notification, HandlerId: Display, { debug!("notify() [{notification:?}]"); @@ -476,7 +472,7 @@ impl Channel { Ok(()) } - pub(crate) fn subscribe_to_fbs_notifications( + pub(crate) fn subscribe_to_notifications( &self, target_id: SubscriptionTarget, callback: F, @@ -485,8 +481,8 @@ impl Channel { F: Fn(notification::NotificationRef<'_>) + Send + Sync + 'static, { self.inner - .fbs_event_handlers_weak + .event_handlers_weak .upgrade() - .map(|fbs_event_handlers| fbs_event_handlers.add(target_id, Arc::new(callback))) + .map(|event_handlers| event_handlers.add(target_id, Arc::new(callback))) } } diff --git a/rust/src/worker/common.rs b/rust/src/worker/common.rs index 84f20611ba..f6ef339ea8 100644 --- a/rust/src/worker/common.rs +++ b/rust/src/worker/common.rs @@ -21,11 +21,11 @@ impl Default for EventHandlersList { } #[derive(Clone)] -pub(super) struct FBSEventHandlers { +pub(super) struct EventHandlers { handlers: Arc>>>, } -impl FBSEventHandlers { +impl EventHandlers { pub(super) fn new() -> Self { let handlers = Arc::>>>::default(); Self { handlers } @@ -73,14 +73,14 @@ impl FBSEventHandlers { }) } - pub(super) fn downgrade(&self) -> FBSWeakEventHandlers { - FBSWeakEventHandlers { + pub(super) fn downgrade(&self) -> WeakEventHandlers { + WeakEventHandlers { handlers: Arc::downgrade(&self.handlers), } } } -impl FBSEventHandlers) + Send + Sync + 'static>> { +impl EventHandlers) + Send + Sync + 'static>> { pub(super) fn call_callbacks_with_single_value( &self, target_id: &SubscriptionTarget, @@ -96,15 +96,15 @@ impl FBSEventHandlers) + Send + Syn } #[derive(Clone)] -pub(super) struct FBSWeakEventHandlers { +pub(super) struct WeakEventHandlers { handlers: Weak>>>, } -impl FBSWeakEventHandlers { - pub(super) fn upgrade(&self) -> Option> { +impl WeakEventHandlers { + pub(super) fn upgrade(&self) -> Option> { self.handlers .upgrade() - .map(|handlers| FBSEventHandlers { handlers }) + .map(|handlers| EventHandlers { handlers }) } }