Skip to content

Commit

Permalink
refactor: use ractor
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Feb 21, 2025
1 parent bb18fd8 commit 876a8e0
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 61 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"pemfile",
"prost",
"protobuf",
"ractor",
"reqwest",
"rustls",
"rustup",
Expand Down
84 changes: 79 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ fmt:
cargo +nightly fmt
fd --type=file --hidden --extension=yml --extension=md --extension=js --exec-batch npx -y prettier --write

# Check project.
[group("lint")]
check: clippy
fd --type=file --hidden --extension=yml --extension=md --extension=js --exec-batch npx -y prettier --check

# Run Clippy over workspace.
[group("lint")]
clippy:
Expand Down
4 changes: 2 additions & 2 deletions websockets/echo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ name = "websocket-client"
path = "src/client.rs"

[dependencies]
actix.workspace = true
actix-files.workspace = true
actix-web.workspace = true
actix-web-actors.workspace = true
actix-ws.workspace = true
awc.workspace = true

env_logger.workspace = true
futures-util = { workspace = true, features = ["sink"] }
log.workspace = true
ractor = { version = "0.15", default-features = false }
tokio = { workspace = true, features = ["full"] }
tokio-stream.workspace = true
18 changes: 15 additions & 3 deletions websockets/echo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@
use actix_files::NamedFile;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
use actix_web_actors::ws;
use ractor::Actor;

mod server;
use self::server::MyWebSocket;
use self::server::{MyWebSocket, WsMessage};

async fn index() -> impl Responder {
NamedFile::open_async("./static/index.html").await.unwrap()
}

/// WebSocket handshake and start `MyWebSocket` actor.
async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
ws::start(MyWebSocket::new(), &req, stream)
let (res, session, stream) = actix_ws::handle(&req, stream)?;

let (actor, _handle) = Actor::spawn(None, MyWebSocket, session).await.unwrap();

actix_web::rt::spawn(async move {
let mut stream = stream.aggregate_continuations();

while let Some(Ok(msg)) = stream.recv().await {
actor.send_message(WsMessage::Ws(msg)).unwrap();
}
});

Ok(res)
}

// the actor-based WebSocket examples REQUIRE `actix_web::main` for actor support
Expand Down
135 changes: 84 additions & 51 deletions websockets/echo/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,111 @@
use actix_ws::AggregatedMessage;
use ractor::{ActorProcessingErr, ActorRef};
use std::time::{Duration, Instant};

use actix::prelude::*;
use actix_web_actors::ws;

/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Debug)]
pub(crate) enum WsMessage {
Ws(actix_ws::AggregatedMessage),
Hb,
}

/// websocket connection is long running connection, it easier
/// to handle with an actor
pub struct MyWebSocket {
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
/// otherwise we drop connection.
hb: Instant,
}
pub(crate) struct MyWebSocket;

impl MyWebSocket {
pub fn new() -> Self {
Self { hb: Instant::now() }
async fn handle_hb(
&self,
state: &mut (Instant, actix_ws::Session),
myself: &ActorRef<WsMessage>,
) -> Result<(), ActorProcessingErr> {
if Instant::now().duration_since(state.0) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");

let _ = state.1.clone().close(None).await;
myself.stop(None);

// don't try to send a ping
} else {
state.1.ping(b"").await?;
};

Ok(())
}

/// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL).
///
/// also this method checks heartbeats from client
fn hb(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");

// stop actor
ctx.stop();

// don't try to send a ping
return;
async fn handle_ws_msg(
&self,
msg: AggregatedMessage,
state: &mut (Instant, actix_ws::Session),
myself: ActorRef<WsMessage>,
) -> Result<(), ActorProcessingErr> {
println!("WS: {msg:?}");

match msg {
AggregatedMessage::Ping(msg) => {
state.0 = Instant::now();
state.1.pong(&msg).await?;
}

ctx.ping(b"");
});
AggregatedMessage::Pong(_) => {
state.0 = Instant::now();
}

AggregatedMessage::Text(text) => {
state.1.text(text).await?;
}

AggregatedMessage::Binary(bin) => {
state.1.binary(bin).await?;
}

AggregatedMessage::Close(reason) => {
let _ = state.1.clone().close(reason).await;
myself.stop(None);
}
};

Ok(())
}
}

impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
impl ractor::Actor for MyWebSocket {
type Msg = WsMessage;
type State = (Instant, actix_ws::Session);
type Arguments = actix_ws::Session;

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
session: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
myself.send_interval(HEARTBEAT_INTERVAL, || WsMessage::Hb);

/// Method is called on actor start. We start the heartbeat process here.
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
Ok((Instant::now(), session))
}
}

/// Handler for `ws::Message`
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
// process websocket messages
println!("WS: {msg:?}");
match msg {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WsMessage::Hb => {
self.handle_hb(state, &myself).await?;
}
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => {
ctx.close(reason);
ctx.stop();

WsMessage::Ws(msg) => {
self.handle_ws_msg(msg, state, myself).await?;
}
_ => ctx.stop(),
}

Ok(())
}
}

0 comments on commit 876a8e0

Please sign in to comment.