diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 6aceed5534a..0a166c98181 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -147,10 +147,10 @@ pub trait ProgressWatcher: Send + Sync { fn transmission_progress(&self, progress: TransmissionProgress); } -/// A listener to the global (client-wide) status of the sending queue. +/// A listener to the global (client-wide) status of the send queue. #[uniffi::export(callback_interface)] -pub trait SendingQueueStatusListener: Sync + Send { - /// Called every time the sending queue has received a new status. +pub trait SendQueueStatusListener: Sync + Send { + /// Called every time the send queue has received a new status. /// /// This can be set automatically (in case of sending failure), or manually /// via an API call. @@ -316,9 +316,9 @@ impl Client { Ok(()) } - /// Enables or disables the sending queue, according to the given parameter. + /// Enables or disables the send queue, according to the given parameter. /// - /// The sending queue automatically disables itself whenever sending an + /// The send queue automatically disables itself whenever sending an /// event with it failed (e.g., sending an event via the high-level Timeline /// object), so it's required to manually re-enable it as soon as /// connectivity is back on the device. @@ -330,14 +330,14 @@ impl Client { } } - /// Subscribe to the global enablement status of the sending queue, at the + /// Subscribe to the global enablement status of the send queue, at the /// client-wide level. /// /// The given listener will be immediately called with the initial value of /// the enablement status. pub fn subscribe_to_sending_queue_status( &self, - listener: Box, + listener: Box, ) -> Arc { let mut subscriber = self.inner.sending_queue().subscribe_status(); diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 02d7179d0bf..fa3007f84b5 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -219,7 +219,7 @@ impl Timeline { Ok(()) } - /// Queues an event in the room's sending queue so it's processed for + /// Queues an event in the room's send queue so it's processed for /// sending later. /// /// Returns an abort handle that allows to abort sending, if it hasn't diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index bf6e0794893..e206b3e2dde 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -18,7 +18,7 @@ use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ event_cache::{EventsOrigin, RoomEventCacheUpdate}, executor::spawn, - send_queue::{LocalEcho, RoomSendingQueueUpdate}, + send_queue::{LocalEcho, RoomSendQueueUpdate}, Room, }; use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; @@ -295,7 +295,7 @@ impl TimelineBuilder { loop { match listener.recv().await { Ok(update) => match update { - RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content, abort_handle, @@ -312,13 +312,13 @@ impl TimelineBuilder { .await; } - RoomSendingQueueUpdate::CancelledLocalEvent { transaction_id } => { + RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => { if !timeline.discard_local_echo(&transaction_id).await { warn!("couldn't find the local echo to discard"); } } - RoomSendingQueueUpdate::SendError { transaction_id, error } => { + RoomSendQueueUpdate::SendError { transaction_id, error } => { timeline .update_event_send_state( &transaction_id, @@ -327,7 +327,7 @@ impl TimelineBuilder { .await; } - RoomSendingQueueUpdate::SentEvent { transaction_id, event_id } => { + RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => { timeline .update_event_send_state( &transaction_id, diff --git a/crates/matrix-sdk-ui/src/timeline/error.rs b/crates/matrix-sdk-ui/src/timeline/error.rs index 3782f70b0f8..150b7a0311f 100644 --- a/crates/matrix-sdk-ui/src/timeline/error.rs +++ b/crates/matrix-sdk-ui/src/timeline/error.rs @@ -16,7 +16,7 @@ use std::fmt; use matrix_sdk::{ event_cache::{paginator::PaginatorError, EventCacheError}, - send_queue::RoomSendingQueueError, + send_queue::RoomSendQueueError, }; use ruma::OwnedTransactionId; use thiserror::Error; @@ -144,7 +144,7 @@ pub enum SendEventError { UnsupportedEditItem(#[from] UnsupportedEditItem), #[error(transparent)] - SendError(#[from] RoomSendingQueueError), + SendError(#[from] RoomSendQueueError), } #[derive(Debug, Error)] diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index bbeb1bffe28..d10ee267a56 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -27,7 +27,7 @@ use matrix_sdk::{ event_handler::EventHandlerHandle, executor::JoinHandle, room::{Receipts, Room}, - send_queue::{AbortSendHandle, RoomSendingQueueError}, + send_queue::{AbortSendHandle, RoomSendQueueError}, Client, Result, }; use matrix_sdk_base::RoomState; @@ -300,7 +300,7 @@ impl Timeline { pub async fn send( &self, content: AnyMessageLikeEventContent, - ) -> Result { + ) -> Result { self.room().sending_queue().send(content).await } diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index 0821b0f5bae..602684186ea 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -2539,7 +2539,7 @@ async fn test_room_latest_event() -> Result<(), Error> { // Insert a local event in the `Timeline`. timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap(); - // Let the sending queue send the message, and the timeline process it. + // Let the send queue send the message, and the timeline process it. yield_now().await; // The latest event of the `Timeline` is a local event. diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index e12d4eb7a5d..e52b6a4991b 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -151,7 +151,7 @@ async fn test_retry_failed() { timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; // First, local echo is added @@ -177,7 +177,7 @@ async fn test_retry_failed() { client.sending_queue().enable(); - // Let the sending queue handle the event. + // Let the send queue handle the event. tokio::time::sleep(Duration::from_millis(300)).await; // After mocking the endpoint and retrying, it succeeds. @@ -281,7 +281,7 @@ async fn test_cancel_failed() { let handle = timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; // Local echo is added (immediately) diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs index ed131459594..c06b9dc6ac5 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs @@ -206,7 +206,7 @@ async fn test_send_edit() { .await .unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; let edit_item = @@ -296,7 +296,7 @@ async fn test_send_reply_edit() { .await .unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; let edit_item = @@ -387,7 +387,7 @@ async fn test_send_edit_poll() { UnstablePollStartContentBlock::new("Edited Test".to_owned(), edited_poll_answers); timeline.edit_poll("poll_fallback_text", edited_poll, &poll_event).await.unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; let edit_item = diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs b/crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs index 825ea889f67..d38614eaf6b 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs @@ -328,7 +328,7 @@ async fn test_focused_timeline_doesnt_show_local_echoes() { // Send a message in the room, expect no local echo. timeline.send(RoomMessageEventContent::text_plain("h4xx0r").into()).await.unwrap(); - // Let a bit of time for the sending queue to process the event. + // Let a bit of time for the send queue to process the event. tokio::time::sleep(Duration::from_millis(300)).await; // And nothing more. diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index 50f15df21c0..6f4c36b5111 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -82,10 +82,10 @@ async fn test_message_order() { timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap(); timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; - // Local echoes are available after the sending queue has processed these. + // Local echoes are available after the send queue has processed these. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { assert!(!value.is_editable(), "local echo for first can't be edited"); assert_eq!(value.content().as_message().unwrap().body(), "First!"); @@ -140,10 +140,10 @@ async fn test_retry_order() { timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap(); timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; - // Local echoes are available after the sending queue has processed these. + // Local echoes are available after the send queue has processed these. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { assert_eq!(value.content().as_message().unwrap().body(), "First!"); }); @@ -187,7 +187,7 @@ async fn test_retry_order() { // Wait 200ms for the first msg, 100ms for the second, 300ms for overhead sleep(Duration::from_millis(600)).await; - // With the sending queue, sending is retried in the same order as the events + // With the send queue, sending is retried in the same order as the events // were sent. So we first see the first message. assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { assert_eq!(value.content().as_message().unwrap().body(), "First!"); @@ -336,7 +336,7 @@ async fn test_no_duplicate_day_divider() { timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap(); timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; // Local echoes are available as soon as `timeline.send` returns. diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs index 2a3ac068bec..4b99bc7e2e3 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs @@ -323,7 +323,7 @@ async fn test_send_reply() { .await .unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); @@ -430,7 +430,7 @@ async fn test_send_reply_to_self() { .await .unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); @@ -520,7 +520,7 @@ async fn test_send_reply_to_threaded() { .await .unwrap(); - // Let the sending queue handle the event. + // Let the send queue handle the event. yield_now().await; let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); diff --git a/crates/matrix-sdk/src/client/builder.rs b/crates/matrix-sdk/src/client/builder.rs index aad850c1dc0..2505ae74ce2 100644 --- a/crates/matrix-sdk/src/client/builder.rs +++ b/crates/matrix-sdk/src/client/builder.rs @@ -38,7 +38,7 @@ use crate::http_client::HttpSettings; use crate::oidc::OidcCtx; use crate::{ authentication::AuthCtx, config::RequestConfig, error::RumaApiError, http_client::HttpClient, - send_queue::SendingQueueData, HttpError, IdParseError, + send_queue::SendQueueData, HttpError, IdParseError, }; /// Builder that allows creating and configuring various parts of a [`Client`]. @@ -453,7 +453,7 @@ impl ClientBuilder { }); let event_cache = OnceCell::new(); - let sending_queue = Arc::new(SendingQueueData::new(true)); + let sending_queue = Arc::new(SendQueueData::new(true)); let inner = ClientInner::new( auth_ctx, homeserver, diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 652162c201c..904edba31eb 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -83,7 +83,7 @@ use crate::{ matrix_auth::MatrixAuth, notification_settings::NotificationSettings, room_preview::RoomPreview, - send_queue::SendingQueueData, + send_queue::SendQueueData, sync::{RoomUpdate, SyncResponse}, Account, AuthApi, AuthSession, Error, Media, Pusher, RefreshTokenError, Result, Room, TransmissionProgress, @@ -284,10 +284,10 @@ pub(crate) struct ClientInner { #[cfg(feature = "e2e-encryption")] pub(crate) verification_state: SharedObservable, - /// Data related to the [`SendingQueue`]. + /// Data related to the [`SendQueue`]. /// - /// [`SendingQueue`]: crate::send_queue::SendingQueue - pub(crate) sending_queue_data: Arc, + /// [`SendQueue`]: crate::send_queue::SendQueue + pub(crate) sending_queue_data: Arc, } impl ClientInner { @@ -307,7 +307,7 @@ impl ClientInner { unstable_features: Option>, respect_login_well_known: bool, event_cache: OnceCell, - sending_queue: Arc, + sending_queue: Arc, #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings, ) -> Arc { let client = Self { diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 96f6c31f7de..fe88a3f063c 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! A sending queue facility to serializing queuing and sending of messages. +//! A send queue facility to serializing queuing and sending of messages. use std::{ collections::{BTreeMap, VecDeque}, @@ -34,24 +34,24 @@ use tracing::{debug, error, info, instrument, trace, warn}; use crate::{client::WeakClient, config::RequestConfig, room::WeakRoom, Client, Room}; -/// A client-wide sending queue, for all the rooms known by a client. -pub struct SendingQueue { +/// A client-wide send queue, for all the rooms known by a client. +pub struct SendQueue { client: Client, } #[cfg(not(tarpaulin_include))] -impl std::fmt::Debug for SendingQueue { +impl std::fmt::Debug for SendQueue { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SendingQueue").finish_non_exhaustive() + f.debug_struct("SendQueue").finish_non_exhaustive() } } -impl SendingQueue { +impl SendQueue { pub(super) fn new(client: Client) -> Self { Self { client } } - fn for_room(&self, room: Room) -> RoomSendingQueue { + fn for_room(&self, room: Room) -> RoomSendQueue { let data = &self.client.inner.sending_queue_data; let mut map = data.rooms.write().unwrap(); @@ -62,7 +62,7 @@ impl SendingQueue { } let owned_room_id = room_id.to_owned(); - let room_q = RoomSendingQueue::new( + let room_q = RoomSendQueue::new( data.globally_enabled.clone(), data.is_dropping.clone(), &self.client, @@ -72,13 +72,13 @@ impl SendingQueue { room_q } - /// Enable the sending queue for the entire client, i.e. all rooms. + /// Enable the send queue for the entire client, i.e. all rooms. /// /// This may wake up background tasks and resume sending of events in the /// background. pub fn enable(&self) { if self.client.inner.sending_queue_data.globally_enabled.set_if_not_eq(true).is_some() { - debug!("globally enabling sending queue"); + debug!("globally enabling send queue"); let rooms = self.client.inner.sending_queue_data.rooms.read().unwrap(); // Wake up the rooms, in case events have been queued in the meanwhile. for room in rooms.values() { @@ -87,7 +87,7 @@ impl SendingQueue { } } - /// Disable the sending queue for the entire client, i.e. all rooms. + /// Disable the send queue for the entire client, i.e. all rooms. /// /// If requests were being sent, they're not aborted, and will continue /// until a status resolves (error responses will keep the events in the @@ -100,34 +100,34 @@ impl SendingQueue { // the queue is now disabled, // - or they were not, and it's not worth it waking them to let them they're // disabled, which causes them to go to sleep again. - debug!("globally disabling sending queue"); + debug!("globally disabling send queue"); self.client.inner.sending_queue_data.globally_enabled.set(false); } - /// Returns whether the sending queue is enabled, at a client-wide + /// Returns whether the send queue is enabled, at a client-wide /// granularity. pub fn is_enabled(&self) -> bool { self.client.inner.sending_queue_data.globally_enabled.get() } /// A subscriber to the enablement status (enabled or disabled) of the - /// sending queue. + /// send queue. pub fn subscribe_status(&self) -> Subscriber { self.client.inner.sending_queue_data.globally_enabled.subscribe() } } impl Client { - /// Returns a [`SendingQueue`] that handles sending, retrying and not + /// Returns a [`SendQueue`] that handles sending, retrying and not /// forgetting about messages that are to be sent. - pub fn sending_queue(&self) -> SendingQueue { - SendingQueue::new(self.clone()) + pub fn sending_queue(&self) -> SendQueue { + SendQueue::new(self.clone()) } } -pub(super) struct SendingQueueData { - /// Mapping of room to their unique sending queue. - rooms: SyncRwLock>, +pub(super) struct SendQueueData { + /// Mapping of room to their unique send queue. + rooms: SyncRwLock>, /// Is the whole mechanism enabled or disabled? globally_enabled: SharedObservable, @@ -136,8 +136,8 @@ pub(super) struct SendingQueueData { is_dropping: Arc, } -impl SendingQueueData { - /// Create the data for a sending queue, in the given enabled state. +impl SendQueueData { + /// Create the data for a send queue, in the given enabled state. pub fn new(globally_enabled: bool) -> Self { Self { rooms: Default::default(), @@ -147,11 +147,11 @@ impl SendingQueueData { } } -impl Drop for SendingQueueData { +impl Drop for SendQueueData { fn drop(&mut self) { - // Mark the whole sending queue as shutting down, then wake up all the room + // Mark the whole send queue as shutting down, then wake up all the room // queues so they're stopped too. - debug!("globally dropping the sending queue"); + debug!("globally dropping the send queue"); self.is_dropping.store(true, Ordering::SeqCst); let rooms = self.rooms.read().unwrap(); @@ -162,27 +162,27 @@ impl Drop for SendingQueueData { } impl Room { - /// Returns the [`RoomSendingQueue`] for this specific room. - pub fn sending_queue(&self) -> RoomSendingQueue { + /// Returns the [`RoomSendQueue`] for this specific room. + pub fn sending_queue(&self) -> RoomSendQueue { self.client.sending_queue().for_room(self.clone()) } } -/// A per-room sending queue. +/// A per-room send queue. /// /// This is cheap to clone. #[derive(Clone)] -pub struct RoomSendingQueue { - inner: Arc, +pub struct RoomSendQueue { + inner: Arc, } -impl std::fmt::Debug for RoomSendingQueue { +impl std::fmt::Debug for RoomSendQueue { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RoomSendingQueue").finish_non_exhaustive() + f.debug_struct("RoomSendQueue").finish_non_exhaustive() } } -impl RoomSendingQueue { +impl RoomSendQueue { fn new( globally_enabled: SharedObservable, is_dropping: Arc, @@ -206,7 +206,7 @@ impl RoomSendingQueue { )); Self { - inner: Arc::new(RoomSendingQueueInner { + inner: Arc::new(RoomSendQueueInner { room: weak_room, updates: updates_sender, _task: task, @@ -221,7 +221,7 @@ impl RoomSendingQueue { /// This immediately returns, and will push the event to be sent into a /// queue, handled in the background. /// - /// Callers are expected to consume [`RoomSendingQueueUpdate`] via calling + /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling /// the [`Self::subscribe()`] method to get updates about the sending of /// that event. /// @@ -232,12 +232,12 @@ impl RoomSendingQueue { pub async fn send( &self, content: AnyMessageLikeEventContent, - ) -> Result { + ) -> Result { let Some(room) = self.inner.room.get() else { - return Err(RoomSendingQueueError::RoomDisappeared); + return Err(RoomSendQueueError::RoomDisappeared); }; if room.state() != RoomState::Joined { - return Err(RoomSendingQueueError::RoomNotJoined); + return Err(RoomSendQueueError::RoomNotJoined); } let transaction_id = self.inner.queue.push(content.clone()).await; @@ -245,7 +245,7 @@ impl RoomSendingQueue { self.inner.notifier.notify_one(); - let _ = self.inner.updates.send(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id: transaction_id.clone(), content, abort_handle: AbortSendHandle { @@ -258,8 +258,8 @@ impl RoomSendingQueue { } /// Returns the current local events as well as a receiver to listen to the - /// send queue updates, as defined in [`RoomSendingQueueUpdate`]. - pub async fn subscribe(&self) -> (Vec, broadcast::Receiver) { + /// send queue updates, as defined in [`RoomSendQueueUpdate`]. + pub async fn subscribe(&self) -> (Vec, broadcast::Receiver) { let local_echoes = self .inner .queue @@ -281,7 +281,7 @@ impl RoomSendingQueue { room: WeakRoom, queue: QueueStorage, notifier: Arc, - updates: broadcast::Sender, + updates: broadcast::Sender, globally_enabled: SharedObservable, is_dropping: Arc, ) { @@ -329,7 +329,7 @@ impl RoomSendingQueue { queue.mark_as_sent(&queued_event.transaction_id).await; - let _ = updates.send(RoomSendingQueueUpdate::SentEvent { + let _ = updates.send(RoomSendQueueUpdate::SentEvent { transaction_id: queued_event.transaction_id, event_id: res.event_id, }); @@ -339,14 +339,14 @@ impl RoomSendingQueue { warn!(txn_id = %queued_event.transaction_id, "error when sending event: {err}"); // Disable the queue after an error. - // See comment in [`SendingQueue::disable()`]. + // See comment in [`SendQueue::disable()`]. globally_enabled.set(false); // In this case, we intentionally keep the event in the queue, but mark it as // not being sent anymore. queue.mark_as_not_being_sent(&queued_event.transaction_id).await; - let _ = updates.send(RoomSendingQueueUpdate::SendError { + let _ = updates.send(RoomSendQueueUpdate::SendError { transaction_id: queued_event.transaction_id, error: Arc::new(err), }); @@ -358,14 +358,14 @@ impl RoomSendingQueue { } } -struct RoomSendingQueueInner { - /// The room which this sending queue relates to. +struct RoomSendQueueInner { + /// The room which this send queue relates to. room: WeakRoom, /// Broadcaster for notifications about the statuses of events to be sent. /// /// Can be subscribed to from the outside. - updates: broadcast::Sender, + updates: broadcast::Sender, /// Queue of events that are either to be sent, or being sent. /// @@ -506,10 +506,10 @@ pub struct LocalEcho { pub abort_handle: AbortSendHandle, } -/// An update to a room sending queue, observable with -/// [`RoomSendingQueue::subscribe`]. +/// An update to a room send queue, observable with +/// [`RoomSendQueue::subscribe`]. #[derive(Clone, Debug)] -pub enum RoomSendingQueueUpdate { +pub enum RoomSendQueueUpdate { /// A new local event is being sent. /// /// There's been a user query to create this event. It is being sent to the @@ -525,7 +525,7 @@ pub enum RoomSendingQueueUpdate { /// An error happened when an event was being sent. /// - /// The event has not been removed from the queue. All the sending queues + /// The event has not been removed from the queue. All the send queues /// will be disabled after this happens, and must be manually re-enabled. SendError { /// Transaction id used to identify this event. @@ -544,9 +544,9 @@ pub enum RoomSendingQueueUpdate { }, } -/// An error triggered by the sending queue module. +/// An error triggered by the send queue module. #[derive(Debug, thiserror::Error)] -pub enum RoomSendingQueueError { +pub enum RoomSendQueueError { /// The room isn't in the joined state. #[error("the room isn't in the joined state")] RoomNotJoined, @@ -561,7 +561,7 @@ pub enum RoomSendingQueueError { /// a room. #[derive(Clone, Debug)] pub struct AbortSendHandle { - room: RoomSendingQueue, + room: RoomSendQueue, transaction_id: OwnedTransactionId, } @@ -573,7 +573,7 @@ impl AbortSendHandle { pub async fn abort(self) -> bool { if self.room.inner.queue.cancel(&self.transaction_id).await { // Propagate a cancelled update too. - let _ = self.room.inner.updates.send(RoomSendingQueueUpdate::CancelledLocalEvent { + let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: self.transaction_id.clone(), }); true diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index 2706322a373..e8dc5c25062 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use assert_matches2::{assert_let, assert_matches}; use futures_util::FutureExt as _; use matrix_sdk::{ - send_queue::{LocalEcho, RoomSendingQueueError, RoomSendingQueueUpdate}, + send_queue::{LocalEcho, RoomSendQueueError, RoomSendQueueUpdate}, test_utils::logged_in_client_with_server, }; use matrix_sdk_test::{async_test, InvitedRoomBuilder, JoinedRoomBuilder, LeftRoomBuilder}; @@ -47,12 +47,12 @@ async fn test_cant_send_invited_room() { ) .await; - // I can't send message to it with the sending queue. + // I can't send message to it with the send queue. assert_matches!( room.sending_queue() .send(RoomMessageEventContent::text_plain("Hello, World!").into()) .await, - Err(RoomSendingQueueError::RoomNotJoined) + Err(RoomSendQueueError::RoomNotJoined) ); } @@ -73,12 +73,12 @@ async fn test_cant_send_left_room() { ) .await; - // I can't send message to it with the sending queue. + // I can't send message to it with the send queue. assert_matches!( room.sending_queue() .send(RoomMessageEventContent::text_plain("Farewell, World!").into()) .await, - Err(RoomSendingQueueError::RoomNotJoined) + Err(RoomSendQueueError::RoomNotJoined) ); } @@ -99,7 +99,7 @@ async fn test_nothing_sent_when_disabled() { ) .await; - // When I disable the sending queue, + // When I disable the send queue, let event_id = event_id!("$1"); mock_send_event(event_id).expect(0).mount(&server).await; @@ -180,7 +180,7 @@ async fn test_smoke() { room.sending_queue().send(RoomMessageEventContent::text_plain("1").into()).await.unwrap(); assert_let!( - Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { content: AnyMessageLikeEventContent::RoomMessage(msg), transaction_id: txn1, .. @@ -200,7 +200,7 @@ async fn test_smoke() { drop(lock_guard); assert_let!( - Ok(Ok(RoomSendingQueueUpdate::SentEvent { + Ok(Ok(RoomSendQueueUpdate::SentEvent { event_id: response_event_id, transaction_id: txn2 })) = timeout(Duration::from_secs(1), watch.recv()).await @@ -270,7 +270,7 @@ async fn test_error() { q.send(RoomMessageEventContent::text_plain("1").into()).await.unwrap(); assert_let!( - Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { content: AnyMessageLikeEventContent::RoomMessage(msg), transaction_id: txn1, .. @@ -296,7 +296,7 @@ async fn test_error() { // non-determinism, so let it fail after a large amount of time (10 // seconds). assert_let!( - Ok(Ok(RoomSendingQueueUpdate::SendError { transaction_id: txn2, error })) = + Ok(Ok(RoomSendQueueUpdate::SendError { transaction_id: txn2, error })) = timeout(Duration::from_secs(10), watch.recv()).await ); @@ -334,7 +334,7 @@ async fn test_error() { assert!(client.sending_queue().is_enabled()); assert_let!( - Ok(Ok(RoomSendingQueueUpdate::SentEvent { event_id, transaction_id: txn3 })) = + Ok(Ok(RoomSendQueueUpdate::SentEvent { event_id, transaction_id: txn3 })) = timeout(Duration::from_secs(1), watch.recv()).await ); @@ -365,7 +365,7 @@ async fn test_reenabling_queue() { assert!(global_status.next_now()); - // When I start with a disabled sending queue, + // When I start with a disabled send queue, client.sending_queue().disable(); assert!(!client.sending_queue().is_enabled()); @@ -385,7 +385,7 @@ async fn test_reenabling_queue() { for i in 1..=3 { assert_let!( - Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { content: AnyMessageLikeEventContent::RoomMessage(msg), .. }))) = timeout(Duration::from_secs(1), watch.recv()).await @@ -434,7 +434,7 @@ async fn test_reenabling_queue() { // They're sent, in the same ordering. for i in 1..=3 { assert_let!( - Ok(Ok(RoomSendingQueueUpdate::SentEvent { event_id, .. })) = + Ok(Ok(RoomSendQueueUpdate::SentEvent { event_id, .. })) = timeout(Duration::from_secs(1), watch.recv()).await ); assert_eq!(event_id.as_str(), format!("${i}")); @@ -510,7 +510,7 @@ async fn test_cancellation() { // Receiving update for msg1. assert_let!( - Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { content: AnyMessageLikeEventContent::RoomMessage(_), transaction_id: txn1, .. @@ -519,7 +519,7 @@ async fn test_cancellation() { // Receiving update for msg2. assert_let!( - Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { content: AnyMessageLikeEventContent::RoomMessage(_), transaction_id: txn2, .. @@ -528,7 +528,7 @@ async fn test_cancellation() { // Receiving update for msg3. assert_let!( - Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { content: AnyMessageLikeEventContent::RoomMessage(_), transaction_id: txn3, abort_handle: handle3, @@ -537,7 +537,7 @@ async fn test_cancellation() { // Receiving update for msg4. assert_let!( - Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { content: AnyMessageLikeEventContent::RoomMessage(_), transaction_id: txn4, .. @@ -546,7 +546,7 @@ async fn test_cancellation() { // Receiving update for msg5. assert_let!( - Ok(Ok(RoomSendingQueueUpdate::NewLocalEvent(LocalEcho { + Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { content: AnyMessageLikeEventContent::RoomMessage(_), transaction_id: txn5, .. @@ -568,7 +568,7 @@ async fn test_cancellation() { assert!(handle2.abort().await); assert_let!( - Ok(Ok(RoomSendingQueueUpdate::CancelledLocalEvent { + Ok(Ok(RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: cancelled_transaction_id })) = timeout(Duration::from_secs(1), watch.recv()).await ); @@ -582,7 +582,7 @@ async fn test_cancellation() { assert!(handle3.abort().await); assert_let!( - Ok(Ok(RoomSendingQueueUpdate::CancelledLocalEvent { + Ok(Ok(RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: cancelled_transaction_id })) = timeout(Duration::from_secs(1), watch.recv()).await ); @@ -606,7 +606,7 @@ async fn test_cancellation() { assert!(handle4.abort().await); assert_let!( - Ok(Ok(RoomSendingQueueUpdate::CancelledLocalEvent { + Ok(Ok(RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: cancelled_transaction_id })) = timeout(Duration::from_secs(1), watch.recv()).await ); @@ -620,13 +620,13 @@ async fn test_cancellation() { // Now the server will process msg1 and msg3. assert_let!( - Ok(Ok(RoomSendingQueueUpdate::SentEvent { transaction_id: sent_txn, .. })) = + Ok(Ok(RoomSendQueueUpdate::SentEvent { transaction_id: sent_txn, .. })) = timeout(Duration::from_secs(1), watch.recv()).await ); assert_eq!(sent_txn, txn1,); assert_let!( - Ok(Ok(RoomSendingQueueUpdate::SentEvent { transaction_id: sent_txn, .. })) = + Ok(Ok(RoomSendQueueUpdate::SentEvent { transaction_id: sent_txn, .. })) = timeout(Duration::from_secs(1), watch.recv()).await ); assert_eq!(sent_txn, txn5); diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index d3b56442db4..cf74274fdb5 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -818,7 +818,7 @@ impl App { "\nUse j/k to move, s/S to start/stop the sync service, m to mark as read, t to show the timeline.".to_owned() } DetailsMode::TimelineItems => { - "\nUse j/k to move, s/S to start/stop the sync service, r to show read receipts, Q to enable/disable the sending queue, M to send a message.".to_owned() + "\nUse j/k to move, s/S to start/stop the sync service, r to show read receipts, Q to enable/disable the send queue, M to send a message.".to_owned() } } };