Skip to content

Commit

Permalink
Fix sockets causing actix hangs
Browse files Browse the repository at this point in the history
  • Loading branch information
Geometrically committed Dec 27, 2024
1 parent 24765db commit 28fae41
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
32 changes: 18 additions & 14 deletions apps/labrinth/src/routes/internal/statuses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::queue::socket::ActiveSockets;
use crate::routes::ApiError;
use actix_web::web::{Data, Payload};
use actix_web::{get, web, HttpRequest, HttpResponse};
use actix_ws::AggregatedMessage;
use actix_ws::Message;
use chrono::Utc;
use futures_util::StreamExt;
use futures_util::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

Expand Down Expand Up @@ -128,13 +128,13 @@ pub async fn ws_init(
)
.await?;

let mut stream = msg_stream.aggregate_continuations();
let mut stream = msg_stream.into_stream();

actix_web::rt::spawn(async move {
// receive messages from websocket
while let Some(msg) = stream.next().await {
match msg {
Ok(AggregatedMessage::Text(text)) => {
Ok(Message::Text(text)) => {
if let Ok(message) =
serde_json::from_str::<ClientToServerMessage>(&text)
{
Expand All @@ -159,10 +159,14 @@ pub async fn ws_init(
status.profile_name = profile_name;
status.last_update = Utc::now();

let user_status = status.clone();
// We drop the pair to avoid holding the lock for too long
drop(pair);

let _ = broadcast_friends(
user.id,
ServerToClientMessage::StatusUpdate {
status: status.clone(),
status: user_status,
},
&pool,
&db,
Expand All @@ -175,15 +179,14 @@ pub async fn ws_init(
}
}

Ok(AggregatedMessage::Close(_)) => {
Ok(Message::Close(_)) => {
let _ = close_socket(user.id, &pool, &db).await;
}

Ok(AggregatedMessage::Ping(msg)) => {
if let Some(mut socket) = db.auth_sockets.get_mut(&user.id)
{
let (_, socket) = socket.value_mut();
let _ = socket.pong(&msg).await;
Ok(Message::Ping(msg)) => {
if let Some(mut socket) = db.auth_sockets.get(&user.id) {
let (_, socket) = socket.value();
let _ = socket.clone().pong(&msg).await;
}
}

Expand Down Expand Up @@ -219,11 +222,12 @@ pub async fn broadcast_friends(

if friend.accepted {
if let Some(mut socket) =
sockets.auth_sockets.get_mut(&friend_id.into())
sockets.auth_sockets.get(&friend_id.into())
{
let (_, socket) = socket.value_mut();
let (_, socket) = socket.value();

let _ = socket.text(serde_json::to_string(&message)?).await;
let _ =
socket.clone().text(serde_json::to_string(&message)?).await;
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions apps/labrinth/src/routes/v3/friends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ pub async fn add_friend(
) -> Result<(), ApiError> {
if let Some(pair) = sockets.auth_sockets.get(&user_id.into()) {
let (friend_status, _) = pair.value();
if let Some(mut socket) =
sockets.auth_sockets.get_mut(&friend_id.into())
if let Some(socket) =
sockets.auth_sockets.get(&friend_id.into())
{
let (_, socket) = socket.value_mut();
let (_, socket) = socket.value();

let _ = socket
.clone()
.text(serde_json::to_string(
&ServerToClientMessage::StatusUpdate {
status: friend_status.clone(),
Expand Down Expand Up @@ -120,11 +121,11 @@ pub async fn add_friend(
.insert(&mut transaction)
.await?;

if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into())
{
let (_, socket) = socket.value_mut();
if let Some(socket) = db.auth_sockets.get(&friend.id.into()) {
let (_, socket) = socket.value();

if socket
.clone()
.text(serde_json::to_string(
&ServerToClientMessage::FriendRequest { from: user.id },
)?)
Expand Down Expand Up @@ -177,10 +178,11 @@ pub async fn remove_friend(
)
.await?;

if let Some(mut socket) = db.auth_sockets.get_mut(&friend.id.into()) {
let (_, socket) = socket.value_mut();
if let Some(mut socket) = db.auth_sockets.get(&friend.id.into()) {
let (_, socket) = socket.value();

let _ = socket
.clone()
.text(serde_json::to_string(
&ServerToClientMessage::FriendRequestRejected {
from: user.id,
Expand Down

0 comments on commit 28fae41

Please sign in to comment.