Skip to content

Commit

Permalink
send queue: try to avoid a race when disabling then aborting a send
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Jun 6, 2024
1 parent 66330a2 commit 6744f27
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 4 deletions.
13 changes: 9 additions & 4 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,19 @@ impl RoomSendQueue {
Err(err) => {
warn!(txn_id = %queued_event.transaction_id, "error when sending event: {err}");

// Disable the queue after an error.
// See comment in [`SendQueue::disable()`].
globally_enabled.set(false);

// In this case, we intentionally keep the event in the queue, but mark it as
// not being sent anymore.
queue.mark_as_not_being_sent(&queued_event.transaction_id).await;

// Let observers know about a failure *after* we've marked the item as not
// being sent anymore. Otherwise, there's a possible race where a caller might
// try to remove an item, while it's still marked as being sent, resulting in a
// cancellation failure.

// Disable the queue after an error.
// See comment in [`SendQueue::disable()`].
globally_enabled.set(false);

let _ = updates.send(RoomSendQueueUpdate::SendError {
transaction_id: queued_event.transaction_id,
error: Arc::new(err),
Expand Down
66 changes: 66 additions & 0 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,3 +633,69 @@ async fn test_cancellation() {

assert!(watch.is_empty());
}

#[async_test]
async fn test_abort_reenable() {
let (client, server) = logged_in_client_with_server().await;

// Mark the room as joined.
let room_id = room_id!("!a:b.c");

let room = mock_sync_with_new_room(
|builder| {
builder.add_joined_room(JoinedRoomBuilder::new(room_id));
},
&client,
&server,
room_id,
)
.await;

let mut global_status = client.sending_queue().subscribe_status();

assert!(global_status.next_now());

// When I start with an enabled sending queue,
client.sending_queue().enable();

assert!(client.sending_queue().is_enabled());

let q = room.sending_queue();

let (local_echoes, mut watch) = q.subscribe().await;

assert!(local_echoes.is_empty());
assert!(watch.is_empty());

// One message is queued.
let abort_send_handle =
q.send(RoomMessageEventContent::text_plain("hey there").into()).await.unwrap();

// It is first seen as a local echo,
assert_let!(
Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho {
content: AnyMessageLikeEventContent::RoomMessage(msg),
..
}))) = timeout(Duration::from_secs(1), watch.recv()).await
);
assert_eq!(msg.body(), format!("hey there"));

// Waiting for the global status to report the queue is getting disabled.
assert!(!global_status.next().await.unwrap());

// Aborting the sending should work.
assert!(abort_send_handle.abort().await);

// The room updates will report the error, then the cancelled event, eventually.
assert_let!(
Ok(Ok(RoomSendingQueueUpdate::SendError { .. })) =
timeout(Duration::from_secs(1), watch.recv()).await
);

assert_let!(
Ok(Ok(RoomSendingQueueUpdate::CancelledLocalEvent { .. })) =
timeout(Duration::from_secs(1), watch.recv()).await
);

assert!(watch.is_empty());
}

0 comments on commit 6744f27

Please sign in to comment.