diff --git a/crates/alerion_core/src/servers.rs b/crates/alerion_core/src/servers.rs index b6604ed..2ae6e90 100644 --- a/crates/alerion_core/src/servers.rs +++ b/crates/alerion_core/src/servers.rs @@ -5,11 +5,11 @@ use std::time::Instant; use actix_web::HttpResponse; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; use uuid::Uuid; use crate::config::AlerionConfig; -use crate::websocket::conn::{ConnectionAddr, PanelMessage}; +use crate::websocket::conn::{ConnectionAddr, NetworkStatistics, PanelMessage, PerformanceStatisics, ServerMessage, ServerStatus}; use crate::websocket::relay::{AuthTracker, ClientConnection, ServerConnection}; pub struct ServerPoolBuilder { @@ -19,7 +19,6 @@ pub struct ServerPoolBuilder { impl ServerPoolBuilder { pub fn from_config(config: &AlerionConfig) -> Self { - Self { servers: HashMap::new(), remote_api: Arc::new(remote::RemoteClient::new(config)), @@ -37,7 +36,7 @@ impl ServerPoolBuilder { let uuid = s.uuid; let info = ServerInfo::from_remote_info(s.settings); let server = Server::new(uuid, info, Arc::clone(&self.remote_api)); - self.servers.insert(uuid, Arc::new(server)); + self.servers.insert(uuid, server); } Ok(self) @@ -85,7 +84,7 @@ impl ServerPool { let server_info = ServerInfo::from_remote_info(config.settings); - let server = Arc::new(Server::new(uuid, server_info, remote_api)); + let server = Server::new(uuid, server_info, remote_api); self.servers.write().await.insert(uuid, Arc::clone(&server)); Ok(server) @@ -113,28 +112,31 @@ pub struct Server { uuid: Uuid, container_id: String, websocket_id_counter: AtomicU32, - websockets: Mutex>, + websockets: RwLock>, sender_copy: Sender<(u32, PanelMessage)>, server_info: ServerInfo, remote_api: Arc, } impl Server { - pub fn new(uuid: Uuid, server_info: ServerInfo, remote_api: Arc) -> Self { + pub fn new(uuid: Uuid, server_info: ServerInfo, remote_api: Arc) -> Arc { let (send, recv) = channel(128); - tokio::spawn(task_websocket_receiver(recv)); - - Self { + let server = Arc::new(Self { start_time: Instant::now(), uuid, container_id: format!("{}_container", uuid.as_hyphenated()), websocket_id_counter: AtomicU32::new(0), - websockets: Mutex::new(HashMap::new()), + websockets: RwLock::new(HashMap::new()), sender_copy: send, server_info, remote_api, - } + }); + + tokio::spawn(task_websocket_receiver(recv)); + tokio::spawn(monitor_performance_metrics(Arc::clone(&server))); + + server } pub async fn setup_new_websocket(&self, start_websocket: F) -> actix_web::Result @@ -153,18 +155,47 @@ impl Server { // add the obtained reply channel to the list of websocket connections let client_conn = ClientConnection::new(auth_tracker, addr); - let mut websockets = self.websockets.lock().await; + let mut websockets = self.websockets.write().await; websockets.insert(id, client_conn); // give back the HTTP 101 response Ok(response) } + pub async fn send_to_available_websockets(&self, msg: ServerMessage) { + let lock = self.websockets.read().await; + + for sender in lock.values() { + sender.send_if_authenticated(|| msg.clone()); + } + } + pub fn server_time(&self) -> u64 { self.start_time.elapsed().as_millis() as u64 } } +async fn monitor_performance_metrics(server: Arc) { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + let stats = PerformanceStatisics { + memory_bytes: server.server_time() as usize, + memory_limit_bytes: 1024usize.pow(3) * 8, + cpu_absolute: 50.11, + network: NetworkStatistics { + rx_bytes: 1024, + tx_bytes: 800, + }, + uptime: 5000 + server.server_time(), + state: ServerStatus::Running, + disk_bytes: 100, + }; + + server.send_to_available_websockets(ServerMessage::Stats(stats)).await; + } +} + async fn task_websocket_receiver(mut receiver: Receiver<(u32, PanelMessage)>) { loop { if let Some(msg) = receiver.recv().await { diff --git a/crates/alerion_core/src/websocket.rs b/crates/alerion_core/src/websocket.rs index c073dc3..c00f2c4 100644 --- a/crates/alerion_core/src/websocket.rs +++ b/crates/alerion_core/src/websocket.rs @@ -5,7 +5,6 @@ use uuid::Uuid; use crate::config::AlerionConfig; - pub fn start_websocket( server_uuid: Uuid, config: &AlerionConfig, diff --git a/crates/alerion_core/src/websocket/conn.rs b/crates/alerion_core/src/websocket/conn.rs index cd8be0e..ac2fca6 100644 --- a/crates/alerion_core/src/websocket/conn.rs +++ b/crates/alerion_core/src/websocket/conn.rs @@ -5,6 +5,7 @@ use actix_web_actors::ws; use bytestring::ByteString; use serde::{Deserialize, Serialize}; use uuid::Uuid; +use smallvec::{smallvec, SmallVec}; use crate::config::AlerionConfig; @@ -19,12 +20,43 @@ macro_rules! impl_infallible_message { }; } -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Serialize, PartialEq, Eq)] +pub enum ServerStatus { + #[serde(rename = "running")] + Running, + #[serde(rename = "starting")] + Starting, + #[serde(rename = "stopping")] + Stopping, + #[serde(rename = "offline")] + Offline, +} + +#[derive(Debug, Clone, Serialize)] +pub struct NetworkStatistics { + pub rx_bytes: usize, + pub tx_bytes: usize, +} + +#[derive(Debug, Clone, Serialize)] +pub struct PerformanceStatisics { + pub memory_bytes: usize, + pub memory_limit_bytes: usize, + pub cpu_absolute: f64, + pub network: NetworkStatistics, + pub uptime: u64, + pub state: ServerStatus, + pub disk_bytes: usize, +} + +#[derive(Debug, Clone)] pub enum ServerMessage { Kill, + Logs(String), + Stats(PerformanceStatisics), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum PanelMessage { Command(String), ReceiveLogs, @@ -35,7 +67,42 @@ pub enum PanelMessage { pub struct RawMessage { event: EventType, #[serde(default, skip_serializing_if = "Option::is_none")] - args: Option, + args: Option>, +} + +impl From for ByteString { + fn from(value: RawMessage) -> Self { + // there is no way this could fail, right + serde_json::to_string(&value).unwrap().into() + } +} + +impl RawMessage { + pub fn new_no_args(event: EventType) -> Self { + Self { event, args: None } + } + + pub fn new(event: EventType, args: String) -> Self { + Self { event, args: Some(smallvec![serde_json::Value::String(args)]) } + } + + pub fn into_first_arg(self) -> Option { + let mut args = self.args?; + let json_str = args.get_mut(0)?.take(); + + match json_str { + serde_json::Value::String(s) => Some(s), + _ => None, + } + } + + pub fn event(&self) -> EventType { + self.event + } + + pub fn into_args(self) -> Option> { + self.args + } } impl_infallible_message!(ServerMessage); @@ -136,37 +203,6 @@ pub enum EventType { JwtError, } -impl From for ByteString { - fn from(value: RawMessage) -> Self { - // there is no way this could fail, right - serde_json::to_string(&value).unwrap().into() - } -} - -impl RawMessage { - pub fn new_no_args(event: EventType) -> Self { - Self { event, args: None } - } - - pub fn into_first_arg(self) -> Option { - let mut args = self.args?; - let json_str = args.get_mut(0)?.take(); - - match json_str { - serde_json::Value::String(s) => Some(s), - _ => None, - } - } - - pub fn event(&self) -> EventType { - self.event - } - - pub fn into_args(self) -> Option { - self.args - } -} - pub type ConnectionAddr = Addr; pub struct WebsocketConnectionImpl { @@ -188,6 +224,14 @@ impl Handler for WebsocketConnectionImpl { ctx.close(None); ctx.stop(); } + + ServerMessage::Logs(logs) => { + ctx.text(RawMessage::new(EventType::Logs, logs)); + } + + ServerMessage::Stats(stats) => { + ctx.text(RawMessage::new(EventType::Stats, serde_json::to_string(&stats).unwrap())) + } } Ok(()) diff --git a/crates/alerion_core/src/websocket/relay.rs b/crates/alerion_core/src/websocket/relay.rs index fa4ab84..147000e 100644 --- a/crates/alerion_core/src/websocket/relay.rs +++ b/crates/alerion_core/src/websocket/relay.rs @@ -57,6 +57,16 @@ impl ClientConnection { } } + pub fn send_if_authenticated(&self, msg: F) + where + F: FnOnce() -> ServerMessage + { + if self.auth_tracker.get_auth() { + let m = msg(); + self.ws_sender.do_send(m); + } + } + /// Terminate the connection on the server's side. /// /// There could be a condition where the server tries to terminate the connection,