diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 1f8ebb5b..cf56d747 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -96,6 +96,13 @@ impl Stream { )); } + // Start the pipeline. This will automatically start sinks with linked proxy-isolated pipelines + stream + .pipeline + .inner_state_as_ref() + .pipeline_runner + .start()?; + Ok(stream) } } diff --git a/src/stream/pipeline/runner.rs b/src/stream/pipeline/runner.rs index fda68a83..a5097d8d 100644 --- a/src/stream/pipeline/runner.rs +++ b/src/stream/pipeline/runner.rs @@ -1,6 +1,6 @@ use gst::prelude::*; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use tokio::sync::broadcast; use tracing::*; @@ -11,6 +11,7 @@ use crate::stream::gst::utils::wait_for_element_state; #[allow(dead_code)] pub struct PipelineRunner { pipeline_weak: gst::glib::WeakRef, + start_signal_sender: broadcast::Sender<()>, killswitch_sender: broadcast::Sender, _killswitch_receiver: broadcast::Receiver, _watcher_thread_handle: std::thread::JoinHandle<()>, @@ -26,8 +27,11 @@ impl PipelineRunner { let pipeline_weak = pipeline.downgrade(); let (killswitch_sender, _killswitch_receiver) = broadcast::channel(1); let watcher_killswitch_receiver = killswitch_sender.subscribe(); + let (start_signal_sender, start_signal_receiver) = broadcast::channel(1); + Ok(Self { pipeline_weak: pipeline_weak.clone(), + start_signal_sender, killswitch_sender: killswitch_sender.clone(), _killswitch_receiver, _watcher_thread_handle: std::thread::Builder::new() @@ -38,6 +42,7 @@ impl PipelineRunner { pipeline_weak, pipeline_id, watcher_killswitch_receiver, + start_signal_receiver, allow_block, ) { error!("PipelineWatcher ended with error: {error}"); @@ -64,6 +69,12 @@ impl PipelineRunner { self.killswitch_sender.subscribe() } + #[instrument(level = "debug", skip(self))] + pub fn start(&self) -> Result<()> { + self.start_signal_sender.send(())?; + Ok(()) + } + #[instrument(level = "debug", skip(self))] pub fn is_running(&self) -> bool { !self._watcher_thread_handle.is_finished() @@ -74,6 +85,7 @@ impl PipelineRunner { pipeline_weak: gst::glib::WeakRef, pipeline_id: uuid::Uuid, mut killswitch_receiver: broadcast::Receiver, + mut start_signal_receiver: broadcast::Receiver<()>, allow_block: bool, ) -> Result<()> { let pipeline = pipeline_weak @@ -92,15 +104,28 @@ impl PipelineRunner { let mut lost_timestamps: usize = 0; let max_lost_timestamps: usize = 15; + let mut start_received = false; + 'outer: loop { std::thread::sleep(std::time::Duration::from_millis(100)); + // Wait the signal to start + if !start_received { + if let Err(error) = start_signal_receiver.try_recv() { + match error { + broadcast::error::TryRecvError::Empty => continue, + _ => return Err(anyhow!("Failed receiving start signal: {error:?}")), + } + } + debug!("Starting signal received in Pipeline {pipeline_id}"); + start_received = true; + } + if pipeline.current_state() != gst::State::Playing { if let Err(error) = pipeline.set_state(gst::State::Playing) { - error!( + return Err(anyhow!( "Failed setting Pipeline {pipeline_id} to Playing state. Reason: {error:?}" - ); - break; + )); } if let Err(error) = wait_for_element_state( pipeline.upcast_ref::(),