Skip to content

Commit

Permalink
chore: fix some websocket stuff again
Browse files Browse the repository at this point in the history
  • Loading branch information
fetchfern committed Apr 17, 2024
1 parent b1763e1 commit c43ac69
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 48 deletions.
57 changes: 44 additions & 13 deletions crates/alerion_core/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::time::Instant;

use actix_web::HttpResponse;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
use tokio::sync::RwLock;
use uuid::Uuid;

use crate::config::AlerionConfig;
use crate::websocket::conn::{ConnectionAddr, PanelMessage};
use crate::websocket::conn::{ConnectionAddr, NetworkStatistics, PanelMessage, PerformanceStatisics, ServerMessage, ServerStatus};
use crate::websocket::relay::{AuthTracker, ClientConnection, ServerConnection};

pub struct ServerPoolBuilder {
Expand All @@ -19,7 +19,6 @@ pub struct ServerPoolBuilder {

impl ServerPoolBuilder {
pub fn from_config(config: &AlerionConfig) -> Self {

Self {
servers: HashMap::new(),
remote_api: Arc::new(remote::RemoteClient::new(config)),
Expand All @@ -37,7 +36,7 @@ impl ServerPoolBuilder {
let uuid = s.uuid;
let info = ServerInfo::from_remote_info(s.settings);
let server = Server::new(uuid, info, Arc::clone(&self.remote_api));
self.servers.insert(uuid, Arc::new(server));
self.servers.insert(uuid, server);
}

Ok(self)
Expand Down Expand Up @@ -85,7 +84,7 @@ impl ServerPool {

let server_info = ServerInfo::from_remote_info(config.settings);

let server = Arc::new(Server::new(uuid, server_info, remote_api));
let server = Server::new(uuid, server_info, remote_api);
self.servers.write().await.insert(uuid, Arc::clone(&server));

Ok(server)
Expand Down Expand Up @@ -113,28 +112,31 @@ pub struct Server {
uuid: Uuid,
container_id: String,
websocket_id_counter: AtomicU32,
websockets: Mutex<HashMap<u32, ClientConnection>>,
websockets: RwLock<HashMap<u32, ClientConnection>>,
sender_copy: Sender<(u32, PanelMessage)>,
server_info: ServerInfo,
remote_api: Arc<remote::RemoteClient>,
}

impl Server {
pub fn new(uuid: Uuid, server_info: ServerInfo, remote_api: Arc<remote::RemoteClient>) -> Self {
pub fn new(uuid: Uuid, server_info: ServerInfo, remote_api: Arc<remote::RemoteClient>) -> Arc<Self> {
let (send, recv) = channel(128);

tokio::spawn(task_websocket_receiver(recv));

Self {
let server = Arc::new(Self {
start_time: Instant::now(),
uuid,
container_id: format!("{}_container", uuid.as_hyphenated()),
websocket_id_counter: AtomicU32::new(0),
websockets: Mutex::new(HashMap::new()),
websockets: RwLock::new(HashMap::new()),
sender_copy: send,
server_info,
remote_api,
}
});

tokio::spawn(task_websocket_receiver(recv));
tokio::spawn(monitor_performance_metrics(Arc::clone(&server)));

server
}

pub async fn setup_new_websocket<F>(&self, start_websocket: F) -> actix_web::Result<HttpResponse>
Expand All @@ -153,18 +155,47 @@ impl Server {

// add the obtained reply channel to the list of websocket connections
let client_conn = ClientConnection::new(auth_tracker, addr);
let mut websockets = self.websockets.lock().await;
let mut websockets = self.websockets.write().await;
websockets.insert(id, client_conn);

// give back the HTTP 101 response
Ok(response)
}

pub async fn send_to_available_websockets(&self, msg: ServerMessage) {
let lock = self.websockets.read().await;

for sender in lock.values() {
sender.send_if_authenticated(|| msg.clone());
}
}

pub fn server_time(&self) -> u64 {
self.start_time.elapsed().as_millis() as u64
}
}

async fn monitor_performance_metrics(server: Arc<Server>) {
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

let stats = PerformanceStatisics {
memory_bytes: server.server_time() as usize,
memory_limit_bytes: 1024usize.pow(3) * 8,
cpu_absolute: 50.11,
network: NetworkStatistics {
rx_bytes: 1024,
tx_bytes: 800,
},
uptime: 5000 + server.server_time(),
state: ServerStatus::Running,
disk_bytes: 100,
};

server.send_to_available_websockets(ServerMessage::Stats(stats)).await;
}
}

async fn task_websocket_receiver(mut receiver: Receiver<(u32, PanelMessage)>) {
loop {
if let Some(msg) = receiver.recv().await {
Expand Down
1 change: 0 additions & 1 deletion crates/alerion_core/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use uuid::Uuid;

use crate::config::AlerionConfig;


pub fn start_websocket(
server_uuid: Uuid,
config: &AlerionConfig,
Expand Down
112 changes: 78 additions & 34 deletions crates/alerion_core/src/websocket/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use actix_web_actors::ws;
use bytestring::ByteString;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use smallvec::{smallvec, SmallVec};

use crate::config::AlerionConfig;

Expand All @@ -19,12 +20,43 @@ macro_rules! impl_infallible_message {
};
}

#[derive(Debug)]
#[derive(Copy, Clone, Debug, Serialize, PartialEq, Eq)]
pub enum ServerStatus {
#[serde(rename = "running")]
Running,
#[serde(rename = "starting")]
Starting,
#[serde(rename = "stopping")]
Stopping,
#[serde(rename = "offline")]
Offline,
}

#[derive(Debug, Clone, Serialize)]
pub struct NetworkStatistics {
pub rx_bytes: usize,
pub tx_bytes: usize,
}

#[derive(Debug, Clone, Serialize)]
pub struct PerformanceStatisics {
pub memory_bytes: usize,
pub memory_limit_bytes: usize,
pub cpu_absolute: f64,
pub network: NetworkStatistics,
pub uptime: u64,
pub state: ServerStatus,
pub disk_bytes: usize,
}

#[derive(Debug, Clone)]
pub enum ServerMessage {
Kill,
Logs(String),
Stats(PerformanceStatisics),
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum PanelMessage {
Command(String),
ReceiveLogs,
Expand All @@ -35,7 +67,42 @@ pub enum PanelMessage {
pub struct RawMessage {
event: EventType,
#[serde(default, skip_serializing_if = "Option::is_none")]
args: Option<serde_json::Value>,
args: Option<SmallVec<[serde_json::Value; 1]>>,
}

impl From<RawMessage> for ByteString {
fn from(value: RawMessage) -> Self {
// there is no way this could fail, right
serde_json::to_string(&value).unwrap().into()
}
}

impl RawMessage {
pub fn new_no_args(event: EventType) -> Self {
Self { event, args: None }
}

pub fn new(event: EventType, args: String) -> Self {
Self { event, args: Some(smallvec![serde_json::Value::String(args)]) }
}

pub fn into_first_arg(self) -> Option<String> {
let mut args = self.args?;
let json_str = args.get_mut(0)?.take();

match json_str {
serde_json::Value::String(s) => Some(s),
_ => None,
}
}

pub fn event(&self) -> EventType {
self.event
}

pub fn into_args(self) -> Option<SmallVec<[serde_json::Value; 1]>> {
self.args
}
}

impl_infallible_message!(ServerMessage);
Expand Down Expand Up @@ -136,37 +203,6 @@ pub enum EventType {
JwtError,
}

impl From<RawMessage> for ByteString {
fn from(value: RawMessage) -> Self {
// there is no way this could fail, right
serde_json::to_string(&value).unwrap().into()
}
}

impl RawMessage {
pub fn new_no_args(event: EventType) -> Self {
Self { event, args: None }
}

pub fn into_first_arg(self) -> Option<String> {
let mut args = self.args?;
let json_str = args.get_mut(0)?.take();

match json_str {
serde_json::Value::String(s) => Some(s),
_ => None,
}
}

pub fn event(&self) -> EventType {
self.event
}

pub fn into_args(self) -> Option<serde_json::Value> {
self.args
}
}

pub type ConnectionAddr = Addr<WebsocketConnectionImpl>;

pub struct WebsocketConnectionImpl {
Expand All @@ -188,6 +224,14 @@ impl Handler<ServerMessage> for WebsocketConnectionImpl {
ctx.close(None);
ctx.stop();
}

ServerMessage::Logs(logs) => {
ctx.text(RawMessage::new(EventType::Logs, logs));
}

ServerMessage::Stats(stats) => {
ctx.text(RawMessage::new(EventType::Stats, serde_json::to_string(&stats).unwrap()))
}
}

Ok(())
Expand Down
10 changes: 10 additions & 0 deletions crates/alerion_core/src/websocket/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ impl ClientConnection {
}
}

pub fn send_if_authenticated<F>(&self, msg: F)
where
F: FnOnce() -> ServerMessage
{
if self.auth_tracker.get_auth() {
let m = msg();
self.ws_sender.do_send(m);
}
}

/// Terminate the connection on the server's side.
///
/// There could be a condition where the server tries to terminate the connection,
Expand Down

0 comments on commit c43ac69

Please sign in to comment.