Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rtsp static rtsp sink #213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ impl Stream {
));
}

// Start the pipeline. This will automatically start sinks with linked proxy-isolated pipelines
stream
.pipeline
.inner_state_as_ref()
.pipeline_runner
.start()?;

Ok(stream)
}
}
Expand Down
33 changes: 29 additions & 4 deletions src/stream/pipeline/runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use gst::prelude::*;

use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};

use tokio::sync::broadcast;
use tracing::*;
Expand All @@ -11,6 +11,7 @@ use crate::stream::gst::utils::wait_for_element_state;
#[allow(dead_code)]
pub struct PipelineRunner {
pipeline_weak: gst::glib::WeakRef<gst::Pipeline>,
start_signal_sender: broadcast::Sender<()>,
killswitch_sender: broadcast::Sender<String>,
_killswitch_receiver: broadcast::Receiver<String>,
_watcher_thread_handle: std::thread::JoinHandle<()>,
Expand All @@ -26,8 +27,11 @@ impl PipelineRunner {
let pipeline_weak = pipeline.downgrade();
let (killswitch_sender, _killswitch_receiver) = broadcast::channel(1);
let watcher_killswitch_receiver = killswitch_sender.subscribe();
let (start_signal_sender, start_signal_receiver) = broadcast::channel(1);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to point out that while solutions using the oneshot channel or barrier would have solved it, the broadcast channel is the easiest to deal with without fighting with async.


Ok(Self {
pipeline_weak: pipeline_weak.clone(),
start_signal_sender,
killswitch_sender: killswitch_sender.clone(),
_killswitch_receiver,
_watcher_thread_handle: std::thread::Builder::new()
Expand All @@ -38,6 +42,7 @@ impl PipelineRunner {
pipeline_weak,
pipeline_id,
watcher_killswitch_receiver,
start_signal_receiver,
allow_block,
) {
error!("PipelineWatcher ended with error: {error}");
Expand All @@ -64,6 +69,12 @@ impl PipelineRunner {
self.killswitch_sender.subscribe()
}

#[instrument(level = "debug", skip(self))]
pub fn start(&self) -> Result<()> {
self.start_signal_sender.send(())?;
Ok(())
}

#[instrument(level = "debug", skip(self))]
pub fn is_running(&self) -> bool {
!self._watcher_thread_handle.is_finished()
Expand All @@ -74,6 +85,7 @@ impl PipelineRunner {
pipeline_weak: gst::glib::WeakRef<gst::Pipeline>,
pipeline_id: uuid::Uuid,
mut killswitch_receiver: broadcast::Receiver<String>,
mut start_signal_receiver: broadcast::Receiver<()>,
allow_block: bool,
) -> Result<()> {
let pipeline = pipeline_weak
Expand All @@ -92,15 +104,28 @@ impl PipelineRunner {
let mut lost_timestamps: usize = 0;
let max_lost_timestamps: usize = 15;

let mut start_received = false;

'outer: loop {
std::thread::sleep(std::time::Duration::from_millis(100));

// Wait the signal to start
if !start_received {
if let Err(error) = start_signal_receiver.try_recv() {
match error {
broadcast::error::TryRecvError::Empty => continue,
_ => return Err(anyhow!("Failed receiving start signal: {error:?}")),
}
}
debug!("Starting signal received in Pipeline {pipeline_id}");
start_received = true;
}

if pipeline.current_state() != gst::State::Playing {
if let Err(error) = pipeline.set_state(gst::State::Playing) {
error!(
return Err(anyhow!(
"Failed setting Pipeline {pipeline_id} to Playing state. Reason: {error:?}"
);
break;
));
}
if let Err(error) = wait_for_element_state(
pipeline.upcast_ref::<gst::Element>(),
Expand Down