Skip to content

Commit

Permalink
Rust: remove temporal FBS suffixes (#1171)
Browse files Browse the repository at this point in the history
They were temporal until every message was ported to flatbuffers.
  • Loading branch information
jmillan authored Oct 9, 2023
1 parent eefbeef commit 096ee1e
Show file tree
Hide file tree
Showing 17 changed files with 200 additions and 225 deletions.
152 changes: 67 additions & 85 deletions rust/src/messages.rs

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions rust/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl Inner {
let request = RouterCloseRequest { router_id: self.id };
self.executor
.spawn(async move {
if let Err(error) = channel.request_fbs("", request).await {
if let Err(error) = channel.request("", request).await {
error!("router closing failed on drop: {}", error);
}
})
Expand Down Expand Up @@ -534,7 +534,7 @@ impl Router {

self.inner
.channel
.request_fbs(self.inner.id, RouterDumpRequest {})
.request(self.inner.id, RouterDumpRequest {})
.await
}

Expand Down Expand Up @@ -563,7 +563,7 @@ impl Router {

self.inner
.channel
.request_fbs(
.request(
self.inner.id,
RouterCreateDirectTransportRequest {
data: RouterCreateDirectTransportData::from_options(
Expand Down Expand Up @@ -629,7 +629,7 @@ impl Router {
let data = self
.inner
.channel
.request_fbs(
.request(
self.inner.id,
RouterCreateWebRtcTransportRequest {
data: RouterCreateWebrtcTransportData::from_options(
Expand Down Expand Up @@ -698,7 +698,7 @@ impl Router {
let data = self
.inner
.channel
.request_fbs(
.request(
self.inner.id,
RouterCreatePipeTransportRequest {
data: RouterCreatePipeTransportData::from_options(
Expand Down Expand Up @@ -763,7 +763,7 @@ impl Router {
let data = self
.inner
.channel
.request_fbs(
.request(
self.inner.id,
RouterCreatePlainTransportRequest {
data: RouterCreatePlainTransportData::from_options(
Expand Down Expand Up @@ -829,7 +829,7 @@ impl Router {

self.inner
.channel
.request_fbs(
.request(
self.inner.id,
RouterCreateAudioLevelObserverRequest {
data: RouterCreateAudioLevelObserverData::from_options(
Expand Down Expand Up @@ -889,7 +889,7 @@ impl Router {

self.inner
.channel
.request_fbs(
.request(
self.inner.id,
RouterCreateActiveSpeakerObserverRequest {
data: RouterCreateActiveSpeakerObserverData::from_options(
Expand Down
12 changes: 6 additions & 6 deletions rust/src/router/active_speaker_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl Inner {

self.executor
.spawn(async move {
if let Err(error) = channel.request_fbs(router_id, request).await {
if let Err(error) = channel.request(router_id, request).await {
error!("active speaker observer closing failed on drop: {}", error);
}
})
Expand Down Expand Up @@ -199,7 +199,7 @@ impl RtpObserver for ActiveSpeakerObserver {

self.inner
.channel
.request_fbs(self.id(), RtpObserverPauseRequest {})
.request(self.id(), RtpObserverPauseRequest {})
.await?;

let was_paused = self.inner.paused.swap(true, Ordering::SeqCst);
Expand All @@ -216,7 +216,7 @@ impl RtpObserver for ActiveSpeakerObserver {

self.inner
.channel
.request_fbs(self.id(), RtpObserverResumeRequest {})
.request(self.id(), RtpObserverResumeRequest {})
.await?;

let was_paused = self.inner.paused.swap(false, Ordering::SeqCst);
Expand All @@ -240,7 +240,7 @@ impl RtpObserver for ActiveSpeakerObserver {
};
self.inner
.channel
.request_fbs(self.id(), RtpObserverAddProducerRequest { producer_id })
.request(self.id(), RtpObserverAddProducerRequest { producer_id })
.await?;

self.inner.handlers.add_producer.call_simple(&producer);
Expand All @@ -257,7 +257,7 @@ impl RtpObserver for ActiveSpeakerObserver {
};
self.inner
.channel
.request_fbs(self.id(), RtpObserverRemoveProducerRequest { producer_id })
.request(self.id(), RtpObserverRemoveProducerRequest { producer_id })
.await?;

self.inner.handlers.remove_producer.call_simple(&producer);
Expand Down Expand Up @@ -317,7 +317,7 @@ impl ActiveSpeakerObserver {
let router = router.clone();
let handlers = Arc::clone(&handlers);

channel.subscribe_to_fbs_notifications(id.into(), move |notification| {
channel.subscribe_to_notifications(id.into(), move |notification| {
match Notification::from_fbs(notification) {
Ok(notification) => match notification {
Notification::DominantSpeaker(dominant_speaker) => {
Expand Down
12 changes: 6 additions & 6 deletions rust/src/router/audio_level_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Inner {

self.executor
.spawn(async move {
if let Err(error) = channel.request_fbs(router_id, request).await {
if let Err(error) = channel.request(router_id, request).await {
error!("audio level observer closing failed on drop: {}", error);
}
})
Expand Down Expand Up @@ -221,7 +221,7 @@ impl RtpObserver for AudioLevelObserver {

self.inner
.channel
.request_fbs(self.id(), RtpObserverPauseRequest {})
.request(self.id(), RtpObserverPauseRequest {})
.await?;

let was_paused = self.inner.paused.swap(true, Ordering::SeqCst);
Expand All @@ -238,7 +238,7 @@ impl RtpObserver for AudioLevelObserver {

self.inner
.channel
.request_fbs(self.id(), RtpObserverResumeRequest {})
.request(self.id(), RtpObserverResumeRequest {})
.await?;

let was_paused = self.inner.paused.swap(false, Ordering::SeqCst);
Expand All @@ -262,7 +262,7 @@ impl RtpObserver for AudioLevelObserver {
};
self.inner
.channel
.request_fbs(self.id(), RtpObserverAddProducerRequest { producer_id })
.request(self.id(), RtpObserverAddProducerRequest { producer_id })
.await?;

self.inner.handlers.add_producer.call_simple(&producer);
Expand All @@ -279,7 +279,7 @@ impl RtpObserver for AudioLevelObserver {
};
self.inner
.channel
.request_fbs(self.id(), RtpObserverRemoveProducerRequest { producer_id })
.request(self.id(), RtpObserverRemoveProducerRequest { producer_id })
.await?;

self.inner.handlers.remove_producer.call_simple(&producer);
Expand Down Expand Up @@ -339,7 +339,7 @@ impl AudioLevelObserver {
let router = router.clone();
let handlers = Arc::clone(&handlers);

channel.subscribe_to_fbs_notifications(id.into(), move |notification| {
channel.subscribe_to_notifications(id.into(), move |notification| {
match Notification::from_fbs(notification) {
Ok(notification) => match notification {
Notification::Volumes(volumes) => {
Expand Down
22 changes: 11 additions & 11 deletions rust/src/router/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ impl Inner {
self.executor
.spawn(async move {
if weak_producer.upgrade().is_some() {
if let Err(error) = channel.request_fbs(transport_id, request).await {
if let Err(error) = channel.request(transport_id, request).await {
error!("consumer closing failed on drop: {}", error);
}
}
Expand Down Expand Up @@ -783,7 +783,7 @@ impl Consumer {
let current_layers = Arc::clone(&current_layers);
let inner_weak = Arc::clone(&inner_weak);

channel.subscribe_to_fbs_notifications(id.into(), move |notification| {
channel.subscribe_to_notifications(id.into(), move |notification| {
match Notification::from_fbs(notification) {
Ok(notification) => match notification {
Notification::ProducerClose => {
Expand Down Expand Up @@ -993,7 +993,7 @@ impl Consumer {
let response = self
.inner
.channel
.request_fbs(self.id(), ConsumerDumpRequest {})
.request(self.id(), ConsumerDumpRequest {})
.await?;

if let response::Body::ConsumerDumpResponse(data) = response {
Expand All @@ -1013,7 +1013,7 @@ impl Consumer {
let response = self
.inner
.channel
.request_fbs(self.id(), ConsumerGetStatsRequest {})
.request(self.id(), ConsumerGetStatsRequest {})
.await;

if let Ok(response::Body::ConsumerGetStatsResponse(data)) = response {
Expand Down Expand Up @@ -1042,7 +1042,7 @@ impl Consumer {

self.inner
.channel
.request_fbs(self.id(), ConsumerPauseRequest {})
.request(self.id(), ConsumerPauseRequest {})
.await?;

let mut paused = self.inner.paused.lock();
Expand All @@ -1062,7 +1062,7 @@ impl Consumer {

self.inner
.channel
.request_fbs(self.id(), ConsumerResumeRequest {})
.request(self.id(), ConsumerResumeRequest {})
.await?;

let mut paused = self.inner.paused.lock();
Expand All @@ -1087,7 +1087,7 @@ impl Consumer {
let consumer_layers = self
.inner
.channel
.request_fbs(
.request(
self.id(),
ConsumerSetPreferredLayersRequest {
data: consumer_layers,
Expand All @@ -1109,7 +1109,7 @@ impl Consumer {
let result = self
.inner
.channel
.request_fbs(self.id(), ConsumerSetPriorityRequest { priority })
.request(self.id(), ConsumerSetPriorityRequest { priority })
.await?;

*self.inner.priority.lock() = result.priority;
Expand All @@ -1126,7 +1126,7 @@ impl Consumer {
let result = self
.inner
.channel
.request_fbs(self.id(), ConsumerSetPriorityRequest { priority })
.request(self.id(), ConsumerSetPriorityRequest { priority })
.await?;

*self.inner.priority.lock() = result.priority;
Expand All @@ -1140,7 +1140,7 @@ impl Consumer {

self.inner
.channel
.request_fbs(self.id(), ConsumerRequestKeyFrameRequest {})
.request(self.id(), ConsumerRequestKeyFrameRequest {})
.await
}

Expand All @@ -1153,7 +1153,7 @@ impl Consumer {

self.inner
.channel
.request_fbs(self.id(), ConsumerEnableTraceEventRequest { types })
.request(self.id(), ConsumerEnableTraceEventRequest { types })
.await
}

Expand Down
18 changes: 9 additions & 9 deletions rust/src/router/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl Inner {
self.executor
.spawn(async move {
if weak_data_producer.upgrade().is_some() {
if let Err(error) = channel.request_fbs(transport_id, request).await {
if let Err(error) = channel.request(transport_id, request).await {
error!("consumer closing failed on drop: {}", error);
}
}
Expand Down Expand Up @@ -470,7 +470,7 @@ impl DataConsumer {
let data_producer_paused = Arc::clone(&data_producer_paused);
let inner_weak = Arc::clone(&inner_weak);

channel.subscribe_to_fbs_notifications(id.into(), move |notification| {
channel.subscribe_to_notifications(id.into(), move |notification| {
match Notification::from_fbs(notification) {
Ok(notification) => match notification {
Notification::DataProducerClose => {
Expand Down Expand Up @@ -658,7 +658,7 @@ impl DataConsumer {
let response = self
.inner()
.channel
.request_fbs(self.id(), DataConsumerDumpRequest {})
.request(self.id(), DataConsumerDumpRequest {})
.await?;

if let response::Body::DataConsumerDumpResponse(data) = response {
Expand All @@ -678,7 +678,7 @@ impl DataConsumer {
let response = self
.inner()
.channel
.request_fbs(self.id(), DataConsumerGetStatsRequest {})
.request(self.id(), DataConsumerGetStatsRequest {})
.await?;

if let response::Body::DataConsumerGetStatsResponse(data) = response {
Expand All @@ -694,7 +694,7 @@ impl DataConsumer {

self.inner()
.channel
.request_fbs(self.id(), DataConsumerPauseRequest {})
.request(self.id(), DataConsumerPauseRequest {})
.await?;

let mut paused = self.inner().paused.lock();
Expand All @@ -714,7 +714,7 @@ impl DataConsumer {

self.inner()
.channel
.request_fbs(self.id(), DataConsumerResumeRequest {})
.request(self.id(), DataConsumerResumeRequest {})
.await?;

let mut paused = self.inner().paused.lock();
Expand All @@ -741,7 +741,7 @@ impl DataConsumer {
let response = self
.inner()
.channel
.request_fbs(self.id(), DataConsumerGetBufferedAmountRequest {})
.request(self.id(), DataConsumerGetBufferedAmountRequest {})
.await?;

Ok(response.buffered_amount)
Expand All @@ -760,7 +760,7 @@ impl DataConsumer {

self.inner()
.channel
.request_fbs(
.request(
self.id(),
DataConsumerSetBufferedAmountLowThresholdRequest { threshold },
)
Expand Down Expand Up @@ -888,7 +888,7 @@ impl DirectDataConsumer {

self.inner
.channel
.request_fbs(
.request(
self.inner.id,
DataConsumerSendRequest {
ppid,
Expand Down
Loading

0 comments on commit 096ee1e

Please sign in to comment.