diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index fe88a3f063c..22e41e9b1a1 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -338,14 +338,19 @@ impl RoomSendQueue { Err(err) => { warn!(txn_id = %queued_event.transaction_id, "error when sending event: {err}"); - // Disable the queue after an error. - // See comment in [`SendQueue::disable()`]. - globally_enabled.set(false); - // In this case, we intentionally keep the event in the queue, but mark it as // not being sent anymore. queue.mark_as_not_being_sent(&queued_event.transaction_id).await; + // Let observers know about a failure *after* we've marked the item as not + // being sent anymore. Otherwise, there's a possible race where a caller might + // try to remove an item, while it's still marked as being sent, resulting in a + // cancellation failure. + + // Disable the queue after an error. + // See comment in [`SendQueue::disable()`]. + globally_enabled.set(false); + let _ = updates.send(RoomSendQueueUpdate::SendError { transaction_id: queued_event.transaction_id, error: Arc::new(err), diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index e8dc5c25062..e9e974d85ab 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -633,3 +633,69 @@ async fn test_cancellation() { assert!(watch.is_empty()); } + +#[async_test] +async fn test_abort_reenable() { + let (client, server) = logged_in_client_with_server().await; + + // Mark the room as joined. + let room_id = room_id!("!a:b.c"); + + let room = mock_sync_with_new_room( + |builder| { + builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + }, + &client, + &server, + room_id, + ) + .await; + + let mut global_status = client.sending_queue().subscribe_status(); + + assert!(global_status.next_now()); + + // When I start with an enabled sending queue, + client.sending_queue().enable(); + + assert!(client.sending_queue().is_enabled()); + + let q = room.sending_queue(); + + let (local_echoes, mut watch) = q.subscribe().await; + + assert!(local_echoes.is_empty()); + assert!(watch.is_empty()); + + // One message is queued. + let abort_send_handle = + q.send(RoomMessageEventContent::text_plain("hey there").into()).await.unwrap(); + + // It is first seen as a local echo, + assert_let!( + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + content: AnyMessageLikeEventContent::RoomMessage(msg), + .. + }))) = timeout(Duration::from_secs(1), watch.recv()).await + ); + assert_eq!(msg.body(), format!("hey there")); + + // Waiting for the global status to report the queue is getting disabled. + assert!(!global_status.next().await.unwrap()); + + // Aborting the sending should work. + assert!(abort_send_handle.abort().await); + + // The room updates will report the error, then the cancelled event, eventually. + assert_let!( + Ok(Ok(RoomSendQueueUpdate::SendError { .. })) = + timeout(Duration::from_secs(1), watch.recv()).await + ); + + assert_let!( + Ok(Ok(RoomSendQueueUpdate::CancelledLocalEvent { .. })) = + timeout(Duration::from_secs(1), watch.recv()).await + ); + + assert!(watch.is_empty()); +}