Skip to content

Commit b4a2310

Browse files
Auto merge of #614 - lennart/CLOUD-154, r=TroyKomodo
feat(ingest): rtmp + rtmps server This PR adds an RTMP and an RTMPS server to ingest. The actual handling of events and data is left unimplemented. CLOUD-154 Requested-by: TroyKomodo <[email protected]> Reviewed-by: TroyKomodo <[email protected]>
2 parents 070c416 + 98dfee2 commit b4a2310

24 files changed

+327
-93
lines changed

Cargo.lock

Lines changed: 29 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cloud/video/ingest/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ deps = [
99
"//crates/context",
1010
"//crates/settings",
1111
"//crates/signal",
12+
"//crates/rtmp",
1213
"//cloud/video/ingest/traits",
1314
]
1415

cloud/video/ingest/Cargo.toml

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@ path = "bin/standalone/main.rs"
1717
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] }
1818

1919
[dependencies]
20-
anyhow = "1.0.98"
20+
anyhow = "1"
2121
ingest-traits = { path = "./traits", package = "scufflecloud-ingest-traits" }
2222
scuffle-bootstrap = { path = "../../../crates/bootstrap" }
2323
scuffle-bootstrap-telemetry = { features = ["opentelemetry-logs", "opentelemetry-traces"], path = "../../../crates/bootstrap-telemetry" }
2424
scuffle-context = { path = "../../../crates/context" }
25+
scuffle-rtmp = { path = "../../../crates/rtmp" }
2526
scuffle-settings = { features = ["all-formats", "bootstrap"], path = "../../../crates/settings" }
2627
scuffle-signal = { features = ["bootstrap"], path = "../../../crates/signal" }
27-
serde = "1.0.219"
28-
serde_derive = "1.0.219"
29-
smart-default = "0.7.1"
30-
tracing = "0.1.41"
31-
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
28+
serde = "1"
29+
serde_derive = "1"
30+
smart-default = "0.7"
31+
tokio = "1"
32+
tokio-rustls = "0.26"
33+
tracing = "0.1"
34+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
3235

3336
[package.metadata.sync-readme.badges]
3437
docs-rs = false

cloud/video/ingest/bin/standalone/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::net::SocketAddr;
2+
use std::path::PathBuf;
23

34
#[derive(serde_derive::Deserialize, smart_default::SmartDefault, Debug, Clone)]
45
#[serde(default)]
@@ -7,11 +8,26 @@ pub(crate) struct Config {
78
pub service_name: String,
89
#[default = "info"]
910
pub level: String,
11+
#[default("[::]:1935".parse().unwrap())]
12+
pub rtmp_bind: SocketAddr,
13+
pub rtmps: Option<RtmpsConfig>,
1014
pub telemetry: Option<TelemetryConfig>,
1115
}
1216

1317
scuffle_settings::bootstrap!(Config);
1418

19+
#[derive(serde_derive::Deserialize, smart_default::SmartDefault, Debug, Clone)]
20+
pub(crate) struct RtmpsConfig {
21+
// https://github.com/obsproject/obs-studio/blob/0b1229632063a13dfd26cf1cd9dd43431d8c68f6/plugins/obs-outputs/librtmp/rtmp.c#L718
22+
// Yes, RTMPS' default port is 443
23+
#[default("[::]:443".parse().unwrap())]
24+
pub bind: SocketAddr,
25+
#[default = "rtmps_certs.pem"]
26+
pub cert_chain_path: PathBuf,
27+
#[default = "rtmps_key.pem"]
28+
pub key_path: PathBuf,
29+
}
30+
1531
#[derive(serde_derive::Deserialize, smart_default::SmartDefault, Debug, Clone)]
1632
pub(crate) struct TelemetryConfig {
1733
#[default("[::1]:4317".parse().unwrap())]

cloud/video/ingest/bin/standalone/main.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77

88
use std::sync::Arc;
99

10+
use anyhow::Context;
1011
use scuffle_bootstrap_telemetry::opentelemetry;
1112
use scuffle_bootstrap_telemetry::opentelemetry_sdk::logs::SdkLoggerProvider;
1213
use scuffle_bootstrap_telemetry::opentelemetry_sdk::trace::SdkTracerProvider;
14+
use tokio_rustls::rustls::pki_types::pem::PemObject;
15+
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
1316
use tracing_subscriber::Layer;
1417
use tracing_subscriber::layer::SubscriberExt;
1518
use tracing_subscriber::util::SubscriberInitExt;
@@ -18,9 +21,39 @@ mod config;
1821

1922
struct Global {
2023
config: config::Config,
24+
rtmps: Option<RtmpsGlobal>,
2125
open_telemetry: opentelemetry::OpenTelemetry,
2226
}
2327

28+
struct RtmpsGlobal {
29+
bind: std::net::SocketAddr,
30+
rustls_config: Arc<tokio_rustls::rustls::ServerConfig>,
31+
}
32+
33+
impl ingest_traits::RtmpConfigInterface for RtmpsGlobal {
34+
fn rtmps_bind(&self) -> std::net::SocketAddr {
35+
self.bind
36+
}
37+
38+
fn rtmps_rustls_server_config(&self) -> Arc<tokio_rustls::rustls::ServerConfig> {
39+
self.rustls_config.clone()
40+
}
41+
}
42+
43+
impl ingest_traits::ConfigInterface for Global {
44+
fn rtmp_bind(&self) -> std::net::SocketAddr {
45+
self.config.rtmp_bind
46+
}
47+
}
48+
49+
impl ingest_traits::RtmpsInterface for Global {
50+
type RtmpsConfig = RtmpsGlobal;
51+
52+
fn rtmps_config(&self) -> Option<&Self::RtmpsConfig> {
53+
self.rtmps.as_ref()
54+
}
55+
}
56+
2457
impl ingest_traits::Global for Global {}
2558

2659
impl scuffle_signal::SignalConfig for Global {}
@@ -54,14 +87,38 @@ impl scuffle_bootstrap::Global for Global {
5487
)
5588
.init();
5689

90+
let rtmps = if let Some(rtmps) = config.rtmps.as_ref() {
91+
let cert_chain = CertificateDer::pem_file_iter(&rtmps.cert_chain_path)
92+
.context("load RTMPS cert chain file")?
93+
.collect::<Result<Vec<_>, _>>()
94+
.context("load RTMPS cert chain")?;
95+
let key_der = PrivateKeyDer::from_pem_file(&rtmps.key_path).context("load RTMPS private key")?;
96+
97+
let rustls_config = tokio_rustls::rustls::ServerConfig::builder()
98+
.with_no_client_auth()
99+
.with_single_cert(cert_chain, key_der)
100+
.context("create RTMPS rustls server config")?;
101+
102+
Some(RtmpsGlobal {
103+
bind: rtmps.bind,
104+
rustls_config: Arc::new(rustls_config),
105+
})
106+
} else {
107+
None
108+
};
109+
57110
let tracer = SdkTracerProvider::default();
58111
opentelemetry::global::set_tracer_provider(tracer.clone());
59112

60113
let logger = SdkLoggerProvider::builder().build();
61114

62115
let open_telemetry = opentelemetry::OpenTelemetry::new().with_traces(tracer).with_logs(logger);
63116

64-
Ok(Arc::new(Self { config, open_telemetry }))
117+
Ok(Arc::new(Self {
118+
config,
119+
rtmps,
120+
open_telemetry,
121+
}))
65122
}
66123
}
67124

0 commit comments

Comments
 (0)