Skip to content

Allow consuming stream data from a client app #372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions backends/dummy/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use servo_media_player::{audio, video, Player, PlayerError, PlayerEvent, StreamT
use servo_media_streams::capture::MediaTrackConstraintSet;
use servo_media_streams::device_monitor::{MediaDeviceInfo, MediaDeviceMonitor};
use servo_media_streams::registry::{register_stream, unregister_stream, MediaStreamId};
use servo_media_streams::{MediaOutput, MediaSocket, MediaStream, MediaStreamType};
use servo_media_streams::{MediaOutput, MediaSocket, MediaSource, MediaStream, MediaStreamType};
use servo_media_traits::{ClientContextId, MediaInstance};
use servo_media_webrtc::{
thread, BundlePolicy, DataChannelId, DataChannelInit, DataChannelMessage, IceCandidate,
Expand Down Expand Up @@ -74,7 +74,11 @@ impl Backend for DummyBackend {
(Box::new(DummySocket), id)
}

fn create_videoinput_stream(&self, _: MediaTrackConstraintSet) -> Option<MediaStreamId> {
fn create_videoinput_stream(
&self,
_: MediaTrackConstraintSet,
_: MediaSource,
) -> Option<MediaStreamId> {
Some(register_stream(Arc::new(Mutex::new(DummyMediaStream {
id: MediaStreamId::new(),
}))))
Expand Down Expand Up @@ -118,6 +122,8 @@ impl Backend for DummyBackend {
fn get_device_monitor(&self) -> Box<dyn MediaDeviceMonitor> {
Box::new(DummyMediaDeviceMonitor {})
}

fn push_stream_data(&self, _: &MediaStreamId, _: Vec<u8>) {}
}

impl AudioBackend for DummyBackend {
Expand Down Expand Up @@ -242,6 +248,8 @@ impl MediaStream for DummyMediaStream {
fn ty(&self) -> MediaStreamType {
MediaStreamType::Audio
}

fn push_data(&self, _: Vec<u8>) {}
}

impl Drop for DummyMediaStream {
Expand Down
1 change: 1 addition & 0 deletions backends/gstreamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ path = "lib.rs"

[dependencies]
boxfnonce = "0.1.0"
euclid = "0.20"
mime = "0.3.13"
log = "0.4"

Expand Down
15 changes: 12 additions & 3 deletions backends/gstreamer/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![feature(nll)]
extern crate boxfnonce;
extern crate byte_slice_cast;
extern crate euclid;
extern crate mime;

extern crate glib_sys as glib_ffi;
Expand Down Expand Up @@ -64,7 +65,7 @@ use servo_media_player::{Player, PlayerEvent, StreamType};
use servo_media_streams::capture::MediaTrackConstraintSet;
use servo_media_streams::device_monitor::MediaDeviceMonitor;
use servo_media_streams::registry::MediaStreamId;
use servo_media_streams::{MediaOutput, MediaSocket, MediaStreamType};
use servo_media_streams::{MediaOutput, MediaSocket, MediaSource, MediaStreamType};
use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
use servo_media_webrtc::{WebRtcBackend, WebRtcController, WebRtcSignaller};
use std::collections::HashMap;
Expand Down Expand Up @@ -245,12 +246,20 @@ impl Backend for GStreamerBackend {
media_capture::create_audioinput_stream(set)
}

fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
fn create_videoinput_stream(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be doing this in create_videoinput_stream, that's specifically for screen capture input. We should have a separate create_capture_stream IMO.

&self,
set: MediaTrackConstraintSet,
source: MediaSource,
) -> Option<MediaStreamId> {
if self.capture_mocking.load(Ordering::Acquire) {
// XXXManishearth we should caps filter this
return Some(self.create_videostream());
}
media_capture::create_videoinput_stream(set)
media_capture::create_videoinput_stream(set, source)
}

fn push_stream_data(&self, stream: &MediaStreamId, data: Vec<u8>) {
GStreamerMediaStream::push_data(stream, data);
}

fn can_play_type(&self, media_type: &str) -> SupportsMediaType {
Expand Down
32 changes: 22 additions & 10 deletions backends/gstreamer/media_capture.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::media_stream::GStreamerMediaStream;

use gst;
use gst::prelude::*;
use servo_media_streams::capture::*;
use servo_media_streams::registry::MediaStreamId;
use servo_media_streams::MediaStreamType;
use servo_media_streams::{MediaSource, MediaStreamType};
use std::i32;

trait AddToCaps {
Expand Down Expand Up @@ -150,23 +151,34 @@ pub struct GstMediaTrack {
fn create_input_stream(
stream_type: MediaStreamType,
constraint_set: MediaTrackConstraintSet,
source: MediaSource,
) -> Option<MediaStreamId> {
let devices = GstMediaDevices::new();
devices
.get_track(stream_type == MediaStreamType::Video, constraint_set)
.map(|track| {
let f = match stream_type {
MediaStreamType::Audio => GStreamerMediaStream::create_audio_from,
MediaStreamType::Video => GStreamerMediaStream::create_video_from,
};
f(track.element)
.map(|track| match stream_type {
MediaStreamType::Audio => GStreamerMediaStream::create_audio_from(match source {
MediaSource::Device => track.element,
MediaSource::App(_) => unimplemented!(),
}),
MediaStreamType::Video => match source {
MediaSource::Device => GStreamerMediaStream::create_video_from(track.element, None),
MediaSource::App(size) => {
let appsrc =
gst::ElementFactory::make("appsrc", None).expect("appsrc creation failed");
GStreamerMediaStream::create_video_from(appsrc, Some(size))
}
},
})
}

pub fn create_audioinput_stream(constraint_set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
create_input_stream(MediaStreamType::Audio, constraint_set)
create_input_stream(MediaStreamType::Audio, constraint_set, MediaSource::Device)
}

pub fn create_videoinput_stream(constraint_set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
create_input_stream(MediaStreamType::Video, constraint_set)
pub fn create_videoinput_stream(
constraint_set: MediaTrackConstraintSet,
source: MediaSource,
) -> Option<MediaStreamId> {
create_input_stream(MediaStreamType::Video, constraint_set, source)
}
102 changes: 95 additions & 7 deletions backends/gstreamer/media_stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use super::BACKEND_BASE_TIME;

use euclid::default::Size2D;
use glib::prelude::*;
use gst;
use gst::prelude::*;
use gst_app::AppSrc;
use servo_media_streams::registry::{
get_stream, register_stream, unregister_stream, MediaStreamId,
};
Expand Down Expand Up @@ -29,6 +32,7 @@ pub struct GStreamerMediaStream {
type_: MediaStreamType,
elements: Vec<gst::Element>,
pipeline: Option<gst::Pipeline>,
video_app_source: Option<AppSrc>,
}

impl MediaStream for GStreamerMediaStream {
Expand All @@ -47,6 +51,15 @@ impl MediaStream for GStreamerMediaStream {
fn ty(&self) -> MediaStreamType {
self.type_
}

fn push_data(&self, data: Vec<u8>) {
if let Some(ref appsrc) = self.video_app_source {
let buffer = gst::Buffer::from_slice(data);
if let Err(error) = appsrc.push_buffer(buffer) {
warn!("{}", error);
}
}
}
}

impl GStreamerMediaStream {
Expand All @@ -56,6 +69,7 @@ impl GStreamerMediaStream {
type_,
elements,
pipeline: None,
video_app_source: None,
}
}

Expand Down Expand Up @@ -91,6 +105,10 @@ impl GStreamerMediaStream {
self.elements.last().unwrap().clone()
}

pub fn first_element(&self) -> gst::Element {
self.elements.first().unwrap().clone()
}

pub fn attach_to_pipeline(&mut self, pipeline: &gst::Pipeline) {
assert!(self.pipeline.is_none());
let elements: Vec<_> = self.elements.iter().collect();
Expand Down Expand Up @@ -123,7 +141,7 @@ impl GStreamerMediaStream {
.set_property("is-live", &true)
.expect("videotestsrc doesn't have expected 'is-live' property");

Self::create_video_from(videotestsrc)
Self::create_video_from(videotestsrc, None)
}

/// Attaches encoding adapters to the stream, returning the source element
Expand Down Expand Up @@ -174,14 +192,74 @@ impl GStreamerMediaStream {
}
}

pub fn create_video_from(source: gst::Element) -> MediaStreamId {
pub fn set_video_app_source(&mut self, source: &AppSrc) {
self.video_app_source = Some(source.clone());
}

pub fn create_video_from(source: gst::Element, size: Option<Size2D<u32>>) -> MediaStreamId {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be doing decoding within create_video_from, it's the responsibility of the client to do that (see webrtc, all media streams are raw, this was a deliberate choice and without it we will have problems)

let src = gst::ElementFactory::make("proxysrc", None).unwrap();
let videoconvert = gst::ElementFactory::make("videoconvert", None).unwrap();
let queue = gst::ElementFactory::make("queue", None).unwrap();

register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
let stream = Arc::new(Mutex::new(GStreamerMediaStream::new(
MediaStreamType::Video,
vec![source, videoconvert, queue],
))))
vec![src, videoconvert, queue],
)));

let pipeline = gst::Pipeline::new(Some("video pipeline"));
let clock = gst::SystemClock::obtain();
pipeline.set_start_time(gst::ClockTime::none());
pipeline.set_base_time(*BACKEND_BASE_TIME);
pipeline.use_clock(Some(&clock));

let decodebin = gst::ElementFactory::make("decodebin", None).unwrap();

let stream_ = stream.clone();
let video_pipeline = pipeline.clone();
decodebin.connect_pad_added(move |decodebin, _| {
// Append a proxysink to the video pipeline.
let proxy_sink = gst::ElementFactory::make("proxysink", None).unwrap();
video_pipeline.add(&proxy_sink).unwrap();
gst::Element::link_many(&[decodebin, &proxy_sink]).unwrap();

// And connect the video and media stream pipelines.
let stream = stream_.lock().unwrap();
let first_element = stream.first_element();
first_element
.set_property("proxysink", &proxy_sink)
.unwrap();

proxy_sink.sync_state_with_parent().unwrap();
decodebin.sync_state_with_parent().unwrap();
});

if let Some(size) = size {
let caps = gst::Caps::builder("video/x-raw")
.field("format", &gst_video::VideoFormat::Bgra.to_string())
.field("pixel-aspect-ratio", &gst::Fraction::from((1, 1)))
.field("width", &(size.width as i32))
.field("height", &(size.height as i32))
.build();
source
.set_property("caps", &caps)
.expect("source doesn't have expected 'caps' property");
}

if let Some(appsrc) = source.downcast_ref::<AppSrc>() {
appsrc.set_property_format(gst::Format::Time);
stream.lock().unwrap().set_video_app_source(appsrc);
}

pipeline.add_many(&[&source, &decodebin]).unwrap();
gst::Element::link_many(&[&source, &decodebin]).unwrap();

pipeline.set_state(gst::State::Playing).unwrap();

#[cfg(debug_assertions)]
pipeline
.upcast::<gst::Bin>()
.debug_to_dot_file(gst::DebugGraphDetails::all(), "VideoPipeline_PLAYING");

register_stream(stream)
}

pub fn create_audio() -> MediaStreamId {
Expand Down Expand Up @@ -212,11 +290,21 @@ impl GStreamerMediaStream {
proxy_src.set_property("proxysink", &proxy_sink).unwrap();
let stream = match ty {
MediaStreamType::Audio => Self::create_audio_from(proxy_src),
MediaStreamType::Video => Self::create_video_from(proxy_src),
MediaStreamType::Video => Self::create_video_from(proxy_src, None),
};

(stream, GstreamerMediaSocket { proxy_sink })
}

pub fn push_data(stream: &MediaStreamId, data: Vec<u8>) {
let stream = get_stream(stream).expect("Media streams registry does not contain such ID");
let mut stream = stream.lock().unwrap();
let stream = stream
.as_mut_any()
.downcast_mut::<GStreamerMediaStream>()
.unwrap();
stream.push_data(data);
}
}

impl Drop for GStreamerMediaStream {
Expand Down
7 changes: 6 additions & 1 deletion backends/gstreamer/media_stream_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod imp {

// Append a proxysink to the media stream pipeline.
let pipeline = stream.pipeline_or_new();
let last_element = stream.src_element();
let last_element = stream.encoded();
let sink = gst::ElementFactory::make("proxysink", None).unwrap();
pipeline.add(&sink).unwrap();
gst::Element::link_many(&[&last_element, &sink][..]).unwrap();
Expand All @@ -77,6 +77,11 @@ mod imp {
sink.sync_state_with_parent().unwrap();

pipeline.set_state(gst::State::Playing).unwrap();

#[cfg(debug_assertions)]
pipeline
.upcast::<gst::Bin>()
.debug_to_dot_file(gst::DebugGraphDetails::all(), "ServoMediaStreamSrc_PLAYING");
}

fn setup_proxy_src(
Expand Down
2 changes: 1 addition & 1 deletion backends/gstreamer/webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ fn on_incoming_decodebin_stream(

let (stream, ty) = if name == "video" {
(
GStreamerMediaStream::create_video_from(proxy_src),
GStreamerMediaStream::create_video_from(proxy_src, None),
MediaStreamType::Video,
)
} else {
Expand Down
2 changes: 1 addition & 1 deletion examples/simple_webrtc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl State {
let (video, audio) = if !self.peer_id.is_some() {
(
self.media
.create_videoinput_stream(Default::default())
.create_videoinput_stream(Default::default(), MediaSource::Device)
.unwrap_or_else(|| self.media.create_videostream()),
self.media
.create_audioinput_stream(Default::default())
Expand Down
5 changes: 4 additions & 1 deletion examples/videoinput_stream.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
extern crate servo_media;
extern crate servo_media_auto;

use servo_media::streams::MediaSource;
use servo_media::ServoMedia;
use std::sync::Arc;
use std::{thread, time};

fn run_example(servo_media: Arc<ServoMedia>) {
if let Some(stream) = servo_media.create_videoinput_stream(Default::default()) {
if let Some(stream) =
servo_media.create_videoinput_stream(Default::default(), MediaSource::Device)
{
let mut output = servo_media.create_stream_output();
output.add_stream(&stream);
thread::sleep(time::Duration::from_millis(6000));
Expand Down
Loading