diff --git a/Cargo.lock b/Cargo.lock index fc90dbea5..11bd95c70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -795,6 +795,7 @@ dependencies = [ "slog-scope 4.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog-stdlog 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "smart-default 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index d18c3581c..75f7af630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,3 +44,4 @@ toml = "0.4" [dev-dependencies] serial_test = "0.2" serial_test_derive = "0.2" +tokio = "0.1" diff --git a/README.md b/README.md index 311fc6997..b9f8434f3 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,5 @@ Medea Medea media server __DEVELOPMENT IN PROGRESS__ + +{"MakeSdpOffer":{"peer":0,"sdp_offer":"caller_offer"}} diff --git a/signaling_test.html b/signaling_test.html new file mode 100644 index 000000000..0fb718e37 --- /dev/null +++ b/signaling_test.html @@ -0,0 +1,161 @@ + + + + Chat + + + + + +
+ + diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs index 0f3537eab..81bf83f8e 100644 --- a/src/api/client/mod.rs +++ b/src/api/client/mod.rs @@ -1,7 +1,6 @@ //! Implementation of Client API. -pub mod room; -pub mod server; -pub mod session; +mod session; -pub use self::{room::*, server::*, session::*}; +pub mod rpc_connection; +pub mod server; diff --git a/src/api/client/room.rs b/src/api/client/room.rs deleted file mode 100644 index 0023db16d..000000000 --- a/src/api/client/room.rs +++ /dev/null @@ -1,180 +0,0 @@ -//! Room definitions and implementations. - -use std::{ - fmt, - sync::{Arc, Mutex}, -}; - -use actix::{ - fut::wrap_future, Actor, ActorFuture, Addr, Context, Handler, Message, -}; -use futures::{ - future::{self, Either}, - Future, -}; -use hashbrown::HashMap; - -use crate::{ - api::control::{Id as MemberId, Member}, - log::prelude::*, -}; - -/// ID of [`Room`]. -pub type Id = u64; - -/// Media server room with its [`Member`]s. -#[derive(Debug)] -pub struct Room { - /// ID of this [`Room`]. - pub id: Id, - /// [`Member`]s which currently are present in this [`Room`]. - pub members: HashMap, - /// Established [`WsSession`]s of [`Member`]s in this [`Room`]. - pub connections: HashMap>, - /* TODO: Replace Box> with enum, - * as the set of all possible RpcConnection types is not closed. */ -} - -/// [`Actor`] implementation that provides an ergonomic way -/// to interact with [`Room`]. -impl Actor for Room { - type Context = Context; -} - -/// Established RPC connection with some remote [`Member`]. -pub trait RpcConnection: fmt::Debug + Send { - /// Closes [`RpcConnection`]. - /// No [`RpcConnectionClosed`] signals should be emitted. - fn close(&mut self) -> Box>; -} - -/// Signal for authorizing new [`RpcConnection`] before establishing. -#[derive(Debug, Message)] -#[rtype(result = "Result<(), RpcConnectionAuthorizationError>")] -pub struct AuthorizeRpcConnection { - /// ID of [`Member`] to authorize [`RpcConnection`] for. - pub member_id: MemberId, - /// Credentials to authorize [`RpcConnection`] with. - pub credentials: String, // TODO: &str when futures will allow references -} - -/// Error of authorization [`RpcConnection`] in [`Room`]. -#[derive(Debug)] -pub enum RpcConnectionAuthorizationError { - /// Authorizing [`Member`] does not exists in the [`Room`]. - MemberNotExists, - /// Provided credentials are invalid. - InvalidCredentials, -} - -impl Handler for Room { - type Result = Result<(), RpcConnectionAuthorizationError>; - - /// Responses with `Ok` if `RpcConnection` is authorized, otherwise `Err`s. - fn handle( - &mut self, - msg: AuthorizeRpcConnection, - _ctx: &mut Self::Context, - ) -> Self::Result { - use RpcConnectionAuthorizationError::*; - if let Some(ref member) = self.members.get(&msg.member_id) { - if member.credentials.eq(&msg.credentials) { - return Ok(()); - } - return Err(InvalidCredentials); - } - Err(MemberNotExists) - } -} - -/// Signal of new [`RpcConnection`] being established with specified [`Member`]. -#[derive(Debug, Message)] -#[rtype(result = "Result<(), ()>")] -pub struct RpcConnectionEstablished { - /// ID of [`Member`] that establishes [`RpcConnection`]. - pub member_id: MemberId, - /// Established [`RpcConnection`]. - pub connection: Box, -} - -/// Ergonomic type alias for using [`ActorFuture`] for [`Room`]. -type ActFuture = Box>; - -impl Handler for Room { - type Result = ActFuture<(), ()>; - - /// Stores provided [`RpcConnection`] for given [`Member`] in the [`Room`]. 
- /// - /// If [`Member`] already has any other [`RpcConnection`], - /// then it will be closed. - fn handle( - &mut self, - msg: RpcConnectionEstablished, - _: &mut Self::Context, - ) -> Self::Result { - info!("RpcConnectionEstablished for member {}", msg.member_id); - - let mut fut = Either::A(future::ok(())); - - if let Some(mut old_conn) = self.connections.remove(&msg.member_id) { - debug!("Closing old RpcConnection for member {}", msg.member_id); - fut = Either::B(old_conn.close()); - } - - self.connections.insert(msg.member_id, msg.connection); - - Box::new(wrap_future(fut)) - } -} - -/// Signal of existing [`RpcConnection`] of specified [`Member`] being closed. -#[derive(Debug, Message)] -pub struct RpcConnectionClosed { - /// ID of [`Member`] which [`RpcConnection`] is closed. - pub member_id: MemberId, - /// Reason of why [`RpcConnection`] is closed. - pub reason: RpcConnectionClosedReason, -} - -/// Reasons of why [`RpcConnection`] may be closed. -#[derive(Debug)] -pub enum RpcConnectionClosedReason { - /// [`RpcConnection`] is disconnect by server itself. - Disconnected, - /// [`RpcConnection`] has become idle and is disconnected by idle timeout. - Idle, -} - -impl Handler for Room { - type Result = (); - - /// Removes [`RpcConnection`] of specified [`Member`] from the [`Room`]. - fn handle(&mut self, msg: RpcConnectionClosed, _: &mut Self::Context) { - info!( - "RpcConnectionClosed for member {}, reason {:?}", - msg.member_id, msg.reason - ); - self.connections.remove(&msg.member_id); - } -} - -/// Repository that stores [`Room`]s. -#[derive(Clone, Default)] -pub struct RoomsRepository { - rooms: Arc>>>, -} - -impl RoomsRepository { - /// Creates new [`Room`]s repository with passed-in [`Room`]s. - pub fn new(rooms: HashMap>) -> Self { - Self { - rooms: Arc::new(Mutex::new(rooms)), - } - } - - /// Returns [`Room`] by its ID. - pub fn get(&self, id: Id) -> Option> { - let rooms = self.rooms.lock().unwrap(); - rooms.get(&id).cloned() - } -} diff --git a/src/api/client/rpc_connection.rs b/src/api/client/rpc_connection.rs new file mode 100644 index 000000000..bd6026933 --- /dev/null +++ b/src/api/client/rpc_connection.rs @@ -0,0 +1,196 @@ +//! [`RpcConnection`] with related messages. +use actix::Message; +use futures::Future; + +use crate::api::{control::MemberId, protocol::Event}; + +use std::fmt; + +/// Abstraction over RPC connection with some remote [`Member`]. +pub trait RpcConnection: fmt::Debug + Send { + /// Closes [`RpcConnection`]. + /// No [`RpcConnectionClosed`] signals should be emitted. + /// Always returns success. + fn close(&mut self) -> Box>; + + /// Sends [`Event`] to remote [`Member`]. + fn send_event( + &self, + event: Event, + ) -> Box>; +} + +/// Signal for authorizing new [`RpcConnection`] before establishing. +#[derive(Debug, Message)] +#[rtype(result = "Result<(), AuthorizationError>")] +pub struct Authorize { + /// ID of [`Member`] to authorize [`RpcConnection`] for. + pub member_id: MemberId, + /// Credentials to authorize [`RpcConnection`] with. + pub credentials: String, // TODO: &str when futures will allow references +} + +/// Error of authorization [`RpcConnection`] in [`Room`]. +#[derive(Debug)] +pub enum AuthorizationError { + /// Authorizing [`Member`] does not exists in the [`Room`]. + MemberNotExists, + /// Provided credentials are invalid. + InvalidCredentials, +} + +/// Signal of new [`RpcConnection`] being established with specified [`Member`]. +/// Transport should consider dropping connection if message result is err. 
+#[derive(Debug, Message)] +#[rtype(result = "Result<(), ()>")] +#[allow(clippy::module_name_repetitions)] +pub struct RpcConnectionEstablished { + /// ID of [`Member`] that establishes [`RpcConnection`]. + pub member_id: MemberId, + /// Established [`RpcConnection`]. + pub connection: Box, +} +/// Signal of existing [`RpcConnection`] of specified [`Member`] being closed. +#[derive(Debug, Message)] +#[allow(clippy::module_name_repetitions)] +pub struct RpcConnectionClosed { + /// ID of [`Member`] which [`RpcConnection`] is closed. + pub member_id: MemberId, + /// Reason of why [`RpcConnection`] is closed. + pub reason: ClosedReason, +} + +/// Reasons of why [`RpcConnection`] may be closed. +#[derive(Debug)] +pub enum ClosedReason { + /// [`RpcConnection`] was irrevocably closed. + Closed, + /// [`RpcConnection`] was lost, but may be reestablished. + Lost, +} + +#[cfg(test)] +pub mod test { + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }; + + use actix::{ + Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, + System, + }; + use futures::future::Future; + + use crate::{ + api::{ + client::rpc_connection::{ + ClosedReason, RpcConnection, RpcConnectionClosed, + RpcConnectionEstablished, + }, + control::MemberId, + protocol::{Command, Event}, + }, + signalling::Room, + }; + + /// [`RpcConnection`] impl convenient for testing. + #[derive(Debug, Clone)] + pub struct TestConnection { + pub member_id: MemberId, + pub room: Addr, + pub events: Arc>>, + pub stopped: Arc, + } + + impl Actor for TestConnection { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + self.room + .try_send(RpcConnectionEstablished { + member_id: self.member_id, + connection: Box::new(ctx.address()), + }) + .unwrap(); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + self.stopped.fetch_add(1, Ordering::Relaxed); + if self.stopped.load(Ordering::Relaxed) > 1 { + System::current().stop() + } + } + } + + #[derive(Message)] + struct Close; + + impl Handler for TestConnection { + type Result = (); + + fn handle(&mut self, _: Close, ctx: &mut Self::Context) { + ctx.stop() + } + } + + impl Handler for TestConnection { + type Result = (); + + fn handle(&mut self, event: Event, _ctx: &mut Self::Context) { + let mut events = self.events.lock().unwrap(); + events.push(serde_json::to_string(&event).unwrap()); + match event { + Event::PeerCreated { + peer_id, + sdp_offer, + tracks: _, + } => { + match sdp_offer { + Some(_) => self.room.do_send(Command::MakeSdpAnswer { + peer_id, + sdp_answer: "responder_answer".into(), + }), + None => self.room.do_send(Command::MakeSdpOffer { + peer_id, + sdp_offer: "caller_offer".into(), + }), + } + self.room.do_send(Command::SetIceCandidate { + peer_id, + candidate: "ice_candidate".into(), + }) + } + Event::IceCandidateDiscovered { + peer_id: _, + candidate: _, + } => { + self.room.do_send(RpcConnectionClosed { + member_id: self.member_id, + reason: ClosedReason::Closed, + }); + } + Event::PeersRemoved { peer_ids: _ } => {} + Event::SdpAnswerMade { + peer_id: _, + sdp_answer: _, + } => {} + } + } + } + + impl RpcConnection for Addr { + fn close(&mut self) -> Box> { + let fut = self.send(Close {}).map_err(|_| ()); + Box::new(fut) + } + + fn send_event( + &self, + event: Event, + ) -> Box> { + let fut = self.send(event).map_err(|_| ()); + Box::new(fut) + } + } +} diff --git a/src/api/client/server.rs b/src/api/client/server.rs index 989021e6e..604976eb7 100644 --- a/src/api/client/server.rs +++ b/src/api/client/server.rs @@ 
-10,13 +10,14 @@ use serde::Deserialize; use crate::{ api::{ client::{ - AuthorizeRpcConnection, Id as RoomId, RoomsRepository, - RpcConnectionAuthorizationError, WsSession, + rpc_connection::{AuthorizationError, Authorize}, + session::WsSession, }, - control::Id as MemberId, + control::MemberId, }, conf::{Conf, Rpc}, log::prelude::*, + signalling::{RoomId, RoomsRepository}, }; /// Parameters of new WebSocket connection creation HTTP request. @@ -39,13 +40,11 @@ fn ws_index( State, ), ) -> FutureResponse { - use RpcConnectionAuthorizationError::*; - debug!("Request params: {:?}", info); match state.rooms.get(info.room_id) { Some(room) => room - .send(AuthorizeRpcConnection { + .send(Authorize { member_id: info.member_id, credentials: info.credentials.clone(), }) @@ -59,8 +58,12 @@ fn ws_index( state.config.idle_timeout, ), ), - Err(MemberNotExists) => Ok(HttpResponse::NotFound().into()), - Err(InvalidCredentials) => Ok(HttpResponse::Forbidden().into()), + Err(AuthorizationError::MemberNotExists) => { + Ok(HttpResponse::NotFound().into()) + } + Err(AuthorizationError::InvalidCredentials) => { + Ok(HttpResponse::Forbidden().into()) + } }) .responder(), None => future::ok(HttpResponse::NotFound().into()).responder(), @@ -104,25 +107,24 @@ mod test { use actix::Arbiter; use actix_web::{http, test, App}; use futures::Stream; - use hashbrown::HashMap; use crate::{ - api::{client::Room, control::Member}, + api::control::Member, conf::{Conf, Server}, + media::create_peers, + signalling::Room, }; use super::*; /// Creates [`RoomsRepository`] for tests filled with a single [`Room`]. - fn room() -> RoomsRepository { + fn room(conf: Rpc) -> RoomsRepository { let members = hashmap! { 1 => Member{id: 1, credentials: "caller_credentials".into()}, 2 => Member{id: 2, credentials: "responder_credentials".into()}, }; - let room = Arbiter::start(move |_| Room { - id: 1, - members, - connections: HashMap::new(), + let room = Arbiter::start(move |_| { + Room::new(1, members, create_peers(1, 2), conf.reconnect_timeout) }); let rooms = hashmap! 
{1 => room}; RoomsRepository::new(rooms) @@ -132,7 +134,7 @@ mod test { fn ws_server(conf: Conf) -> test::TestServer { test::TestServer::with_factory(move || { App::with_state(Context { - rooms: room(), + rooms: room(conf.rpc.clone()), config: conf.rpc.clone(), }) .resource("/ws/{room_id}/{member_id}/{credentials}", |r| { @@ -149,14 +151,15 @@ mod test { write.text(r#"{"ping":33}"#); let (item, _) = server.execute(read.into_future()).unwrap(); - assert_eq!(item, Some(ws::Message::Text(r#"{"pong":33}"#.into()))); + assert_eq!(Some(ws::Message::Text(r#"{"pong":33}"#.into())), item); } #[test] fn disconnects_on_idle() { let conf = Conf { rpc: Rpc { - idle_timeout: Duration::new(1, 0), + idle_timeout: Duration::new(2, 0), + reconnect_timeout: Default::default(), }, server: Server::default(), }; @@ -167,14 +170,14 @@ mod test { write.text(r#"{"ping":33}"#); let (item, read) = server.execute(read.into_future()).unwrap(); - assert_eq!(item, Some(ws::Message::Text(r#"{"pong":33}"#.into()))); + assert_eq!(Some(ws::Message::Text(r#"{"pong":33}"#.into())), item); thread::sleep(conf.rpc.idle_timeout.add(Duration::from_secs(1))); let (item, _) = server.execute(read.into_future()).unwrap(); assert_eq!( - item, - Some(ws::Message::Close(Some(ws::CloseCode::Normal.into()))) + Some(ws::Message::Close(Some(ws::CloseCode::Normal.into()))), + item ); } } diff --git a/src/api/client/session.rs b/src/api/client/session.rs index 6698439c2..40e17eccd 100644 --- a/src/api/client/session.rs +++ b/src/api/client/session.rs @@ -1,22 +1,25 @@ //! WebSocket session. -use std::time::Duration; +use std::time::{Duration, Instant}; use actix::{ - fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, - Message, SpawnHandle, StreamHandler, + fut::wrap_future, Actor, ActorContext, ActorFuture, Addr, AsyncContext, + Handler, Message, StreamHandler, }; -use actix_web::ws::{self, CloseReason}; -use futures::Future; -use serde::{Deserialize, Serialize}; +use actix_web::ws::{self, CloseReason, WebsocketContext}; +use futures::future::Future; use crate::{ - api::client::room::{ - Room, RpcConnection, RpcConnectionClosed, RpcConnectionClosedReason, - RpcConnectionEstablished, + api::{ + client::rpc_connection::{ + ClosedReason, RpcConnection, RpcConnectionClosed, + RpcConnectionEstablished, + }, + control::MemberId, + protocol::{ClientMsg, Event, ServerMsg}, }, - api::control::member::Id as MemberId, log::prelude::*, + signalling::Room, }; /// Long-running WebSocket connection of Client API. @@ -25,17 +28,20 @@ use crate::{ pub struct WsSession { /// ID of [`Member`] that WebSocket connection is associated with. member_id: MemberId, + /// [`Room`] that [`Member`] is associated with. room: Addr, - /// Handle for watchdog which checks whether WebSocket client became - /// idle (no `ping` messages received during [`idle_timeout`]). - /// - /// This one should be renewed on received `ping` message from client. - idle_handler: Option, - /// Timeout of receiving `ping` messages from client. + /// Timeout of receiving any messages from client. idle_timeout: Duration, + /// Timestamp for watchdog which checks whether WebSocket client became + /// idle (no messages received during [`idle_timeout`]). + /// + /// This one should be renewed on any received WebSocket message + /// from client. + last_activity: Instant, + /// Indicates whether WebSocket connection is closed by server ot by /// client. 
closed_by_server: bool, @@ -51,42 +57,39 @@ impl WsSession { Self { member_id, room, - idle_handler: None, idle_timeout, + last_activity: Instant::now(), closed_by_server: false, } } - /// Resets idle handler watchdog. - fn reset_idle_timeout(&mut self, ctx: &mut ::Context) { - if let Some(handler) = self.idle_handler { - ctx.cancel_future(handler); - } - - self.idle_handler = - Some(ctx.run_later(self.idle_timeout, |sess, ctx| { - info!("WsConnection with member {} is idle", sess.member_id); - - let member_id = sess.member_id; - ctx.wait(wrap_future( - sess.room - .send(RpcConnectionClosed { - member_id, - reason: RpcConnectionClosedReason::Idle, - }) - .map_err(move |err| { - error!( - "WsSession of member {} failed to remove from \ - Room, because: {:?}", - member_id, err, - ) - }), - )); + fn close_normal(&self, ctx: &mut WebsocketContext) { + ctx.notify(Close { + reason: Some(ws::CloseCode::Normal.into()), + }); + } - ctx.notify(Close { - reason: Some(ws::CloseCode::Normal.into()), - }); - })); + /// Start watchdog which will drop connection if now-last_activity > + /// idle_timeout. + fn start_watchdog(&mut self, ctx: &mut ::Context) { + ctx.run_interval(Duration::new(1, 0), |session, ctx| { + if Instant::now().duration_since(session.last_activity) + > session.idle_timeout + { + info!("WsSession of member {} is idle", session.member_id); + if let Err(err) = session.room.try_send(RpcConnectionClosed { + member_id: session.member_id, + reason: ClosedReason::Lost, + }) { + error!( + "WsSession of member {} failed to remove from Room, \ + because: {:?}", + session.member_id, err, + ) + } + session.close_normal(ctx); + } + }); } } @@ -100,24 +103,41 @@ impl Actor for WsSession { fn started(&mut self, ctx: &mut Self::Context) { debug!("Started WsSession for member {}", self.member_id); - self.reset_idle_timeout(ctx); + self.start_watchdog(ctx); let member_id = self.member_id; - ctx.wait(wrap_future( - self.room - .send(RpcConnectionEstablished { - member_id: self.member_id, - connection: Box::new(ctx.address()), - }) - .map(|_| ()) - .map_err(move |err| { + ctx.wait( + wrap_future(self.room.send(RpcConnectionEstablished { + member_id: self.member_id, + connection: Box::new(ctx.address()), + })) + .map( + move |auth_result, + session: &mut Self, + ctx: &mut ws::WebsocketContext| { + if let Err(e) = auth_result { + error!( + "Room rejected Established for member {}, cause \ + {:?}", + member_id, e + ); + session.close_normal(ctx); + } + }, + ) + .map_err( + move |send_err, + session: &mut Self, + ctx: &mut ws::WebsocketContext| { error!( "WsSession of member {} failed to join Room, because: \ {:?}", - member_id, err, - ) - }), - )); + member_id, send_err, + ); + session.close_normal(ctx); + }, + ), + ); } fn stopped(&mut self, _ctx: &mut Self::Context) { @@ -127,12 +147,25 @@ impl Actor for WsSession { impl RpcConnection for Addr { /// Closes [`WsSession`] by sending itself "normal closure" close message. + /// + /// Never returns error. fn close(&mut self) -> Box> { let fut = self .send(Close { reason: Some(ws::CloseCode::Normal.into()), }) - .map_err(|_| ()); + .or_else(|_| Ok(())); + Box::new(fut) + } + + /// Sends [`Event`] to Web Client. + fn send_event( + &self, + event: Event, + ) -> Box> { + let fut = self + .send(ServerMsg::Event(event)) + .map_err(|err| error!("Failed send event {:?} ", err)); Box::new(fut) } } @@ -155,29 +188,13 @@ impl Handler for WsSession { } } -/// Message for keeping client WebSocket connection alive. 
-#[derive(Debug, Deserialize, Message, Serialize)] -pub enum Heartbeat { - /// `ping` message that WebSocket client is expected to send to the server - /// periodically. - #[serde(rename = "ping")] - Ping(usize), - /// `pong` message that server answers with to WebSocket client in response - /// to received `ping` message. - #[serde(rename = "pong")] - Pong(usize), -} - -impl Handler for WsSession { +impl Handler for WsSession { type Result = (); - /// Answers with `Heartbeat::Pong` message to WebSocket client in response - /// to the received `Heartbeat::Ping` message. - fn handle(&mut self, msg: Heartbeat, ctx: &mut Self::Context) { - if let Heartbeat::Ping(n) = msg { - trace!("Received ping: {}", n); - ctx.text(serde_json::to_string(&Heartbeat::Pong(n)).unwrap()) - } + /// Sends [`Event`] to Web Client. + fn handle(&mut self, msg: ServerMsg, ctx: &mut Self::Context) { + debug!("Event {:?} for member {}", msg, self.member_id); + ctx.text(serde_json::to_string(&msg).unwrap()) } } @@ -190,32 +207,39 @@ impl StreamHandler for WsSession { ); match msg { ws::Message::Text(text) => { - self.reset_idle_timeout(ctx); - if let Ok(msg) = serde_json::from_str::(&text) { - ctx.notify(msg); + self.last_activity = Instant::now(); + match serde_json::from_str::(&text) { + Ok(ClientMsg::Ping(n)) => { + trace!("Received ping: {}", n); + // Answer with Heartbeat::Pong. + ctx.notify(ServerMsg::Pong(n)); + } + Ok(ClientMsg::Command(command)) => { + if let Err(err) = self.room.try_send(command) { + error!( + "Cannot send Command to Room {}, because {}", + self.member_id, err + ) + } + } + Err(err) => error!( + "Error [{:?}] parsing client message [{}]", + err, &text + ), } } ws::Message::Close(reason) => { if !self.closed_by_server { - debug!( - "Send close frame with reason {:?} for member {}", - reason, self.member_id - ); - let member_id = self.member_id; - ctx.wait(wrap_future( - self.room - .send(RpcConnectionClosed { - member_id: self.member_id, - reason: RpcConnectionClosedReason::Disconnected, - }) - .map_err(move |err| { - error!( - "WsSession of member {} failed to remove \ - from Room, because: {:?}", - member_id, err, - ) - }), - )); + if let Err(err) = self.room.try_send(RpcConnectionClosed { + member_id: self.member_id, + reason: ClosedReason::Closed, + }) { + error!( + "WsSession of member {} failed to remove from \ + Room, because: {:?}", + self.member_id, err, + ) + }; ctx.close(reason); ctx.stop(); } diff --git a/src/api/control/member.rs b/src/api/control/member.rs index 57734dfde..880e666aa 100644 --- a/src/api/control/member.rs +++ b/src/api/control/member.rs @@ -8,6 +8,7 @@ pub type Id = u64; pub struct Member { /// ID of [`Member`]. pub id: Id, + /// Credentials to authorize [`Member`] with. pub credentials: String, } diff --git a/src/api/control/mod.rs b/src/api/control/mod.rs index b29bf968d..db3c38fcd 100644 --- a/src/api/control/mod.rs +++ b/src/api/control/mod.rs @@ -1,5 +1,5 @@ //! Implementation of Control API. 
-pub mod member; +mod member; -pub use self::member::*; +pub use self::member::{Id as MemberId, Member}; diff --git a/src/api/mod.rs b/src/api/mod.rs index 1c66e73e5..f12d438ca 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -2,3 +2,4 @@ pub mod client; pub mod control; +pub mod protocol; diff --git a/src/api/protocol.rs b/src/api/protocol.rs new file mode 100644 index 000000000..34848e063 --- /dev/null +++ b/src/api/protocol.rs @@ -0,0 +1,283 @@ +use actix::Message; +use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize}; + +// TODO: should be properly shared between medea and jason +#[cfg_attr(test, derive(PartialEq))] +#[derive(Message, Debug)] +#[allow(dead_code)] +/// Message sent by `Media Server` to `Client`. +pub enum ServerMsg { + /// `pong` message that server answers with to WebSocket client in response + /// to received `ping` message. + Pong(u64), + /// `Media Server` notifies `Client` about happened facts and it reacts on + /// them to reach the proper state. + Event(Event), +} + +#[cfg_attr(test, derive(PartialEq, Debug))] +#[allow(dead_code)] +/// Message from 'Client' to 'Media Server'. +pub enum ClientMsg { + /// `ping` message that WebSocket client is expected to send to the server + /// periodically. + Ping(u64), + /// Request of `Web Client` to change the state on `Media Server`. + Command(Command), +} + +/// WebSocket message from Web Client to Media Server. +#[derive(Deserialize, Serialize, Message)] +#[cfg_attr(test, derive(PartialEq, Debug))] +#[serde(tag = "command", content = "data")] +#[allow(dead_code)] +#[rtype(result = "Result<(), ()>")] +pub enum Command { + /// Web Client sends SDP Offer. + MakeSdpOffer { peer_id: u64, sdp_offer: String }, + /// Web Client sends SDP Answer. + MakeSdpAnswer { peer_id: u64, sdp_answer: String }, + /// Web Client sends Ice Candidate. + SetIceCandidate { peer_id: u64, candidate: String }, +} + +/// WebSocket message from Medea to Jason. +#[derive(Deserialize, Serialize, Message, Debug)] +#[cfg_attr(test, derive(PartialEq))] +#[allow(dead_code)] +#[serde(tag = "event", content = "data")] +pub enum Event { + /// Media Server notifies Web Client about necessity of RTCPeerConnection + /// creation. + PeerCreated { + peer_id: u64, + sdp_offer: Option, + tracks: Vec, + }, + /// Media Server notifies Web Client about necessity to apply specified SDP + /// Answer to Web Client's RTCPeerConnection. + SdpAnswerMade { peer_id: u64, sdp_answer: String }, + + /// Media Server notifies Web Client about necessity to apply specified + /// ICE Candidate. + IceCandidateDiscovered { peer_id: u64, candidate: String }, + + /// Media Server notifies Web Client about necessity of RTCPeerConnection + /// close. + PeersRemoved { peer_ids: Vec }, +} + +/// [`Track] with specified direction. +#[derive(Deserialize, Serialize, Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub struct Directional { + pub id: u64, + pub direction: Direction, + pub media_type: MediaType, +} + +/// Direction of [`Track`]. +#[derive(Deserialize, Serialize, Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub enum Direction { + Send { receivers: Vec }, + Recv { sender: u64 }, +} + +/// Type of [`Track`]. 
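+/// Serialized with serde's default externally tagged representation,
+/// e.g. `{"Audio":{}}` or `{"Video":{}}`.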
+#[derive(Deserialize, Serialize, Debug, Clone)] +#[cfg_attr(test, derive(PartialEq))] +pub enum MediaType { + Audio(AudioSettings), + Video(VideoSettings), +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +#[cfg_attr(test, derive(PartialEq))] +pub struct AudioSettings {} + +#[derive(Deserialize, Serialize, Debug, Clone)] +#[cfg_attr(test, derive(PartialEq))] +pub struct VideoSettings {} + +impl Serialize for ClientMsg { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + use serde::ser::SerializeStruct; + + match self { + ClientMsg::Ping(n) => { + let mut ping = serializer.serialize_struct("ping", 1)?; + ping.serialize_field("ping", n)?; + ping.end() + } + ClientMsg::Command(command) => command.serialize(serializer), + } + } +} + +impl<'de> Deserialize<'de> for ClientMsg { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + use serde::de::Error; + + let ev = serde_json::Value::deserialize(deserializer)?; + let map = ev.as_object().ok_or_else(|| { + Error::custom(format!("unable to deser ClientMsg [{:?}]", &ev)) + })?; + + if let Some(v) = map.get("ping") { + let n = v.as_u64().ok_or_else(|| { + Error::custom(format!( + "unable to deser ClientMsg::Ping [{:?}]", + &ev + )) + })?; + + Ok(ClientMsg::Ping(n)) + } else { + let command = + serde_json::from_value::(ev).map_err(|e| { + Error::custom(format!( + "unable to deser ClientMsg::Command [{:?}]", + e + )) + })?; + Ok(ClientMsg::Command(command)) + } + } +} + +impl Serialize for ServerMsg { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + use serde::ser::SerializeStruct; + + match self { + ServerMsg::Pong(n) => { + let mut ping = serializer.serialize_struct("pong", 1)?; + ping.serialize_field("pong", n)?; + ping.end() + } + ServerMsg::Event(command) => command.serialize(serializer), + } + } +} + +impl<'de> Deserialize<'de> for ServerMsg { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + use serde::de::Error; + + let ev = serde_json::Value::deserialize(deserializer)?; + let map = ev.as_object().ok_or_else(|| { + Error::custom(format!("unable to deser ServerMsg [{:?}]", &ev)) + })?; + + if let Some(v) = map.get("pong") { + let n = v.as_u64().ok_or_else(|| { + Error::custom(format!( + "unable to deser ServerMsg::Pong [{:?}]", + &ev + )) + })?; + + Ok(ServerMsg::Pong(n)) + } else { + let event = serde_json::from_value::(ev).map_err(|e| { + Error::custom(format!( + "unable to deser ServerMsg::Event [{:?}]", + e + )) + })?; + Ok(ServerMsg::Event(event)) + } + } +} + +#[cfg(test)] +mod test { + use crate::api::protocol::{ClientMsg, Command, Event, ServerMsg}; + + #[test] + fn command() { + let command = ClientMsg::Command(Command::MakeSdpOffer { + peer_id: 77, + sdp_offer: "offer".to_owned(), + }); + #[cfg_attr(nightly, rustfmt::skip)] + let command_str = + "{\ + \"command\":\"MakeSdpOffer\",\ + \"data\":{\ + \"peer_id\":77,\ + \"sdp_offer\":\"offer\"\ + }\ + }"; + + assert_eq!(command_str, serde_json::to_string(&command).unwrap()); + assert_eq!( + command, + serde_json::from_str(&serde_json::to_string(&command).unwrap()) + .unwrap() + ); + } + + #[test] + fn ping() { + let ping = ClientMsg::Ping(15); + let ping_str = "{\"ping\":15}"; + + assert_eq!(ping_str, serde_json::to_string(&ping).unwrap()); + assert_eq!( + ping, + serde_json::from_str(&serde_json::to_string(&ping).unwrap()) + .unwrap() + ) + } + + #[test] + fn event() { + let event = ServerMsg::Event(Event::SdpAnswerMade { + peer_id: 45, + sdp_answer: 
"answer".to_owned(), + }); + #[cfg_attr(nightly, rustfmt::skip)] + let event_str = + "{\ + \"event\":\"SdpAnswerMade\",\ + \"data\":{\ + \"peer_id\":45,\ + \"sdp_answer\":\"answer\"\ + }\ + }"; + + assert_eq!(event_str, serde_json::to_string(&event).unwrap()); + assert_eq!( + event, + serde_json::from_str(&serde_json::to_string(&event).unwrap()) + .unwrap() + ); + } + + #[test] + fn pong() { + let pong = ServerMsg::Pong(5); + let pong_str = "{\"pong\":5}"; + + assert_eq!(pong_str, serde_json::to_string(&pong).unwrap()); + assert_eq!( + pong, + serde_json::from_str(&serde_json::to_string(&pong).unwrap()) + .unwrap() + ) + } +} diff --git a/src/conf/rpc.rs b/src/conf/rpc.rs index 0327e2225..e3fdda999 100644 --- a/src/conf/rpc.rs +++ b/src/conf/rpc.rs @@ -12,4 +12,10 @@ pub struct Rpc { #[default(Duration::from_secs(10))] #[serde(with = "serde_humantime")] pub idle_timeout: Duration, + + /// Duration, after which the server deletes the client session if + /// the remote RPC client does not reconnect after it is idle. + #[default(Duration::from_secs(10))] + #[serde(with = "serde_humantime")] + pub reconnect_timeout: Duration, } diff --git a/src/main.rs b/src/main.rs index fd42507b1..aab1c2a74 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,22 @@ //! Medea media server application. #[macro_use] -mod utils; - +pub mod utils; pub mod api; pub mod conf; pub mod log; +pub mod media; +pub mod signalling; use actix::prelude::*; use dotenv::dotenv; -use hashbrown::HashMap; use log::prelude::*; use crate::{ - api::{ - client::{server, Room, RoomsRepository}, - control::Member, - }, + api::{client::server, control::Member}, conf::Conf, + media::create_peers, + signalling::{Room, RoomsRepository}, }; fn main() { @@ -28,22 +27,20 @@ fn main() { let sys = System::new("medea"); + let config = Conf::parse().unwrap(); + + info!("{:?}", config); + let members = hashmap! { 1 => Member{id: 1, credentials: "caller_credentials".to_owned()}, 2 => Member{id: 2, credentials: "responder_credentials".to_owned()}, }; - let room = Arbiter::start(move |_| Room { - id: 1, - members, - connections: HashMap::new(), - }); + let peers = create_peers(1, 2); + let room = Room::new(1, members, peers, config.rpc.reconnect_timeout); + let room = Arbiter::start(move |_| room); let rooms = hashmap! {1 => room}; let rooms_repo = RoomsRepository::new(rooms); - let config = Conf::parse().unwrap(); - - info!("{:?}", config); - server::run(rooms_repo, config); let _ = sys.run(); } diff --git a/src/media/mod.rs b/src/media/mod.rs new file mode 100644 index 000000000..ed48bf622 --- /dev/null +++ b/src/media/mod.rs @@ -0,0 +1,11 @@ +//! Representations of media and media connection establishment objects. +pub mod peer; +pub mod track; + +pub use self::{ + peer::{ + create_peers, Id as PeerId, New, Peer, PeerStateError, + PeerStateMachine, WaitLocalHaveRemote, WaitLocalSdp, WaitRemoteSdp, + }, + track::{Id as TrackId, Track}, +}; diff --git a/src/media/peer.rs b/src/media/peer.rs new file mode 100644 index 000000000..a01b7bb2c --- /dev/null +++ b/src/media/peer.rs @@ -0,0 +1,389 @@ +//! Remote [`RTCPeerConnection`][1] representation. +//! +//! 
[1]: https://www.w3.org/TR/webrtc/#rtcpeerconnection-interface + +#![allow(clippy::use_self)] +use failure::Fail; +use hashbrown::HashMap; + +use std::{convert::TryFrom, fmt::Display, sync::Arc}; + +use crate::{ + api::{ + control::MemberId, + protocol::{ + AudioSettings, Direction, Directional, MediaType, VideoSettings, + }, + }, + media::{Track, TrackId}, +}; + +/// Newly initialized [`Peer`] ready to signalling. +#[derive(Debug, PartialEq)] +pub struct New {} + +/// [`Peer`] doesnt have remote SDP and is waiting for local SDP. +#[derive(Debug, PartialEq)] +pub struct WaitLocalSdp {} + +/// [`Peer`] has remote SDP and is waiting for local SDP. +#[derive(Debug, PartialEq)] +pub struct WaitLocalHaveRemote {} + +/// [`Peer`] has local SDP and is waiting for remote SDP. +#[derive(Debug, PartialEq)] +pub struct WaitRemoteSdp {} + +/// SDP exchange ended. +#[derive(Debug, PartialEq)] +pub struct Stable {} + +/// Produced when unwrapping [`PeerStateMachine`] to [`Peer`] with wrong state. +#[derive(Fail, Debug)] +#[allow(clippy::module_name_repetitions)] +pub enum PeerStateError { + #[fail( + display = "Cannot unwrap Peer from PeerStateMachine [id = {}]. \ + Expected state {} was {}", + _0, _1, _2 + )] + WrongState(Id, &'static str, String), +} + +impl PeerStateError { + pub fn new_wrong_state( + peer: &PeerStateMachine, + expected: &'static str, + ) -> Self { + PeerStateError::WrongState(peer.id(), expected, format!("{}", peer)) + } +} + +/// Implementation of ['Peer'] state machine. +#[derive(Debug)] +#[allow(clippy::module_name_repetitions)] +pub enum PeerStateMachine { + New(Peer), + WaitLocalSdp(Peer), + WaitLocalHaveRemote(Peer), + WaitRemoteSdp(Peer), + Stable(Peer), +} + +// TODO: macro to remove boilerplate +impl PeerStateMachine { + /// Returns ID of [`Peer`]. + pub fn id(&self) -> Id { + match self { + PeerStateMachine::New(peer) => peer.id(), + PeerStateMachine::WaitLocalSdp(peer) => peer.id(), + PeerStateMachine::WaitLocalHaveRemote(peer) => peer.id(), + PeerStateMachine::WaitRemoteSdp(peer) => peer.id(), + PeerStateMachine::Stable(peer) => peer.id(), + } + } + + /// Returns ID of [`Member`] associated with this [`Peer`]. + pub fn member_id(&self) -> MemberId { + match self { + PeerStateMachine::New(peer) => peer.member_id(), + PeerStateMachine::WaitLocalSdp(peer) => peer.member_id(), + PeerStateMachine::WaitLocalHaveRemote(peer) => peer.member_id(), + PeerStateMachine::WaitRemoteSdp(peer) => peer.member_id(), + PeerStateMachine::Stable(peer) => peer.member_id(), + } + } + + /// Returns ID of interconnected [`Peer`]. + pub fn partner_peer_id(&self) -> Id { + match self { + PeerStateMachine::New(peer) => peer.partner_peer_id(), + PeerStateMachine::WaitLocalSdp(peer) => peer.partner_peer_id(), + PeerStateMachine::WaitLocalHaveRemote(peer) => { + peer.partner_peer_id() + } + PeerStateMachine::WaitRemoteSdp(peer) => peer.partner_peer_id(), + PeerStateMachine::Stable(peer) => peer.partner_peer_id(), + } + } + + /// Returns ID of interconnected [`Member`]. 
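+    // TODO: delegates to `partner_peer_id()`; should call
+    //       `partner_member_id()` on the inner `Peer` and return `MemberId`.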
+ pub fn partner_member_id(&self) -> Id { + match self { + PeerStateMachine::New(peer) => peer.partner_peer_id(), + PeerStateMachine::WaitLocalSdp(peer) => peer.partner_peer_id(), + PeerStateMachine::WaitLocalHaveRemote(peer) => { + peer.partner_peer_id() + } + PeerStateMachine::WaitRemoteSdp(peer) => peer.partner_peer_id(), + PeerStateMachine::Stable(peer) => peer.partner_peer_id(), + } + } +} + +impl Display for PeerStateMachine { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + PeerStateMachine::WaitRemoteSdp(_) => write!(f, "WaitRemoteSdp"), + PeerStateMachine::New(_) => write!(f, "New"), + PeerStateMachine::WaitLocalSdp(_) => write!(f, "WaitLocalSdp"), + PeerStateMachine::WaitLocalHaveRemote(_) => { + write!(f, "WaitLocalHaveRemote") + } + PeerStateMachine::Stable(_) => write!(f, "Stable"), + } + } +} + +macro_rules! impl_peer_converts { + ($peer_type:tt) => { + impl<'a> TryFrom<&'a PeerStateMachine> for &'a Peer<$peer_type> { + type Error = PeerStateError; + + fn try_from( + peer: &'a PeerStateMachine, + ) -> Result { + match peer { + PeerStateMachine::$peer_type(peer) => Ok(peer), + _ => Err(PeerStateError::WrongState( + 1, + stringify!($peer_type), + format!("{}", peer), + )), + } + } + } + + impl TryFrom for Peer<$peer_type> { + type Error = PeerStateError; + + fn try_from(peer: PeerStateMachine) -> Result { + match peer { + PeerStateMachine::$peer_type(peer) => Ok(peer), + _ => Err(PeerStateError::WrongState( + 1, + stringify!($peer_type), + format!("{}", peer), + )), + } + } + } + + impl From> for PeerStateMachine { + fn from(peer: Peer<$peer_type>) -> Self { + PeerStateMachine::$peer_type(peer) + } + } + }; +} + +impl_peer_converts!(New); +impl_peer_converts!(WaitLocalSdp); +impl_peer_converts!(WaitLocalHaveRemote); +impl_peer_converts!(WaitRemoteSdp); +impl_peer_converts!(Stable); + +/// ID of [`Peer`]. +pub type Id = u64; + +#[derive(Debug)] +pub struct Context { + id: Id, + member_id: MemberId, + partner_peer: Id, + partner_member: MemberId, + sdp_offer: Option, + sdp_answer: Option, + receivers: HashMap>, + senders: HashMap>, +} + +/// [`RTCPeerConnection`] representation. +#[derive(Debug)] +pub struct Peer { + context: Context, + state: S, +} + +impl Peer { + /// Returns ID of [`Member`] associated with this [`Peer`]. + pub fn member_id(&self) -> MemberId { + self.context.member_id + } + + /// Returns ID of [`Peer`]. + pub fn id(&self) -> Id { + self.context.id + } + + /// Returns ID of interconnected [`Peer`]. + pub fn partner_peer_id(&self) -> Id { + self.context.partner_peer + } + + /// Returns ID of interconnected [`Member`]. + pub fn partner_member_id(&self) -> Id { + self.context.partner_member + } + + /// Returns [`Track`]'s of [`Peer`]. + pub fn tracks(&self) -> Vec { + let tracks = self.context.senders.iter().fold( + vec![], + |mut tracks, (_, track)| { + tracks.push(Directional { + id: track.id, + media_type: track.media_type.clone(), + direction: Direction::Send { + receivers: vec![self.context.partner_peer], + }, + }); + tracks + }, + ); + self.context + .receivers + .iter() + .fold(tracks, |mut tracks, (_, track)| { + tracks.push(Directional { + id: track.id, + media_type: track.media_type.clone(), + direction: Direction::Recv { + sender: self.context.partner_peer, + }, + }); + tracks + }) + } + + pub fn is_sender(&self) -> bool { + !self.context.senders.is_empty() + } +} + +impl Peer { + /// Creates new [`Peer`] for [`Member`]. 
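+    ///
+    /// A minimal illustration (IDs are arbitrary), mirroring the
+    /// `create_peer` test below:
+    ///
+    /// ```ignore
+    /// // Peer 1 of Member 1, interconnected with Peer 2 of Member 2.
+    /// let peer = Peer::new(1, 1, 2, 2);
+    /// let peer = peer.start(); // New -> WaitLocalSdp
+    /// ```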
+ pub fn new( + id: Id, + member_id: MemberId, + partner_peer: Id, + partner_member: MemberId, + ) -> Self { + let context = Context { + id, + member_id, + partner_peer, + partner_member, + sdp_offer: None, + sdp_answer: None, + receivers: HashMap::new(), + senders: HashMap::new(), + }; + Self { + context, + state: New {}, + } + } + + /// Transition new [`Peer`] into state of waiting for local description. + pub fn start(self) -> Peer { + Peer { + context: self.context, + state: WaitLocalSdp {}, + } + } + + /// Transition new [`Peer`] into state of waiting for remote description. + pub fn set_remote_sdp( + self, + sdp_offer: String, + ) -> Peer { + let mut context = self.context; + context.sdp_offer = Some(sdp_offer); + Peer { + context, + state: WaitLocalHaveRemote {}, + } + } + + /// Add [`Track`] to [`Peer`] for send. + pub fn add_sender(&mut self, track: Arc) { + self.context.senders.insert(track.id, track); + } + + /// Add [`Track`] to [`Peer`] for receive. + pub fn add_receiver(&mut self, track: Arc) { + self.context.receivers.insert(track.id, track); + } +} + +impl Peer { + /// Set local description and transition [`Peer`] + /// to [`WaitRemoteSDP`] state. + pub fn set_local_sdp(self, sdp_offer: String) -> Peer { + let mut context = self.context; + context.sdp_offer = Some(sdp_offer); + Peer { + context, + state: WaitRemoteSdp {}, + } + } +} + +impl Peer { + /// Set remote description and transition [`Peer`] to [`Stable`] state. + pub fn set_remote_sdp(self, sdp_answer: &str) -> Peer { + let mut context = self.context; + context.sdp_answer = Some(sdp_answer.to_string()); + Peer { + context, + state: Stable {}, + } + } +} + +impl Peer { + /// Set local description and transition [`Peer`] to [`Stable`] state. + pub fn set_local_sdp(self, sdp_answer: String) -> Peer { + let mut context = self.context; + context.sdp_answer = Some(sdp_answer); + Peer { + context, + state: Stable {}, + } + } +} + +pub fn create_peers( + caller: MemberId, + responder: MemberId, +) -> HashMap { + let caller_peer_id = 1; + let responder_peer_id = 2; + let mut caller_peer = + Peer::new(caller_peer_id, caller, responder_peer_id, responder_peer_id); + let mut responder_peer = + Peer::new(responder_peer_id, responder, caller_peer_id, caller_peer_id); + + let track_audio = + Arc::new(Track::new(1, MediaType::Audio(AudioSettings {}))); + let track_video = + Arc::new(Track::new(2, MediaType::Video(VideoSettings {}))); + caller_peer.add_sender(track_audio.clone()); + caller_peer.add_sender(track_video.clone()); + responder_peer.add_receiver(track_audio); + responder_peer.add_receiver(track_video); + + hashmap!( + caller_peer_id => PeerStateMachine::New(caller_peer), + responder_peer_id => PeerStateMachine::New(responder_peer), + ) +} + +#[test] +fn create_peer() { + let peer = Peer::new(1, 1, 2, 2); + let peer = peer.start(); + + assert_eq!(peer.state, WaitLocalSdp {}); +} diff --git a/src/media/track.rs b/src/media/track.rs new file mode 100644 index 000000000..cca65b95c --- /dev/null +++ b/src/media/track.rs @@ -0,0 +1,22 @@ +//! Remote [`MediaStreamTrack`][1] representation. +//! +//! [1]: https://www.w3.org/TR/mediacapture-streams/#mediastreamtrack + +use crate::api::protocol::MediaType; + +/// ID of [`Track`]. +pub type Id = u64; + +/// [`MediaStreamTrack`] representation. +#[derive(Debug)] +pub struct Track { + pub id: Id, + pub media_type: MediaType, +} + +impl Track { + /// Creates new [`Track`] of the specified type. 
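+    ///
+    /// Illustrative only:
+    ///
+    /// ```ignore
+    /// let track = Track::new(1, MediaType::Audio(AudioSettings {}));
+    /// ```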
+ pub fn new(id: Id, media_type: MediaType) -> Self { + Self { id, media_type } + } +} diff --git a/src/signalling/mod.rs b/src/signalling/mod.rs new file mode 100644 index 000000000..ddf76119c --- /dev/null +++ b/src/signalling/mod.rs @@ -0,0 +1,9 @@ +pub mod participants; +pub mod peers; +pub mod room; +pub mod room_repo; + +pub use self::{ + room::{Id as RoomId, Room}, + room_repo::RoomsRepository, +}; diff --git a/src/signalling/participants.rs b/src/signalling/participants.rs new file mode 100644 index 000000000..4973f7af2 --- /dev/null +++ b/src/signalling/participants.rs @@ -0,0 +1,192 @@ +//! Participant is [`Member`] with [`RpcConnection`]. [`ParticipantService`] +//! stores [`Members`] and associated [`RpcConnection`]s, handles +//! [`RpcConnection`] authorization, establishment, message sending. + +use actix::{fut::wrap_future, AsyncContext, Context, SpawnHandle}; +use hashbrown::HashMap; + +use futures::{ + future::{self, join_all, Either}, + Future, +}; + +use std::time::{Duration, Instant}; + +use crate::{ + api::{ + client::rpc_connection::{ + AuthorizationError, ClosedReason, RpcConnection, + RpcConnectionClosed, + }, + control::{Member, MemberId}, + protocol::Event, + }, + log::prelude::*, + signalling::{ + room::{CloseRoom, RoomError}, + Room, + }, +}; + +/// Participant is [`Member`] with [`RpcConnection`]. [`ParticipantService`] +/// stores [`Members`] and associated [`RpcConnection`]s, handles +/// [`RpcConnection`] authorization, establishment, message sending. +#[derive(Debug)] +pub struct ParticipantService { + /// [`Member`]s which currently are present in this [`Room`]. + members: HashMap, + + /// Established [`RpcConnection`]s of [`Member`]s in this [`Room`]. + // TODO: Replace Box> with enum, + // as the set of all possible RpcConnection types is not closed. + connections: HashMap>, + + /// Timeout for close [`RpcConnection`] after receiving + /// [`RpcConnectionClosed`] message. + reconnect_timeout: Duration, + + /// Stores [`RpcConnection`] drop tasks. + /// If [`RpcConnection`] is lost, [`Room`] waits for connection_timeout + /// before dropping it irrevocably in case it gets reestablished. + drop_connection_tasks: HashMap, +} + +impl ParticipantService { + pub fn new( + members: HashMap, + reconnect_timeout: Duration, + ) -> Self { + Self { + members, + connections: HashMap::new(), + reconnect_timeout, + drop_connection_tasks: HashMap::new(), + } + } + + /// Lookup [`Member`] by provided id and credentials. Returns + /// [`Err(AuthorizationError::MemberNotExists)`] if lookup by [`MemberId`] + /// failed. Returns [`Err(AuthorizationError::InvalidCredentials)`] if + /// [`Member`] was found, but incorrect credentials was provided. + pub fn get_member_by_id_and_credentials( + &self, + member_id: MemberId, + credentials: &str, + ) -> Result<&Member, AuthorizationError> { + match self.members.get(&member_id) { + Some(ref member) => { + if member.credentials.eq(credentials) { + Ok(member) + } else { + Err(AuthorizationError::InvalidCredentials) + } + } + None => Err(AuthorizationError::MemberNotExists), + } + } + + /// Checks if [`Member`] has **active** [`RcpConnection`]. + pub fn member_has_connection(&self, member_id: MemberId) -> bool { + self.connections.contains_key(&member_id) + && !self.drop_connection_tasks.contains_key(&member_id) + } + + /// Send [`Event`] to specified remote [`Member`]. 
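+    ///
+    /// Errors with [`RoomError::ConnectionNotExists`] if [`Member`] has no
+    /// stored [`RpcConnection`]. A hypothetical call site:
+    ///
+    /// ```ignore
+    /// let fut = participants
+    ///     .send_event_to_member(1, Event::PeersRemoved { peer_ids: vec![1] });
+    /// ```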
+ pub fn send_event_to_member( + &mut self, + member_id: MemberId, + event: Event, + ) -> impl Future { + match self.connections.get(&member_id) { + Some(conn) => Either::A( + conn.send_event(event) + .map_err(move |_| RoomError::UnableToSendEvent(member_id)), + ), + None => Either::B(future::err(RoomError::ConnectionNotExists( + member_id, + ))), + } + } + + /// If [`ClosedReason::Closed`], then removes [`RpcConnection`] associated + /// with specified user [`Member`] from the storage and closes the room. + /// If [`ClosedReason::Lost`], then creates delayed task that emits + /// [`ClosedReason::Closed`]. + // TODO: Dont close the room. It is being closed atm, because we have + // no way to handle absence of RtcPeerConnection when. + pub fn connection_closed( + &mut self, + ctx: &mut Context, + member_id: MemberId, + reason: &ClosedReason, + ) { + let closed_at = Instant::now(); + match reason { + ClosedReason::Closed => { + self.connections.remove(&member_id); + ctx.notify(CloseRoom {}) + } + ClosedReason::Lost => { + self.drop_connection_tasks.insert( + member_id, + ctx.run_later(self.reconnect_timeout, move |_, ctx| { + info!( + "Member {} connection lost at {:?}. Room will be \ + stopped.", + member_id, closed_at + ); + ctx.notify(RpcConnectionClosed { + member_id, + reason: ClosedReason::Closed, + }) + }), + ); + } + } + } + + /// Stores provided [`RpcConnection`] for given [`Member`] in the [`Room`]. + /// If [`Member`] already has any other [`RpcConnection`], + /// then it will be closed. + pub fn connection_established( + &mut self, + ctx: &mut Context, + member_id: MemberId, + con: Box, + ) { + // lookup previous member connection + if let Some(mut connection) = self.connections.remove(&member_id) { + debug!("Closing old RpcConnection for member {}", member_id); + + // cancel RpcConnection close task, since connection is + // reestablished + if let Some(handler) = self.drop_connection_tasks.remove(&member_id) + { + ctx.cancel_future(handler); + } + ctx.spawn(wrap_future(connection.close())); + } else { + self.connections.insert(member_id, con); + } + } + + /// Cancels all connection close tasks, closes all [`RpcConnection`]s. + pub fn drop_connections( + &mut self, + ctx: &mut Context, + ) -> impl Future { + self.drop_connection_tasks.drain().for_each(|(_, handle)| { + ctx.cancel_future(handle); + }); + + let close_fut = self.connections.drain().fold( + vec![], + |mut futures, (_, mut connection)| { + futures.push(connection.close()); + futures + }, + ); + + join_all(close_fut).map(|_| ()) + } +} diff --git a/src/signalling/peers.rs b/src/signalling/peers.rs new file mode 100644 index 000000000..710de7909 --- /dev/null +++ b/src/signalling/peers.rs @@ -0,0 +1,89 @@ +//! Repository that stores [`Room`]s [`Peer`]s. + +use hashbrown::HashMap; + +use std::convert::{TryFrom, TryInto}; + +use crate::{ + api::control::MemberId, + media::{Peer, PeerId, PeerStateMachine}, + signalling::room::RoomError, +}; + +#[derive(Debug)] +pub struct PeerRepository { + /// [`Peer`]s of [`Member`]s in this [`Room`]. + peers: HashMap, +} + +impl PeerRepository { + /// Store [`Peer`] in [`Room`]. + pub fn add_peer>(&mut self, id: PeerId, peer: S) { + self.peers.insert(id, peer.into()); + } + + /// Returns borrowed [`PeerStateMachine`] by its ID. + pub fn get_peer( + &self, + peer_id: PeerId, + ) -> Result<&PeerStateMachine, RoomError> { + self.peers + .get(&peer_id) + .ok_or_else(|| RoomError::PeerNotFound(peer_id)) + } + + /// Returns borrowed [`Peer`] by its ID. 
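+    ///
+    /// The expected state is chosen via the type parameter; a sketch:
+    ///
+    /// ```ignore
+    /// let peer: &Peer<New> = repository.get_inner_peer(1)?;
+    /// ```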
+ pub fn get_inner_peer<'a, S>( + &'a self, + peer_id: PeerId, + ) -> Result<&'a Peer, RoomError> + where + &'a Peer: std::convert::TryFrom<&'a PeerStateMachine>, + <&'a Peer as TryFrom<&'a PeerStateMachine>>::Error: Into, + { + match self.peers.get(&peer_id) { + Some(peer) => peer.try_into().map_err(Into::into), + None => Err(RoomError::PeerNotFound(peer_id)), + } + } + + /// Returns [`Peer`] of specified [`Member`]. + /// + /// Panic if [`Peer`] not exists. + pub fn get_peers_by_member_id( + &self, + member_id: MemberId, + ) -> Vec<&PeerStateMachine> { + self.peers + .iter() + .filter_map(|(_, peer)| { + if peer.member_id() == member_id { + Some(peer) + } else { + None + } + }) + .collect() + } + + /// Returns owned [`Peer`] by its ID. + pub fn take_inner_peer( + &mut self, + peer_id: PeerId, + ) -> Result, RoomError> + where + Peer: TryFrom, + as TryFrom>::Error: Into, + { + match self.peers.remove(&peer_id) { + Some(peer) => peer.try_into().map_err(Into::into), + None => Err(RoomError::PeerNotFound(peer_id)), + } + } +} + +impl From> for PeerRepository { + fn from(map: HashMap) -> Self { + Self { peers: map } + } +} diff --git a/src/signalling/room.rs b/src/signalling/room.rs new file mode 100644 index 000000000..a76d12129 --- /dev/null +++ b/src/signalling/room.rs @@ -0,0 +1,531 @@ +//! Room definitions and implementations. Room is responsible for media +//! connection establishment between concrete [`Member`]s. + +use actix::{ + fut::wrap_future, Actor, ActorFuture, AsyncContext, Context, Handler, + Message, +}; +use failure::Fail; +use futures::future; +use hashbrown::HashMap; + +use std::time::Duration; + +use crate::{ + api::{ + client::rpc_connection::{ + AuthorizationError, Authorize, RpcConnectionClosed, + RpcConnectionEstablished, + }, + control::{Member, MemberId}, + protocol::{Command, Event}, + }, + log::prelude::*, + media::{ + New, Peer, PeerId, PeerStateError, PeerStateMachine, + WaitLocalHaveRemote, WaitLocalSdp, WaitRemoteSdp, + }, + signalling::{participants::ParticipantService, peers::PeerRepository}, +}; + +/// ID of [`Room`]. +pub type Id = u64; + +/// Ergonomic type alias for using [`ActorFuture`] for [`Room`]. +type ActFuture = Box>; + +#[derive(Fail, Debug)] +#[allow(clippy::module_name_repetitions)] +pub enum RoomError { + #[fail(display = "Couldn't find Peer with [id = {}]", _0)] + PeerNotFound(PeerId), + #[fail(display = "Couldn't find RpcConnection with Member [id = {}]", _0)] + ConnectionNotExists(MemberId), + #[fail(display = "Unable to send event to Member [id = {}]", _0)] + UnableToSendEvent(MemberId), + #[fail(display = "PeerError: {}", _0)] + PeerStateError(PeerStateError), + #[fail(display = "Generic room error {}", _0)] + BadRoomSpec(String), +} + +impl From for RoomError { + fn from(err: PeerStateError) -> Self { + RoomError::PeerStateError(err) + } +} + +/// Media server room with its [`Member`]s. +#[derive(Debug)] +pub struct Room { + id: Id, + + /// [`RpcConnection`]s of [`Member`]s in this [`Room`]. + participants: ParticipantService, + + /// [`Peer`]s of [`Member`]s in this [`Room`]. + peers: PeerRepository, +} + +impl Room { + /// Create new instance of [`Room`]. + pub fn new( + id: Id, + members: HashMap, + peers: HashMap, + reconnect_timeout: Duration, + ) -> Self { + Self { + id, + peers: PeerRepository::from(peers), + participants: ParticipantService::new(members, reconnect_timeout), + } + } + + /// Sends [`Event::PeerCreated`] to one of specified [`Peer`]s based on + /// which of them has any outbound tracks. 
That [`Peer`] state will be + /// changed to [`WaitLocalSdp`] state. Both provided peers must be in + /// [`New`] state. At least one of provided peers must have outbound + /// tracks. + fn send_peer_created( + &mut self, + peer1_id: PeerId, + peer2_id: PeerId, + ) -> Result, RoomError> { + let peer1: Peer = self.peers.take_inner_peer(peer1_id)?; + let peer2: Peer = self.peers.take_inner_peer(peer2_id)?; + + // decide which peer is sender + let (sender, receiver) = if peer1.is_sender() { + (peer1, peer2) + } else if peer2.is_sender() { + (peer2, peer1) + } else { + self.peers.add_peer(peer1.id(), peer1); + self.peers.add_peer(peer2.id(), peer2); + return Err(RoomError::BadRoomSpec(format!( + "Error while trying to connect Peer [id = {}] and Peer [id = \ + {}] cause neither of peers are senders", + peer1_id, peer2_id + ))); + }; + self.peers.add_peer(receiver.id(), receiver); + + let sender = sender.start(); + let member_id = sender.member_id(); + let peer_created = Event::PeerCreated { + peer_id: sender.id(), + sdp_offer: None, + tracks: sender.tracks(), + }; + self.peers.add_peer(sender.id(), sender); + Ok(Box::new(wrap_future( + self.participants + .send_event_to_member(member_id, peer_created), + ))) + } + + /// Sends [`Event::PeerCreated`] to provided [`Peer`] partner. Provided + /// [`Peer`] state must be [`WaitLocalSdp`] and will be changed to + /// [`WaitRemoteSdp`], partners [`Peer`] state must be [`New`] and will be + /// changed to [`WaitLocalHaveRemote`]. + fn handle_make_sdp_offer( + &mut self, + from_peer_id: PeerId, + sdp_offer: String, + ) -> Result, RoomError> { + let from_peer: Peer = + self.peers.take_inner_peer(from_peer_id)?; + let to_peer_id = from_peer.partner_peer_id(); + let to_peer: Peer = self.peers.take_inner_peer(to_peer_id)?; + + let from_peer = from_peer.set_local_sdp(sdp_offer.clone()); + let to_peer = to_peer.set_remote_sdp(sdp_offer.clone()); + + let to_member_id = to_peer.member_id(); + let event = Event::PeerCreated { + peer_id: to_peer_id, + sdp_offer: Some(sdp_offer), + tracks: to_peer.tracks(), + }; + + self.peers.add_peer(from_peer_id, from_peer); + self.peers.add_peer(to_peer_id, to_peer); + Ok(Box::new(wrap_future( + self.participants.send_event_to_member(to_member_id, event), + ))) + } + + /// Sends [`Event::SdpAnswerMade`] to provided [`Peer`] partner. Provided + /// [`Peer`] state must be [`WaitLocalHaveRemote`] and will be changed to + /// [`Stable`], partners [`Peer`] state must be [`WaitRemoteSdp`] and will + /// be changed to [`Stable`]. + fn handle_make_sdp_answer( + &mut self, + from_peer_id: PeerId, + sdp_answer: String, + ) -> Result, RoomError> { + let from_peer: Peer = + self.peers.take_inner_peer(from_peer_id)?; + let to_peer_id = from_peer.partner_peer_id(); + let to_peer: Peer = + self.peers.take_inner_peer(to_peer_id)?; + + let from_peer = from_peer.set_local_sdp(sdp_answer.clone()); + let to_peer = to_peer.set_remote_sdp(&sdp_answer); + + let to_member_id = to_peer.member_id(); + let event = Event::SdpAnswerMade { + peer_id: to_peer_id, + sdp_answer, + }; + + self.peers.add_peer(from_peer_id, from_peer); + self.peers.add_peer(to_peer_id, to_peer); + + Ok(Box::new(wrap_future( + self.participants.send_event_to_member(to_member_id, event), + ))) + } + + /// Sends [`Event::IceCandidateDiscovered`] to provided [`Peer`] partner. + /// Both [`Peer`]s may have any state except [`New`]. 
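+    ///
+    /// Expected inbound message (shape per the `protocol` serde impls):
+    /// `{"command":"SetIceCandidate","data":{"peer_id":1,"candidate":"..."}}`.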
+
+    /// Sends [`Event::IceCandidateDiscovered`] to provided [`Peer`]'s
+    /// partner. Both [`Peer`]s may be in any state except [`New`].
+    fn handle_set_ice_candidate(
+        &mut self,
+        from_peer_id: PeerId,
+        candidate: String,
+    ) -> Result<ActFuture<(), RoomError>, RoomError> {
+        let from_peer = self.peers.get_peer(from_peer_id)?;
+        if let PeerStateMachine::New(_) = from_peer {
+            return Err(RoomError::PeerStateError(PeerStateError::WrongState(
+                from_peer_id,
+                "Not New",
+                format!("{}", from_peer),
+            )));
+        }
+
+        let to_peer_id = from_peer.partner_peer_id();
+        let to_peer = self.peers.get_peer(to_peer_id)?;
+        if let PeerStateMachine::New(_) = to_peer {
+            return Err(RoomError::PeerStateError(PeerStateError::WrongState(
+                to_peer_id,
+                "Not New",
+                format!("{}", to_peer),
+            )));
+        }
+
+        let to_member_id = to_peer.member_id();
+        let event = Event::IceCandidateDiscovered {
+            peer_id: to_peer_id,
+            candidate,
+        };
+
+        Ok(Box::new(wrap_future(
+            self.participants.send_event_to_member(to_member_id, event),
+        )))
+    }
+}
+
+/// [`Actor`] implementation that provides an ergonomic way
+/// to interact with [`Room`].
+impl Actor for Room {
+    type Context = Context<Self>;
+}
+
+impl Handler<Authorize> for Room {
+    type Result = Result<(), AuthorizationError>;
+
+    /// Responds with `Ok` if `RpcConnection` is authorized, otherwise `Err`s.
+    fn handle(
+        &mut self,
+        msg: Authorize,
+        _ctx: &mut Self::Context,
+    ) -> Self::Result {
+        self.participants
+            .get_member_by_id_and_credentials(msg.member_id, &msg.credentials)
+            .map(|_| ())
+    }
+}
+
+/// Signal to start signaling between specified [`Peer`]s.
+#[derive(Debug, Message)]
+#[rtype(result = "Result<(), ()>")]
+pub struct ConnectPeers(PeerId, PeerId);
+
+impl Handler<ConnectPeers> for Room {
+    type Result = ActFuture<(), ()>;
+
+    /// Checks state of interconnected [`Peer`]s and sends [`Event`] about
+    /// [`Peer`] creation to the remote [`Member`].
+    fn handle(
+        &mut self,
+        msg: ConnectPeers,
+        ctx: &mut Self::Context,
+    ) -> Self::Result {
+        match self.send_peer_created(msg.0, msg.1) {
+            Ok(res) => {
+                Box::new(res.map_err(|err, _, ctx: &mut Context<Room>| {
+                    error!(
+                        "Failed to connect peers: {}. Room will be stopped.",
+                        err
+                    );
+                    ctx.notify(CloseRoom {})
+                }))
+            }
+            Err(err) => {
+                error!(
+                    "Failed to connect peers: {}. Room will be stopped.",
+                    err
+                );
+                ctx.notify(CloseRoom {});
+                Box::new(wrap_future(future::ok(())))
+            }
+        }
+    }
+}
+
+impl Handler<Command> for Room {
+    type Result = ActFuture<(), ()>;
+
+    /// Receives [`Command`] from Web client and passes it to the
+    /// corresponding handler. Will emit [`CloseRoom`] on any error.
+    fn handle(
+        &mut self,
+        command: Command,
+        ctx: &mut Self::Context,
+    ) -> Self::Result {
+        let result = match command {
+            Command::MakeSdpOffer { peer_id, sdp_offer } => {
+                self.handle_make_sdp_offer(peer_id, sdp_offer)
+            }
+            Command::MakeSdpAnswer {
+                peer_id,
+                sdp_answer,
+            } => self.handle_make_sdp_answer(peer_id, sdp_answer),
+            Command::SetIceCandidate { peer_id, candidate } => {
+                self.handle_set_ice_candidate(peer_id, candidate)
+            }
+        };
+
+        match result {
+            Ok(res) => {
+                Box::new(res.map_err(|err, _, ctx: &mut Context<Room>| {
+                    error!(
+                        "Failed to handle command: {}. Room will be stopped.",
+                        err
+                    );
+                    ctx.notify(CloseRoom {})
+                }))
+            }
+            Err(err) => {
+                error!(
+                    "Failed to handle command: {}. Room will be stopped.",
+                    err
+                );
+                ctx.notify(CloseRoom {});
+                Box::new(wrap_future(future::ok(())))
+            }
+        }
+    }
+}
+
+impl Handler<RpcConnectionEstablished> for Room {
+    type Result = ActFuture<(), ()>;
+
+    /// Saves new [`RpcConnection`] in [`ParticipantService`] and initiates
+    /// media establishment between members.
+    fn handle(
+        &mut self,
+        msg: RpcConnectionEstablished,
+        ctx: &mut Self::Context,
+    ) -> Self::Result {
+        info!("RpcConnectionEstablished for member {}", msg.member_id);
+
+        // Save new connection.
+        self.participants.connection_established(
+            ctx,
+            msg.member_id,
+            msg.connection,
+        );
+
+        // Get Peers of the connected Member.
+        self.peers
+            .get_peers_by_member_id(msg.member_id)
+            .into_iter()
+            .for_each(|peer| {
+                // Only New peers should be connected.
+                if let PeerStateMachine::New(peer) = peer {
+                    if self
+                        .participants
+                        .member_has_connection(peer.partner_member_id())
+                    {
+                        // Both Members are connected, so start signaling
+                        // between their Peers.
+                        ctx.notify(ConnectPeers(
+                            peer.id(),
+                            peer.partner_peer_id(),
+                        ));
+                    }
+                }
+            });
+
+        Box::new(wrap_future(future::ok(())))
+    }
+}
+
+/// Signal to close [`Room`].
+#[derive(Debug, Message)]
+#[rtype(result = "()")]
+#[allow(clippy::module_name_repetitions)]
+pub struct CloseRoom {}
+
+impl Handler<CloseRoom> for Room {
+    type Result = ();
+
+    /// Sends [`Event`] about [`Peer`] removal to each remote [`Member`] and
+    /// closes all active [`RpcConnection`]s.
+    fn handle(
+        &mut self,
+        _msg: CloseRoom,
+        ctx: &mut Self::Context,
+    ) -> Self::Result {
+        info!("Closing Room [id = {:?}]", self.id);
+        let drop_fut = self.participants.drop_connections(ctx);
+        ctx.wait(wrap_future(drop_fut));
+    }
+}
+
+impl Handler<RpcConnectionClosed> for Room {
+    type Result = ();
+
+    /// Passes message to [`ParticipantService`] to cleanup stored
+    /// connections.
+    fn handle(&mut self, msg: RpcConnectionClosed, ctx: &mut Self::Context) {
+        info!(
+            "RpcConnectionClosed for member {}, reason {:?}",
+            msg.member_id, msg.reason
+        );
+
+        self.participants
+            .connection_closed(ctx, msg.member_id, &msg.reason);
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    };
+
+    use actix::{Addr, Arbiter, System};
+
+    use super::*;
+    use crate::{
+        api::protocol::{
+            AudioSettings, Direction, Directional, MediaType, VideoSettings,
+        },
+        media::create_peers,
+    };
+
+    use crate::api::client::rpc_connection::test::TestConnection;
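+
+    // `TestConnection` (from `rpc_connection::test`) presumably answers each
+    // received `Event` with the matching `Command` (`MakeSdpOffer` with
+    // "caller_offer", `MakeSdpAnswer` with "responder_answer",
+    // `SetIceCandidate` with "ice_candidate"), so a pair of them drives a
+    // full negotiation against the `Room` under test.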
+
+    fn start_room() -> Addr<Room> {
+        let members = hashmap! {
+            1 => Member {
+                id: 1,
+                credentials: "caller_credentials".to_owned(),
+            },
+            2 => Member {
+                id: 2,
+                credentials: "responder_credentials".to_owned(),
+            },
+        };
+        Arbiter::start(move |_| {
+            Room::new(1, members, create_peers(1, 2), Duration::from_secs(10))
+        })
+    }
+
+    #[test]
+    fn start_signaling() {
+        let stopped = Arc::new(AtomicUsize::new(0));
+        let caller_events = Arc::new(Mutex::new(vec![]));
+        let caller_events_clone = Arc::clone(&caller_events);
+        let responder_events = Arc::new(Mutex::new(vec![]));
+        let responder_events_clone = Arc::clone(&responder_events);
+
+        System::run(move || {
+            let room = start_room();
+            let room_clone = room.clone();
+            let stopped_clone = stopped.clone();
+            Arbiter::start(move |_| TestConnection {
+                events: caller_events_clone,
+                member_id: 1,
+                room: room_clone,
+                stopped: stopped_clone,
+            });
+            Arbiter::start(move |_| TestConnection {
+                events: responder_events_clone,
+                member_id: 2,
+                room,
+                stopped,
+            });
+        });
+
+        let caller_events = caller_events.lock().unwrap();
+        let responder_events = responder_events.lock().unwrap();
+        assert_eq!(
+            caller_events.to_vec(),
+            vec![
+                serde_json::to_string(&Event::PeerCreated {
+                    peer_id: 1,
+                    sdp_offer: None,
+                    tracks: vec![
+                        Directional {
+                            id: 1,
+                            direction: Direction::Send { receivers: vec![2] },
+                            media_type: MediaType::Audio(AudioSettings {}),
+                        },
+                        Directional {
+                            id: 2,
+                            direction: Direction::Send { receivers: vec![2] },
+                            media_type: MediaType::Video(VideoSettings {}),
+                        },
+                    ],
+                })
+                .unwrap(),
+                serde_json::to_string(&Event::SdpAnswerMade {
+                    peer_id: 1,
+                    sdp_answer: "responder_answer".into(),
+                })
+                .unwrap(),
+                serde_json::to_string(&Event::IceCandidateDiscovered {
+                    peer_id: 1,
+                    candidate: "ice_candidate".into(),
+                })
+                .unwrap(),
+            ]
+        );
+
+        assert_eq!(
+            responder_events.to_vec(),
+            vec![
+                serde_json::to_string(&Event::PeerCreated {
+                    peer_id: 2,
+                    sdp_offer: Some("caller_offer".into()),
+                    tracks: vec![
+                        Directional {
+                            id: 1,
+                            direction: Direction::Recv { sender: 1 },
+                            media_type: MediaType::Audio(AudioSettings {}),
+                        },
+                        Directional {
+                            id: 2,
+                            direction: Direction::Recv { sender: 1 },
+                            media_type: MediaType::Video(VideoSettings {}),
+                        },
+                    ],
+                })
+                .unwrap(),
+                serde_json::to_string(&Event::IceCandidateDiscovered {
+                    peer_id: 2,
+                    candidate: "ice_candidate".into(),
+                })
+                .unwrap(),
+            ]
+        );
+    }
+}
diff --git a/src/signalling/room_repo.rs b/src/signalling/room_repo.rs
new file mode 100644
index 000000000..f0df8ead4
--- /dev/null
+++ b/src/signalling/room_repo.rs
@@ -0,0 +1,31 @@
+//! Repository that stores [`Room`]s' addresses.
+
+use actix::Addr;
+use hashbrown::HashMap;
+
+use std::sync::{Arc, Mutex};
+
+use crate::signalling::{Room, RoomId};
+
+/// Repository that stores [`Room`]s' addresses.
+#[derive(Clone, Default)]
+pub struct RoomsRepository {
+    // TODO: Use crossbeam's concurrent hashmap when it's done.
+    //       [Tracking](https://github.com/crossbeam-rs/rfcs/issues/32)
+    rooms: Arc<Mutex<HashMap<RoomId, Addr<Room>>>>,
+}
+
+impl RoomsRepository {
+    /// Creates new [`Room`]s repository with passed-in [`Room`]s.
+    pub fn new(rooms: HashMap<RoomId, Addr<Room>>) -> Self {
+        Self {
+            rooms: Arc::new(Mutex::new(rooms)),
+        }
+    }
+
+    /// Returns [`Room`] by its ID.
+    pub fn get(&self, id: RoomId) -> Option<Addr<Room>> {
+        let rooms = self.rooms.lock().unwrap();
+        rooms.get(&id).cloned()
+    }
+}
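+
+// Usage sketch, assuming a connection handler that holds a `RoomsRepository`
+// (`repo`) and has already parsed `room_id`, `member_id` and `credentials`:
+//
+//     if let Some(room) = repo.get(room_id) {
+//         // Resolves to `Result<(), AuthorizationError>`.
+//         let auth = room.send(Authorize { member_id, credentials });
+//     }
+//     // `None` means no such `Room` is registered.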