Skip to content

Commit

Permalink
fix(relay): Remove reference to the Agent from Relay after it is disc…
Browse files Browse the repository at this point in the history
…onnected. (#121)
  • Loading branch information
nasa42 authored Feb 23, 2025
1 parent 12eb7d1 commit 7d767a9
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 57 deletions.
39 changes: 16 additions & 23 deletions relay/src/models/agent_connection.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,42 @@
use crate::models::socket_reader::{SocketReader, SocketSubscriber};
use crate::models::socket_writer::{SocketPublisher, SocketWriter};
use crate::models::agent_registry::AgentRegistry;
use crate::models::socket_connection::SocketConnection;
use axum::extract::ws::WebSocket;
use futures::StreamExt;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Notify;
use tracing::error;
use webterm_core::models::device_id::DeviceId;
use webterm_core::types::FrontendId;

pub struct AgentConnection {
device_id: DeviceId,
agent_writer: SocketWriter,
agent_reader: SocketReader,
close_notifier: Notify,
socket_connection: SocketConnection,
next_frontend_id: AtomicU64,
}

impl AgentConnection {
pub async fn new(device_id: DeviceId, socket: WebSocket) -> Self {
let (agent_writer, agent_reader) = socket.split();
let agent_reader = SocketReader::new(agent_reader);
let agent_writer = SocketWriter::new(agent_writer);

let conn = SocketConnection::new(socket);
Self {
device_id,
agent_writer,
agent_reader,
close_notifier: Notify::new(),
socket_connection: conn,
next_frontend_id: AtomicU64::new(1),
}
}

pub fn socket(&self) -> &SocketConnection {
&self.socket_connection
}

pub fn device_id(&self) -> &DeviceId {
&self.device_id
}

pub async fn wait_until_closed(&self) {
self.close_notifier.notified().await;
}

pub fn publisher(&self) -> SocketPublisher {
self.agent_writer.publisher()
}

pub fn subscriber(&self) -> SocketSubscriber {
self.agent_reader.subscriber()
self.socket_connection.close_notifier().notified().await;
error!("STARTING THE REMOVAL");
if let Err(e) = AgentRegistry::remove(self.device_id.clone()).await {
error!("Failed to remove agent from registry: {:?}", e);
}
let _ = self.socket_connection.writer().close().await;
}

pub fn next_frontend_id(&self) -> FrontendId {
Expand Down
15 changes: 6 additions & 9 deletions relay/src/models/agent_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,18 @@ impl AgentRegistry {
Ok(())
}

#[allow(dead_code)]
pub async fn remove(device_id: DeviceId) -> Result<Arc<AgentConnection>, RelayError> {
let registry = Self::singleton().await;
let result = registry
.agents
.write()
.await
.remove(&device_id)
.ok_or(RelayError::AgentNotFound);
let mut agents = registry.agents.write().await;
let mut devices = registry.devices.write().await;

let result = agents.remove(&device_id).ok_or(RelayError::AgentNotFound);

if let Some(subnames) = registry.devices.write().await.get_mut(device_id.name()) {
if let Some(subnames) = devices.get_mut(device_id.name()) {
subnames.remove(device_id.subname());

if subnames.is_empty() {
registry.devices.write().await.remove(device_id.name());
devices.remove(device_id.name());
}
}

Expand Down
12 changes: 6 additions & 6 deletions relay/src/models/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ impl Bridge {

let frontend_id = self.frontend_id;

let frontend_sub = fc.subscriber();
let agent_sub = ac.subscriber();
let frontend_sub = fc.socket().subscriber();
let agent_sub = ac.socket().subscriber();

let frontend_pub = fc.publisher();
let agent_pub = ac.publisher();
let frontend_pub = fc.socket().publisher();
let agent_pub = ac.socket().publisher();

let f2r_task = tokio::spawn(Self::f2r_task(
frontend_sub,
Expand All @@ -77,8 +77,8 @@ impl Bridge {
frontend_id,
));

let frontend_pub = fc.publisher();
let agent_pub = ac.publisher();
let frontend_pub = fc.socket().publisher();
let agent_pub = ac.socket().publisher();

let a2r_task = tokio::spawn(Self::a2r_task(
agent_sub,
Expand Down
24 changes: 7 additions & 17 deletions relay/src/models/frontend_connection.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
use crate::models::socket_reader::{SocketReader, SocketSubscriber};
use crate::models::socket_writer::{SocketPublisher, SocketWriter};
use crate::models::socket_connection::SocketConnection;
use axum::extract::ws::WebSocket;
use futures::StreamExt;

pub struct FrontendConnection {
frontend_writer: SocketWriter,
frontend_reader: SocketReader,
socket_connection: SocketConnection,
}

impl FrontendConnection {
pub async fn new(socket: WebSocket) -> Self {
let (frontend_writer, frontend_reader) = socket.split();
let frontend_reader = SocketReader::new(frontend_reader);
let frontend_writer = SocketWriter::new(frontend_writer);
let conn = SocketConnection::new(socket);
Self {
frontend_writer,
frontend_reader,
socket_connection: conn,
}
}

pub fn publisher(&self) -> SocketPublisher {
self.frontend_writer.publisher()
}

pub fn subscriber(&self) -> SocketSubscriber {
self.frontend_reader.subscriber()
pub fn socket(&self) -> &SocketConnection {
&self.socket_connection
}

pub async fn close(&self) {
let _ = self.frontend_writer.close().await;
let _ = self.socket().writer().close().await;
}
}
1 change: 1 addition & 0 deletions relay/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ pub mod handshake_nonce_agent_registry;
pub mod handshake_nonce_frontend_registry;
pub mod relay_error;
pub mod send_payload;
pub mod socket_connection;
pub mod socket_reader;
pub mod socket_writer;
40 changes: 40 additions & 0 deletions relay/src/models/socket_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::models::socket_reader::{SocketReader, SocketSubscriber};
use crate::models::socket_writer::{SocketPublisher, SocketWriter};
use axum::extract::ws::WebSocket;
use futures::StreamExt;
use std::sync::Arc;
use tokio::sync::Notify;

pub struct SocketConnection {
writer: SocketWriter,
reader: SocketReader,
close_notifier: Arc<Notify>,
}

impl SocketConnection {
pub fn new(socket: WebSocket) -> Self {
let (writer, reader) = socket.split();
let notifier = Arc::new(Notify::new());
Self {
writer: SocketWriter::new(writer),
reader: SocketReader::new(reader, notifier.clone()),
close_notifier: notifier.clone(),
}
}

pub fn close_notifier(&self) -> Arc<Notify> {
self.close_notifier.clone()
}

pub fn writer(&self) -> &SocketWriter {
&self.writer
}

pub fn publisher(&self) -> SocketPublisher {
self.writer.publisher()
}

pub fn subscriber(&self) -> SocketSubscriber {
self.reader.subscriber()
}
}
7 changes: 5 additions & 2 deletions relay/src/models/socket_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use axum::extract::ws::{Message, WebSocket};
use futures::stream::SplitStream;
use futures::StreamExt;
use tokio::sync::broadcast;
use std::sync::Arc;
use tokio::sync::{broadcast, Notify};
use tracing::{error, info};
use webterm_core::models::reader_socket_error::ReaderSocketError;

Expand All @@ -12,7 +13,7 @@ pub struct SocketReader {
}

impl SocketReader {
pub fn new(mut reader_stream: SplitStream<WebSocket>) -> Self {
pub fn new(mut reader_stream: SplitStream<WebSocket>, close_notifier: Arc<Notify>) -> Self {
let (_tx, _rx) = broadcast::channel::<Result<Option<Vec<u8>>, ReaderSocketError>>(16);
let tx = _tx.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -48,6 +49,8 @@ impl SocketReader {
break;
}
}

close_notifier.notify_waiters();
});
Self { _tx }
}
Expand Down

0 comments on commit 7d767a9

Please sign in to comment.