From 2e28a2b438468fe314e171774d80b1029e53c46d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Fri, 10 Jul 2020 16:28:58 +0200 Subject: [PATCH 1/5] Expose method to push data to media stream --- backends/dummy/lib.rs | 12 ++++++++++-- backends/gstreamer/lib.rs | 14 +++++++++++--- backends/gstreamer/media_capture.rs | 19 ++++++++++++++----- backends/gstreamer/media_stream.rs | 22 ++++++++++++++++++++++ examples/simple_webrtc.rs | 2 +- examples/videoinput_stream.rs | 5 ++++- servo-media/lib.rs | 10 +++++++--- streams/lib.rs | 11 +++++++++++ 8 files changed, 80 insertions(+), 15 deletions(-) diff --git a/backends/dummy/lib.rs b/backends/dummy/lib.rs index 35d0a30e..bb75f2a4 100644 --- a/backends/dummy/lib.rs +++ b/backends/dummy/lib.rs @@ -21,7 +21,7 @@ use servo_media_player::{audio, video, Player, PlayerError, PlayerEvent, StreamT use servo_media_streams::capture::MediaTrackConstraintSet; use servo_media_streams::device_monitor::{MediaDeviceInfo, MediaDeviceMonitor}; use servo_media_streams::registry::{register_stream, unregister_stream, MediaStreamId}; -use servo_media_streams::{MediaOutput, MediaSocket, MediaStream, MediaStreamType}; +use servo_media_streams::{MediaOutput, MediaSocket, MediaSource, MediaStream, MediaStreamType}; use servo_media_traits::{ClientContextId, MediaInstance}; use servo_media_webrtc::{ thread, BundlePolicy, DataChannelId, DataChannelInit, DataChannelMessage, IceCandidate, @@ -74,7 +74,11 @@ impl Backend for DummyBackend { (Box::new(DummySocket), id) } - fn create_videoinput_stream(&self, _: MediaTrackConstraintSet) -> Option { + fn create_videoinput_stream( + &self, + _: MediaTrackConstraintSet, + _: MediaSource, + ) -> Option { Some(register_stream(Arc::new(Mutex::new(DummyMediaStream { id: MediaStreamId::new(), })))) @@ -118,6 +122,8 @@ impl Backend for DummyBackend { fn get_device_monitor(&self) -> Box { Box::new(DummyMediaDeviceMonitor {}) } + + fn push_stream_data(&self, _: &MediaStreamId, _: Vec) {} } impl AudioBackend for DummyBackend { @@ -242,6 +248,8 @@ impl MediaStream for DummyMediaStream { fn ty(&self) -> MediaStreamType { MediaStreamType::Audio } + + fn push_data(&self, _: Vec) {} } impl Drop for DummyMediaStream { diff --git a/backends/gstreamer/lib.rs b/backends/gstreamer/lib.rs index 44be69a8..3ea7656d 100644 --- a/backends/gstreamer/lib.rs +++ b/backends/gstreamer/lib.rs @@ -64,7 +64,7 @@ use servo_media_player::{Player, PlayerEvent, StreamType}; use servo_media_streams::capture::MediaTrackConstraintSet; use servo_media_streams::device_monitor::MediaDeviceMonitor; use servo_media_streams::registry::MediaStreamId; -use servo_media_streams::{MediaOutput, MediaSocket, MediaStreamType}; +use servo_media_streams::{MediaOutput, MediaSocket, MediaSource, MediaStreamType}; use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance}; use servo_media_webrtc::{WebRtcBackend, WebRtcController, WebRtcSignaller}; use std::collections::HashMap; @@ -245,12 +245,20 @@ impl Backend for GStreamerBackend { media_capture::create_audioinput_stream(set) } - fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option { + fn create_videoinput_stream( + &self, + set: MediaTrackConstraintSet, + source: MediaSource, + ) -> Option { if self.capture_mocking.load(Ordering::Acquire) { // XXXManishearth we should caps filter this return Some(self.create_videostream()); } - media_capture::create_videoinput_stream(set) + media_capture::create_videoinput_stream(set, source) + } + + fn push_stream_data(&self, stream: &MediaStreamId, data: Vec) { + GStreamerMediaStream::push_data(stream, data); } fn can_play_type(&self, media_type: &str) -> SupportsMediaType { diff --git a/backends/gstreamer/media_capture.rs b/backends/gstreamer/media_capture.rs index 1ef476ad..3871309f 100644 --- a/backends/gstreamer/media_capture.rs +++ b/backends/gstreamer/media_capture.rs @@ -3,7 +3,7 @@ use gst; use gst::prelude::*; use servo_media_streams::capture::*; use servo_media_streams::registry::MediaStreamId; -use servo_media_streams::MediaStreamType; +use servo_media_streams::{MediaSource, MediaStreamType}; use std::i32; trait AddToCaps { @@ -150,6 +150,7 @@ pub struct GstMediaTrack { fn create_input_stream( stream_type: MediaStreamType, constraint_set: MediaTrackConstraintSet, + source: MediaSource, ) -> Option { let devices = GstMediaDevices::new(); devices @@ -159,14 +160,22 @@ fn create_input_stream( MediaStreamType::Audio => GStreamerMediaStream::create_audio_from, MediaStreamType::Video => GStreamerMediaStream::create_video_from, }; - f(track.element) + f(match source { + MediaSource::Device => track.element, + MediaSource::App => { + gst::ElementFactory::make("appsrc", None).expect("appsrc creation failed") + } + }) }) } pub fn create_audioinput_stream(constraint_set: MediaTrackConstraintSet) -> Option { - create_input_stream(MediaStreamType::Audio, constraint_set) + create_input_stream(MediaStreamType::Audio, constraint_set, MediaSource::Device) } -pub fn create_videoinput_stream(constraint_set: MediaTrackConstraintSet) -> Option { - create_input_stream(MediaStreamType::Video, constraint_set) +pub fn create_videoinput_stream( + constraint_set: MediaTrackConstraintSet, + source: MediaSource, +) -> Option { + create_input_stream(MediaStreamType::Video, constraint_set, source) } diff --git a/backends/gstreamer/media_stream.rs b/backends/gstreamer/media_stream.rs index 6215d34c..f4ea13a2 100644 --- a/backends/gstreamer/media_stream.rs +++ b/backends/gstreamer/media_stream.rs @@ -2,6 +2,7 @@ use super::BACKEND_BASE_TIME; use glib::prelude::*; use gst; use gst::prelude::*; +use gst_app::AppSrc; use servo_media_streams::registry::{ get_stream, register_stream, unregister_stream, MediaStreamId, }; @@ -47,6 +48,17 @@ impl MediaStream for GStreamerMediaStream { fn ty(&self) -> MediaStreamType { self.type_ } + + fn push_data(&self, data: Vec) { + if let Some(source) = self.elements.last() { + if let Some(appsrc) = source.downcast_ref::() { + let buffer = gst::Buffer::from_slice(data); + if let Err(error) = appsrc.push_buffer(buffer) { + warn!("{}", error); + } + } + } + } } impl GStreamerMediaStream { @@ -217,6 +229,16 @@ impl GStreamerMediaStream { (stream, GstreamerMediaSocket { proxy_sink }) } + + pub fn push_data(stream: &MediaStreamId, data: Vec) { + let stream = get_stream(stream).expect("Media streams registry does not contain such ID"); + let mut stream = stream.lock().unwrap(); + let stream = stream + .as_mut_any() + .downcast_mut::() + .unwrap(); + stream.push_data(data); + } } impl Drop for GStreamerMediaStream { diff --git a/examples/simple_webrtc.rs b/examples/simple_webrtc.rs index c24e1342..b906b5b4 100644 --- a/examples/simple_webrtc.rs +++ b/examples/simple_webrtc.rs @@ -132,7 +132,7 @@ impl State { let (video, audio) = if !self.peer_id.is_some() { ( self.media - .create_videoinput_stream(Default::default()) + .create_videoinput_stream(Default::default(), MediaSource::Device) .unwrap_or_else(|| self.media.create_videostream()), self.media .create_audioinput_stream(Default::default()) diff --git a/examples/videoinput_stream.rs b/examples/videoinput_stream.rs index 7bf25c0b..50313778 100644 --- a/examples/videoinput_stream.rs +++ b/examples/videoinput_stream.rs @@ -1,12 +1,15 @@ extern crate servo_media; extern crate servo_media_auto; +use servo_media::streams::MediaSource; use servo_media::ServoMedia; use std::sync::Arc; use std::{thread, time}; fn run_example(servo_media: Arc) { - if let Some(stream) = servo_media.create_videoinput_stream(Default::default()) { + if let Some(stream) = + servo_media.create_videoinput_stream(Default::default(), MediaSource::Device) + { let mut output = servo_media.create_stream_output(); output.add_stream(&stream); thread::sleep(time::Duration::from_millis(6000)); diff --git a/servo-media/lib.rs b/servo-media/lib.rs index 88cb35b2..01508621 100644 --- a/servo-media/lib.rs +++ b/servo-media/lib.rs @@ -18,7 +18,7 @@ use player::{Player, PlayerEvent, StreamType}; use streams::capture::MediaTrackConstraintSet; use streams::device_monitor::MediaDeviceMonitor; use streams::registry::MediaStreamId; -use streams::{MediaOutput, MediaSocket, MediaStreamType}; +use streams::{MediaOutput, MediaSocket, MediaSource, MediaStreamType}; use webrtc::{WebRtcController, WebRtcSignaller}; pub struct ServoMedia(Box); @@ -48,7 +48,12 @@ pub trait Backend: Send + Sync { ty: MediaStreamType, ) -> (Box, MediaStreamId); fn create_audioinput_stream(&self, set: MediaTrackConstraintSet) -> Option; - fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option; + fn create_videoinput_stream( + &self, + set: MediaTrackConstraintSet, + source: MediaSource, + ) -> Option; + fn push_stream_data(&self, stream: &MediaStreamId, data: Vec); fn create_audio_context( &self, id: &ClientContextId, @@ -76,7 +81,6 @@ pub trait Backend: Send + Sync { /// and the media instances created for these contexts. /// The client context identifier is currently an abstraction of Servo's PipelineId. fn resume(&self, _id: &ClientContextId) {} - fn get_device_monitor(&self) -> Box; } diff --git a/streams/lib.rs b/streams/lib.rs index ba192984..0bf2e30d 100644 --- a/streams/lib.rs +++ b/streams/lib.rs @@ -14,6 +14,7 @@ pub trait MediaStream: Any + Send { fn as_mut_any(&mut self) -> &mut dyn Any; fn set_id(&mut self, id: registry::MediaStreamId); fn ty(&self) -> MediaStreamType; + fn push_data(&self, data: Vec); } /// A MediaSocket is a way for a backend to represent a @@ -22,6 +23,16 @@ pub trait MediaSocket: Any + Send { fn as_any(&self) -> &dyn Any; } +/// Determines the source of the media stream. +pub enum MediaSource { + // The media stream source is a capture device. + // i.e. getUserMedia + Device, + // The media stream source is the client application. + // i.e. captureStream + App, +} + /// This isn't part of the webrtc spec; it's a leaky abstaction while media streams /// are under development and example consumers need to be able to inspect them. pub trait MediaOutput: Send { From b3b77f3526767c7f22fc7a341e818099c94403fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Mon, 13 Jul 2020 22:19:56 +0200 Subject: [PATCH 2/5] Implement Debug, Deserialize and Serialize for MediaStreamId --- Cargo.lock | 1 + streams/Cargo.toml | 1 + streams/registry.rs | 19 ++++++++++++++++++- 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index ef27c5dc..19358652 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2259,6 +2259,7 @@ name = "servo-media-streams" version = "0.1.0" dependencies = [ "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/streams/Cargo.toml b/streams/Cargo.toml index 62c661c3..d106127b 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -10,4 +10,5 @@ path = "lib.rs" [dependencies] lazy_static = "1.0" +serde = "1.0.66" uuid = { version = "0.8", features = ["v4"] } diff --git a/streams/registry.rs b/streams/registry.rs index 1215e222..e7648105 100644 --- a/streams/registry.rs +++ b/streams/registry.rs @@ -1,5 +1,8 @@ use super::MediaStream; +use serde::de::Error; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::collections::HashMap; +use std::str::FromStr; use std::sync::{Arc, Mutex}; use uuid::Uuid; @@ -8,7 +11,7 @@ lazy_static! { Mutex::new(HashMap::new()); } -#[derive(Clone, Copy, Hash, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] pub struct MediaStreamId(Uuid); impl MediaStreamId { pub fn new() -> MediaStreamId { @@ -20,6 +23,20 @@ impl MediaStreamId { } } +impl Serialize for MediaStreamId { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_str(&format!("{:p}", &self.0.to_string())) + } +} + +impl<'de> Deserialize<'de> for MediaStreamId { + fn deserialize>(d: D) -> Result { + let value: &str = Deserialize::deserialize(d)?; + let uuid = Uuid::from_str(value).map_err(D::Error::custom)?; + Ok(MediaStreamId(uuid)) + } +} + pub fn register_stream(stream: Arc>) -> MediaStreamId { let id = MediaStreamId::new(); stream.lock().unwrap().set_id(id.clone()); From d46d87dc9ad65001ca9947a9aa40cf36bf8eb4ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Tue, 14 Jul 2020 13:00:30 +0200 Subject: [PATCH 3/5] Use Uuid serde feature --- Cargo.lock | 1 + streams/Cargo.toml | 2 +- streams/lib.rs | 2 ++ streams/registry.rs | 19 +------------------ 4 files changed, 5 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19358652..f8d75ba3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2709,6 +2709,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/streams/Cargo.toml b/streams/Cargo.toml index d106127b..be518960 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -11,4 +11,4 @@ path = "lib.rs" [dependencies] lazy_static = "1.0" serde = "1.0.66" -uuid = { version = "0.8", features = ["v4"] } +uuid = { version = "0.8", features = ["v4", "serde"] } diff --git a/streams/lib.rs b/streams/lib.rs index 0bf2e30d..a65f8983 100644 --- a/streams/lib.rs +++ b/streams/lib.rs @@ -1,5 +1,7 @@ #[macro_use] extern crate lazy_static; +#[macro_use] +extern crate serde; pub mod capture; pub mod device_monitor; diff --git a/streams/registry.rs b/streams/registry.rs index e7648105..42bb0324 100644 --- a/streams/registry.rs +++ b/streams/registry.rs @@ -1,8 +1,5 @@ use super::MediaStream; -use serde::de::Error; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::collections::HashMap; -use std::str::FromStr; use std::sync::{Arc, Mutex}; use uuid::Uuid; @@ -11,7 +8,7 @@ lazy_static! { Mutex::new(HashMap::new()); } -#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Deserialize, Hash, Eq, PartialEq, Serialize)] pub struct MediaStreamId(Uuid); impl MediaStreamId { pub fn new() -> MediaStreamId { @@ -23,20 +20,6 @@ impl MediaStreamId { } } -impl Serialize for MediaStreamId { - fn serialize(&self, serializer: S) -> Result { - serializer.serialize_str(&format!("{:p}", &self.0.to_string())) - } -} - -impl<'de> Deserialize<'de> for MediaStreamId { - fn deserialize>(d: D) -> Result { - let value: &str = Deserialize::deserialize(d)?; - let uuid = Uuid::from_str(value).map_err(D::Error::custom)?; - Ok(MediaStreamId(uuid)) - } -} - pub fn register_stream(stream: Arc>) -> MediaStreamId { let id = MediaStreamId::new(); stream.lock().unwrap().set_id(id.clone()); From 6a8f315ea28fc7a887173344287513dfe054ea8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Wed, 15 Jul 2020 16:43:59 +0200 Subject: [PATCH 4/5] Attach encoding adapters to ServoMediaStreamSrc --- backends/gstreamer/media_stream_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends/gstreamer/media_stream_source.rs b/backends/gstreamer/media_stream_source.rs index 8dc49176..cfa5faf9 100644 --- a/backends/gstreamer/media_stream_source.rs +++ b/backends/gstreamer/media_stream_source.rs @@ -65,7 +65,7 @@ mod imp { // Append a proxysink to the media stream pipeline. let pipeline = stream.pipeline_or_new(); - let last_element = stream.src_element(); + let last_element = stream.encoded(); let sink = gst::ElementFactory::make("proxysink", None).unwrap(); pipeline.add(&sink).unwrap(); gst::Element::link_many(&[&last_element, &sink][..]).unwrap(); From 23083e6a807ab7391456a1277967873bdde07172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20Jim=C3=A9nez=20Moreno?= Date: Tue, 21 Jul 2020 17:01:06 +0200 Subject: [PATCH 5/5] Detach independent video pipeline with a decodebin for media streams --- Cargo.lock | 2 + backends/gstreamer/Cargo.toml | 1 + backends/gstreamer/lib.rs | 1 + backends/gstreamer/media_capture.rs | 21 +++--- backends/gstreamer/media_stream.rs | 92 +++++++++++++++++++---- backends/gstreamer/media_stream_source.rs | 5 ++ backends/gstreamer/webrtc.rs | 2 +- streams/Cargo.toml | 1 + streams/lib.rs | 4 +- 9 files changed, 105 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f8d75ba3..807401f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2181,6 +2181,7 @@ version = "0.1.0" dependencies = [ "boxfnonce 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "byte-slice-cast 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "euclid 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)", "glib 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", "glib-sys 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "gstreamer 0.15.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2258,6 +2259,7 @@ dependencies = [ name = "servo-media-streams" version = "0.1.0" dependencies = [ + "euclid 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/backends/gstreamer/Cargo.toml b/backends/gstreamer/Cargo.toml index 21c96723..eff77d91 100644 --- a/backends/gstreamer/Cargo.toml +++ b/backends/gstreamer/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" [dependencies] boxfnonce = "0.1.0" +euclid = "0.20" mime = "0.3.13" log = "0.4" diff --git a/backends/gstreamer/lib.rs b/backends/gstreamer/lib.rs index 3ea7656d..77b3aecf 100644 --- a/backends/gstreamer/lib.rs +++ b/backends/gstreamer/lib.rs @@ -1,6 +1,7 @@ #![feature(nll)] extern crate boxfnonce; extern crate byte_slice_cast; +extern crate euclid; extern crate mime; extern crate glib_sys as glib_ffi; diff --git a/backends/gstreamer/media_capture.rs b/backends/gstreamer/media_capture.rs index 3871309f..050de058 100644 --- a/backends/gstreamer/media_capture.rs +++ b/backends/gstreamer/media_capture.rs @@ -1,4 +1,5 @@ use crate::media_stream::GStreamerMediaStream; + use gst; use gst::prelude::*; use servo_media_streams::capture::*; @@ -155,17 +156,19 @@ fn create_input_stream( let devices = GstMediaDevices::new(); devices .get_track(stream_type == MediaStreamType::Video, constraint_set) - .map(|track| { - let f = match stream_type { - MediaStreamType::Audio => GStreamerMediaStream::create_audio_from, - MediaStreamType::Video => GStreamerMediaStream::create_video_from, - }; - f(match source { + .map(|track| match stream_type { + MediaStreamType::Audio => GStreamerMediaStream::create_audio_from(match source { MediaSource::Device => track.element, - MediaSource::App => { - gst::ElementFactory::make("appsrc", None).expect("appsrc creation failed") + MediaSource::App(_) => unimplemented!(), + }), + MediaStreamType::Video => match source { + MediaSource::Device => GStreamerMediaStream::create_video_from(track.element, None), + MediaSource::App(size) => { + let appsrc = + gst::ElementFactory::make("appsrc", None).expect("appsrc creation failed"); + GStreamerMediaStream::create_video_from(appsrc, Some(size)) } - }) + }, }) } diff --git a/backends/gstreamer/media_stream.rs b/backends/gstreamer/media_stream.rs index f4ea13a2..cde23d7e 100644 --- a/backends/gstreamer/media_stream.rs +++ b/backends/gstreamer/media_stream.rs @@ -1,4 +1,6 @@ use super::BACKEND_BASE_TIME; + +use euclid::default::Size2D; use glib::prelude::*; use gst; use gst::prelude::*; @@ -30,6 +32,7 @@ pub struct GStreamerMediaStream { type_: MediaStreamType, elements: Vec, pipeline: Option, + video_app_source: Option, } impl MediaStream for GStreamerMediaStream { @@ -50,12 +53,10 @@ impl MediaStream for GStreamerMediaStream { } fn push_data(&self, data: Vec) { - if let Some(source) = self.elements.last() { - if let Some(appsrc) = source.downcast_ref::() { - let buffer = gst::Buffer::from_slice(data); - if let Err(error) = appsrc.push_buffer(buffer) { - warn!("{}", error); - } + if let Some(ref appsrc) = self.video_app_source { + let buffer = gst::Buffer::from_slice(data); + if let Err(error) = appsrc.push_buffer(buffer) { + warn!("{}", error); } } } @@ -68,6 +69,7 @@ impl GStreamerMediaStream { type_, elements, pipeline: None, + video_app_source: None, } } @@ -103,6 +105,10 @@ impl GStreamerMediaStream { self.elements.last().unwrap().clone() } + pub fn first_element(&self) -> gst::Element { + self.elements.first().unwrap().clone() + } + pub fn attach_to_pipeline(&mut self, pipeline: &gst::Pipeline) { assert!(self.pipeline.is_none()); let elements: Vec<_> = self.elements.iter().collect(); @@ -135,7 +141,7 @@ impl GStreamerMediaStream { .set_property("is-live", &true) .expect("videotestsrc doesn't have expected 'is-live' property"); - Self::create_video_from(videotestsrc) + Self::create_video_from(videotestsrc, None) } /// Attaches encoding adapters to the stream, returning the source element @@ -186,14 +192,74 @@ impl GStreamerMediaStream { } } - pub fn create_video_from(source: gst::Element) -> MediaStreamId { + pub fn set_video_app_source(&mut self, source: &AppSrc) { + self.video_app_source = Some(source.clone()); + } + + pub fn create_video_from(source: gst::Element, size: Option>) -> MediaStreamId { + let src = gst::ElementFactory::make("proxysrc", None).unwrap(); let videoconvert = gst::ElementFactory::make("videoconvert", None).unwrap(); let queue = gst::ElementFactory::make("queue", None).unwrap(); - - register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new( + let stream = Arc::new(Mutex::new(GStreamerMediaStream::new( MediaStreamType::Video, - vec![source, videoconvert, queue], - )))) + vec![src, videoconvert, queue], + ))); + + let pipeline = gst::Pipeline::new(Some("video pipeline")); + let clock = gst::SystemClock::obtain(); + pipeline.set_start_time(gst::ClockTime::none()); + pipeline.set_base_time(*BACKEND_BASE_TIME); + pipeline.use_clock(Some(&clock)); + + let decodebin = gst::ElementFactory::make("decodebin", None).unwrap(); + + let stream_ = stream.clone(); + let video_pipeline = pipeline.clone(); + decodebin.connect_pad_added(move |decodebin, _| { + // Append a proxysink to the video pipeline. + let proxy_sink = gst::ElementFactory::make("proxysink", None).unwrap(); + video_pipeline.add(&proxy_sink).unwrap(); + gst::Element::link_many(&[decodebin, &proxy_sink]).unwrap(); + + // And connect the video and media stream pipelines. + let stream = stream_.lock().unwrap(); + let first_element = stream.first_element(); + first_element + .set_property("proxysink", &proxy_sink) + .unwrap(); + + proxy_sink.sync_state_with_parent().unwrap(); + decodebin.sync_state_with_parent().unwrap(); + }); + + if let Some(size) = size { + let caps = gst::Caps::builder("video/x-raw") + .field("format", &gst_video::VideoFormat::Bgra.to_string()) + .field("pixel-aspect-ratio", &gst::Fraction::from((1, 1))) + .field("width", &(size.width as i32)) + .field("height", &(size.height as i32)) + .build(); + source + .set_property("caps", &caps) + .expect("source doesn't have expected 'caps' property"); + } + + if let Some(appsrc) = source.downcast_ref::() { + appsrc.set_property_format(gst::Format::Time); + stream.lock().unwrap().set_video_app_source(appsrc); + } + + pipeline.add_many(&[&source, &decodebin]).unwrap(); + gst::Element::link_many(&[&source, &decodebin]).unwrap(); + + pipeline.set_state(gst::State::Playing).unwrap(); + + #[cfg(debug_assertions)] + pipeline + .upcast::() + .debug_to_dot_file(gst::DebugGraphDetails::all(), "VideoPipeline_PLAYING"); + + register_stream(stream) } pub fn create_audio() -> MediaStreamId { @@ -224,7 +290,7 @@ impl GStreamerMediaStream { proxy_src.set_property("proxysink", &proxy_sink).unwrap(); let stream = match ty { MediaStreamType::Audio => Self::create_audio_from(proxy_src), - MediaStreamType::Video => Self::create_video_from(proxy_src), + MediaStreamType::Video => Self::create_video_from(proxy_src, None), }; (stream, GstreamerMediaSocket { proxy_sink }) diff --git a/backends/gstreamer/media_stream_source.rs b/backends/gstreamer/media_stream_source.rs index cfa5faf9..629c891d 100644 --- a/backends/gstreamer/media_stream_source.rs +++ b/backends/gstreamer/media_stream_source.rs @@ -77,6 +77,11 @@ mod imp { sink.sync_state_with_parent().unwrap(); pipeline.set_state(gst::State::Playing).unwrap(); + + #[cfg(debug_assertions)] + pipeline + .upcast::() + .debug_to_dot_file(gst::DebugGraphDetails::all(), "ServoMediaStreamSrc_PLAYING"); } fn setup_proxy_src( diff --git a/backends/gstreamer/webrtc.rs b/backends/gstreamer/webrtc.rs index fe393bc5..6689c219 100644 --- a/backends/gstreamer/webrtc.rs +++ b/backends/gstreamer/webrtc.rs @@ -744,7 +744,7 @@ fn on_incoming_decodebin_stream( let (stream, ty) = if name == "video" { ( - GStreamerMediaStream::create_video_from(proxy_src), + GStreamerMediaStream::create_video_from(proxy_src, None), MediaStreamType::Video, ) } else { diff --git a/streams/Cargo.toml b/streams/Cargo.toml index be518960..0f3b8a2a 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -9,6 +9,7 @@ name = "servo_media_streams" path = "lib.rs" [dependencies] +euclid = "0.20" lazy_static = "1.0" serde = "1.0.66" uuid = { version = "0.8", features = ["v4", "serde"] } diff --git a/streams/lib.rs b/streams/lib.rs index a65f8983..aeae7d81 100644 --- a/streams/lib.rs +++ b/streams/lib.rs @@ -1,3 +1,4 @@ +extern crate euclid; #[macro_use] extern crate lazy_static; #[macro_use] @@ -7,6 +8,7 @@ pub mod capture; pub mod device_monitor; pub mod registry; +use euclid::default::Size2D; use std::any::Any; pub use registry::*; @@ -32,7 +34,7 @@ pub enum MediaSource { Device, // The media stream source is the client application. // i.e. captureStream - App, + App(Size2D), } /// This isn't part of the webrtc spec; it's a leaky abstaction while media streams