Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Stop all running streams after janus disconnection (#365)
Browse files Browse the repository at this point in the history
  • Loading branch information
0nkery authored Oct 17, 2022
1 parent 7837645 commit 4ad831e
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 32 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["Andrei Nesterov <[email protected]>"]
edition = "2018"
name = "conference"
version = "0.6.50"
version = "0.6.51"

[dependencies]
anyhow = "1"
Expand Down Expand Up @@ -59,6 +59,7 @@ tracing-futures = "0.2"
tracing-subscriber = "0.2"
uuid = { version = "0.6", features = ["v4", "serde"] }
webrtc-sdp = "0.3"
lazy_static = "1.4.0"

[features]
local_ip = ["local-ip-address"]
Expand Down
2 changes: 1 addition & 1 deletion chart/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ version: 0.2.4
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v0.6.50"
appVersion: "v0.6.51"
6 changes: 3 additions & 3 deletions src/app/endpoint/rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::extract::{Extension, Path, Query};
use chrono::{DateTime, Utc};

use serde::Deserialize;
use std::{result::Result as StdResult, sync::Arc};
use std::sync::Arc;
use svc_agent::mqtt::{
OutgoingEvent, OutgoingEventProperties, OutgoingMessage, ResponseStatus,
ShortTermTimingProperties,
Expand Down Expand Up @@ -169,11 +169,11 @@ pub fn update_event(
room_id: db::room::Id,
object: db::janus_rtc_stream::Object,
start_timestamp: DateTime<Utc>,
) -> StdResult<ObjectUpdateEvent, AppError> {
) -> ObjectUpdateEvent {
let uri = format!("rooms/{}/events", room_id);
let timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = OutgoingEventProperties::new("rtc_stream.update", timing);
Ok(OutgoingEvent::broadcast(object, props, &uri))
OutgoingEvent::broadcast(object, props, &uri)
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub async fn run(
db.clone(),
config.waitlist_epoch_duration,
own_ip_addr,
Some(agent.clone()),
);

thread::spawn({
Expand Down
77 changes: 61 additions & 16 deletions src/backend/janus/client_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::anyhow;
use chrono::Utc;
use diesel::Connection;
use std::{
collections::{hash_map::Entry, HashMap},
Expand All @@ -9,12 +10,16 @@ use std::{
},
time::Duration,
};
use svc_agent::mqtt::Agent;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, warn};

use crate::{
app::{endpoint::rtc_signal::CreateResponseData, error::Error},
db::{agent_connection, janus_backend, ConnectionPool},
app::{
endpoint::{rtc_signal::CreateResponseData, rtc_stream},
error::Error,
},
db::{agent_connection, janus_backend, janus_rtc_stream, ConnectionPool},
util::spawn_blocking,
};

Expand All @@ -31,6 +36,7 @@ pub struct Clients {
db: ConnectionPool,
stream_waitlist: WaitList<Result<CreateResponseData, Error>>,
ip_addr: IpAddr,
mqtt_agent: Option<Agent>,
}

impl Clients {
Expand All @@ -40,6 +46,7 @@ impl Clients {
db: ConnectionPool,
waitlist_epoch_duration: std::time::Duration,
ip_addr: IpAddr,
mqtt_agent: Option<Agent>,
) -> Self {
Self {
clients: Arc::new(RwLock::new(HashMap::new())),
Expand All @@ -48,6 +55,7 @@ impl Clients {
db,
stream_waitlist: WaitList::new(waitlist_epoch_duration),
ip_addr,
mqtt_agent,
}
}

Expand Down Expand Up @@ -82,6 +90,7 @@ impl Clients {
Entry::Occupied(o) => Ok(o.get().client.clone()),
Entry::Vacant(v) => {
let this = self.clone();
let mqtt_agent = self.mqtt_agent.clone();
let client = JanusClient::new(backend.janus_url())?;
let session_id = backend.session_id();
let is_cancelled = Arc::new(AtomicBool::new(false));
Expand All @@ -99,7 +108,16 @@ impl Clients {
clients: &this,
backend: &backend,
};
start_polling(client, session_id, sink, db, &is_cancelled, &backend).await;
start_polling(
client,
session_id,
sink,
db,
&is_cancelled,
&backend,
mqtt_agent,
)
.await;
}
});
Ok(client)
Expand Down Expand Up @@ -157,11 +175,12 @@ async fn start_polling(
db: ConnectionPool,
is_cancelled: &AtomicBool,
janus_backend: &janus_backend::Object,
mqtt_agent: Option<Agent>,
) {
let mut fail_retries_count = 5;
loop {
if fail_retries_count == 0 {
remove_backend(janus_backend, db).await;
remove_backend(janus_backend, db, mqtt_agent).await;
break;
}
if is_cancelled.load(Ordering::SeqCst) {
Expand All @@ -171,7 +190,7 @@ async fn start_polling(
match poll_result {
Ok(PollResult::SessionNotFound) => {
warn!(?janus_backend, "Session not found");
remove_backend(janus_backend, db).await;
remove_backend(janus_backend, db, mqtt_agent).await;
break;
}
Ok(PollResult::Events(events)) => {
Expand Down Expand Up @@ -202,29 +221,55 @@ async fn start_polling(
}
}

async fn remove_backend(backend: &janus_backend::Object, db: ConnectionPool) {
async fn remove_backend(backend: &janus_backend::Object, db: ConnectionPool, agent: Option<Agent>) {
let result = spawn_blocking({
let backend = backend.clone();
move || {
let conn = db.get()?;
conn.transaction::<_, diesel::result::Error, _>(|| {
let deleted = janus_backend::DeleteQuery::new(
let stopped_rtcs_streams = conn.transaction::<_, diesel::result::Error, _>(|| {
janus_backend::DeleteQuery::new(
backend.id(),
backend.session_id(),
backend.handle_id(),
)
.execute(&conn)?;
if deleted > 0 {
agent_connection::BulkDisconnectByBackendQuery::new(backend.id())
.execute(&conn)?;
}
Ok(())

// since backend can be up again we should disconnect everyone and stop
// all running streams regardless of whether backend was deleted or not
agent_connection::BulkDisconnectByBackendQuery::new(backend.id()).execute(&conn)?;
let stopped_streams =
janus_rtc_stream::stop_running_streams_by_backend(backend.id(), &conn)?;

Ok(stopped_streams)
})?;
Ok::<_, anyhow::Error>(())
Ok::<_, anyhow::Error>(stopped_rtcs_streams)
}
})
.await;
if let Err(err) = result {
error!(backend = ?backend, ?err, "Error removing backend");

match (result, agent) {
(Ok(stopped_rtcs_streams), Some(mut agent)) => {
let now = Utc::now();
for stream in stopped_rtcs_streams {
let end_time = match stream.janus_rtc_stream.time() {
Some((_start, end)) => match end {
std::ops::Bound::Included(t) | std::ops::Bound::Excluded(t) => t,
std::ops::Bound::Unbounded => continue,
},
None => now,
};
let update_evt =
rtc_stream::update_event(stream.room_id, stream.janus_rtc_stream, end_time);
if let Err(err) = agent.publish(update_evt) {
error!(backend = ?backend, ?err, "Failed to publish rtc_stream.update evt");
}
}
}
(Ok(_streams), None) => {
// not sending events since no agent provided
}
(Err(err), _) => {
error!(backend = ?backend, ?err, "Error removing backend");
}
}
}
4 changes: 2 additions & 2 deletions src/backend/janus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn handle_event_impl<C: Context>(
)?;

let event =
endpoint::rtc_stream::update_event(room.id(), rtc_stream, start_timestamp)?;
endpoint::rtc_stream::update_event(room.id(), rtc_stream, start_timestamp);

Ok(Box::new(stream::once(std::future::ready(
Box::new(event) as Box<dyn IntoPublishableMessage + Send + Sync + 'static>
Expand Down Expand Up @@ -501,7 +501,7 @@ async fn handle_hangup_detach<C: Context>(

// Send rtc_stream.update event.
let event =
endpoint::rtc_stream::update_event(room_id, rtc_stream, start_timestamp)?;
endpoint::rtc_stream::update_event(room_id, rtc_stream, start_timestamp);

let boxed_event =
Box::new(event) as Box<dyn IntoPublishableMessage + Send + Sync + 'static>;
Expand Down
12 changes: 4 additions & 8 deletions src/db/agent_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,13 @@ mod tests {
.room_id(room.id())
.status(db::agent::Status::Ready)
.insert(&conn);
let old_agent_conn = factory::AgentConnection::new(
factory::AgentConnection::new(
*old.id(),
rtc.id(),
crate::backend::janus::client::HandleId::random(),
)
.created_at(Utc::now() - Duration::minutes(20))
.insert(&conn);
println!("{old_agent_conn:?}");

let new = factory::Agent::new()
.agent_id(new.agent_id())
Expand All @@ -317,12 +316,9 @@ mod tests {
crate::backend::janus::client::HandleId::random(),
)
.insert(&conn);
println!("{new_agent_conn:?}");
let new_agent_conn =
UpdateQuery::new(new_agent_conn.handle_id(), Status::Connected)
.execute(&conn)
.unwrap();
println!("{new_agent_conn:?}");
UpdateQuery::new(new_agent_conn.handle_id(), Status::Connected)
.execute(&conn)
.unwrap();

room
})
Expand Down
70 changes: 70 additions & 0 deletions src/db/janus_rtc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,73 @@ pub fn stop(id: db::janus_rtc_stream::Id, conn: &PgConnection) -> Result<Option<
.get_result(conn)
.optional()
}

lazy_static::lazy_static!(
    // Raw SQL that closes the `time` range of active streams and returns each
    // updated stream row together with the `room_id` of its RTC.
    //
    // Diesel doesn't support joins in UPDATE/DELETE queries so it's raw SQL.
    //
    // `$1` is the backend id bound by `stop_running_streams_by_backend`.
    // The backend predicate is required: without it the UPDATE is limited only
    // by `ACTIVE_SQL` and would stop the active streams of every backend, not
    // just the one that went away.
    //
    // NOTE(review): `STOP_TIME_SQL` and `ACTIVE_SQL` are defined elsewhere in
    // this module; it is assumed they do not already reference `$1`.
    static ref BULK_STOP_BY_BACKEND_SQL: String = format!(
        r#"
        UPDATE "janus_rtc_stream"
        SET "time" = {STOP_TIME_SQL}
        FROM "rtc"
        WHERE "rtc"."id" = "janus_rtc_stream"."rtc_id"
        AND "janus_rtc_stream"."backend_id" = $1
        AND {ACTIVE_SQL}
        RETURNING "janus_rtc_stream".*, "rtc"."room_id"
        "#
    );
);

/// A stream row paired with the id of the room its RTC belongs to.
///
/// This is the row shape produced by the `RETURNING "janus_rtc_stream".*, "rtc"."room_id"`
/// clause of `BULK_STOP_BY_BACKEND_SQL` and returned by
/// `stop_running_streams_by_backend`.
#[derive(Debug, Deserialize, Serialize, Queryable, QueryableByName, Associations)]
pub struct StreamWithRoomId {
/// The `janus_rtc_stream` columns, embedded as a nested `Object`.
#[diesel(embed)]
pub janus_rtc_stream: Object,
/// The `room_id` column taken from the joined `rtc` table.
#[sql_type = "diesel::sql_types::Uuid"]
pub room_id: db::room::Id,
}

/// Executes `BULK_STOP_BY_BACKEND_SQL` on `conn`, binding `backend_id` as the
/// query's single parameter.
///
/// Returns every row yielded by the statement's `RETURNING` clause, i.e. each
/// updated stream together with its room id, or the Diesel error raised while
/// running the query.
pub fn stop_running_streams_by_backend(
    backend_id: &AgentId,
    conn: &PgConnection,
) -> Result<Vec<StreamWithRoomId>, Error> {
    use diesel::prelude::*;

    use crate::db::sql::Agent_id;

    // Build the parameterised raw query first, then run it against the connection.
    let bulk_stop =
        diesel::sql_query(BULK_STOP_BY_BACKEND_SQL.as_str()).bind::<Agent_id, _>(backend_id);

    bulk_stop.get_results(conn)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::{prelude::*, test_deps::LocalDeps};

    /// Inserts one stream, starts it, stops it through
    /// `stop_running_streams_by_backend`, and checks that exactly that stream
    /// comes back with a `[start, end)` time range.
    #[test]
    fn test_stop_running_streams_by_backend() {
        let local_deps = LocalDeps::new();
        let postgres = local_deps.run_postgres();
        let db = TestDb::with_local_postgres(&postgres);

        // Fixture: a single started stream. The pooled connection is released
        // at the end of this scope, before the query under test runs.
        let rtc_stream = {
            let conn = db.connection_pool().get().expect("Failed to insert room");
            let inserted = factory::JanusRtcStream::new(USR_AUDIENCE).insert(&conn);
            start(inserted.id(), &conn).expect("Failed to start rtc stream");
            inserted
        };

        let conn = db.connection_pool().get().unwrap();
        let stopped = stop_running_streams_by_backend(rtc_stream.backend_id(), &conn)
            .expect("Failed to stop running streams");

        assert_eq!(stopped.len(), 1);

        let stream = &stopped[0];
        assert_eq!(stream.janus_rtc_stream.id(), rtc_stream.id());

        // A stopped stream has a closed range: inclusive start, exclusive end.
        let time = stream.janus_rtc_stream.time();
        assert!(matches!(
            time,
            Some((std::ops::Bound::Included(_), std::ops::Bound::Excluded(_)))
        ));
    }
}
2 changes: 2 additions & 0 deletions src/test_helpers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl TestContext {
self.db().clone(),
WAITLIST_DURATION,
IpAddr::V4(Ipv4Addr::LOCALHOST),
None,
));
}

Expand All @@ -144,6 +145,7 @@ impl TestContext {
self.db().clone(),
WAITLIST_DURATION,
IpAddr::V4(Ipv4Addr::LOCALHOST),
None,
));
}

Expand Down

0 comments on commit 4ad831e

Please sign in to comment.