Skip to content

Commit

Permalink
sdk: finish renaming "sending queue" to "send queue"
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Jun 6, 2024
1 parent 632de47 commit 429fe7b
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 55 deletions.
10 changes: 5 additions & 5 deletions bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,11 @@ impl Client {
/// event with it failed (e.g., sending an event via the high-level Timeline
/// object), so it's required to manually re-enable it as soon as
/// connectivity is back on the device.
pub fn enable_sending_queue(&self, enable: bool) {
pub fn enable_send_queue(&self, enable: bool) {
if enable {
self.inner.sending_queue().enable();
self.inner.send_queue().enable();
} else {
self.inner.sending_queue().disable();
self.inner.send_queue().disable();
}
}

Expand All @@ -334,11 +334,11 @@ impl Client {
///
/// The given listener will be immediately called with the initial value of
/// the enablement status.
pub fn subscribe_to_sending_queue_status(
pub fn subscribe_to_send_queue_status(
&self,
listener: Box<dyn SendQueueStatusListener>,
) -> Arc<TaskHandle> {
let mut subscriber = self.inner.sending_queue().subscribe_status();
let mut subscriber = self.inner.send_queue().subscribe_status();

Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
// Call with the initial value.
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl TimelineBuilder {
let local_echo_listener_handle = if is_live {
Some(spawn({
let timeline = inner.clone();
let (local_echoes, mut listener) = room.sending_queue().subscribe().await;
let (local_echoes, mut listener) = room.send_queue().subscribe().await;

// Handles existing local echoes first.
for echo in local_echoes {
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl Timeline {
&self,
content: AnyMessageLikeEventContent,
) -> Result<AbortSendHandle, RoomSendQueueError> {
self.room().sending_queue().send(content).await
self.room().send_queue().send(content).await
}

/// Send a reply to the given event.
Expand Down
6 changes: 3 additions & 3 deletions crates/matrix-sdk-ui/tests/integration/timeline/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async fn test_retry_failed() {

mock_encryption_state(&server, false).await;

client.sending_queue().enable();
client.send_queue().enable();

let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline().await.unwrap());
Expand Down Expand Up @@ -173,9 +173,9 @@ async fn test_retry_failed() {
.mount(&server)
.await;

assert!(!client.sending_queue().is_enabled());
assert!(!client.send_queue().is_enabled());

client.sending_queue().enable();
client.send_queue().enable();

// Let the send queue handle the event.
tokio::time::sleep(Duration::from_millis(300)).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/tests/integration/timeline/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async fn test_retry_order() {
.await;

// Retry the second message first
client.sending_queue().enable();
client.send_queue().enable();

// Wait 200ms for the first msg, 100ms for the second, 300ms for overhead
sleep(Duration::from_millis(600)).await;
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ impl ClientBuilder {
});

let event_cache = OnceCell::new();
let sending_queue = Arc::new(SendQueueData::new(true));
let send_queue = Arc::new(SendQueueData::new(true));
let inner = ClientInner::new(
auth_ctx,
homeserver,
Expand All @@ -465,7 +465,7 @@ impl ClientBuilder {
None,
self.respect_login_well_known,
event_cache,
sending_queue,
send_queue,
#[cfg(feature = "e2e-encryption")]
self.encryption_settings,
)
Expand Down
8 changes: 4 additions & 4 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ pub(crate) struct ClientInner {
/// Data related to the [`SendQueue`].
///
/// [`SendQueue`]: crate::send_queue::SendQueue
pub(crate) sending_queue_data: Arc<SendQueueData>,
pub(crate) send_queue_data: Arc<SendQueueData>,
}

impl ClientInner {
Expand All @@ -307,7 +307,7 @@ impl ClientInner {
unstable_features: Option<BTreeMap<String, bool>>,
respect_login_well_known: bool,
event_cache: OnceCell<EventCache>,
sending_queue: Arc<SendQueueData>,
send_queue: Arc<SendQueueData>,
#[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
) -> Arc<Self> {
let client = Self {
Expand All @@ -330,7 +330,7 @@ impl ClientInner {
respect_login_well_known,
sync_beat: event_listener::Event::new(),
event_cache,
sending_queue_data: sending_queue,
send_queue_data: send_queue,
#[cfg(feature = "e2e-encryption")]
e2ee: EncryptionData::new(encryption_settings),
#[cfg(feature = "e2e-encryption")]
Expand Down Expand Up @@ -2110,7 +2110,7 @@ impl Client {
self.inner.unstable_features.get().cloned(),
self.inner.respect_login_well_known,
self.inner.event_cache.clone(),
self.inner.sending_queue_data.clone(),
self.inner.send_queue_data.clone(),
#[cfg(feature = "e2e-encryption")]
self.inner.e2ee.encryption_settings,
)
Expand Down
24 changes: 12 additions & 12 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl SendQueue {
}

fn for_room(&self, room: Room) -> RoomSendQueue {
let data = &self.client.inner.sending_queue_data;
let data = &self.client.inner.send_queue_data;

let mut map = data.rooms.write().unwrap();

Expand All @@ -77,9 +77,9 @@ impl SendQueue {
/// This may wake up background tasks and resume sending of events in the
/// background.
pub fn enable(&self) {
if self.client.inner.sending_queue_data.globally_enabled.set_if_not_eq(true).is_some() {
if self.client.inner.send_queue_data.globally_enabled.set_if_not_eq(true).is_some() {
debug!("globally enabling send queue");
let rooms = self.client.inner.sending_queue_data.rooms.read().unwrap();
let rooms = self.client.inner.send_queue_data.rooms.read().unwrap();
// Wake up the rooms, in case events have been queued in the meanwhile.
for room in rooms.values() {
room.inner.notifier.notify_one();
Expand All @@ -101,26 +101,26 @@ impl SendQueue {
// - or they were not, and it's not worth it waking them to let them they're
// disabled, which causes them to go to sleep again.
debug!("globally disabling send queue");
self.client.inner.sending_queue_data.globally_enabled.set(false);
self.client.inner.send_queue_data.globally_enabled.set(false);
}

/// Returns whether the send queue is enabled, at a client-wide
/// granularity.
pub fn is_enabled(&self) -> bool {
self.client.inner.sending_queue_data.globally_enabled.get()
self.client.inner.send_queue_data.globally_enabled.get()
}

/// A subscriber to the enablement status (enabled or disabled) of the
/// send queue.
pub fn subscribe_status(&self) -> Subscriber<bool> {
self.client.inner.sending_queue_data.globally_enabled.subscribe()
self.client.inner.send_queue_data.globally_enabled.subscribe()
}
}

impl Client {
/// Returns a [`SendQueue`] that handles sending, retrying and not
/// forgetting about messages that are to be sent.
pub fn sending_queue(&self) -> SendQueue {
pub fn send_queue(&self) -> SendQueue {
SendQueue::new(self.clone())
}
}
Expand Down Expand Up @@ -163,8 +163,8 @@ impl Drop for SendQueueData {

impl Room {
/// Returns the [`RoomSendQueue`] for this specific room.
pub fn sending_queue(&self) -> RoomSendQueue {
self.client.sending_queue().for_room(self.clone())
pub fn send_queue(&self) -> RoomSendQueue {
self.client.send_queue().for_room(self.clone())
}
}

Expand Down Expand Up @@ -620,14 +620,14 @@ mod tests {
.unwrap();

let room = client.get_room(room_id).unwrap();
let q = room.sending_queue();
let q = room.send_queue();

let _watcher = q.subscribe().await;

if enabled {
client.sending_queue().enable();
client.send_queue().enable();
} else {
client.sending_queue().disable();
client.send_queue().disable();
}
}

Expand Down
48 changes: 23 additions & 25 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ async fn test_cant_send_invited_room() {

// I can't send message to it with the send queue.
assert_matches!(
room.sending_queue()
.send(RoomMessageEventContent::text_plain("Hello, World!").into())
.await,
room.send_queue().send(RoomMessageEventContent::text_plain("Hello, World!").into()).await,
Err(RoomSendQueueError::RoomNotJoined)
);
}
Expand All @@ -75,7 +73,7 @@ async fn test_cant_send_left_room() {

// I can't send message to it with the send queue.
assert_matches!(
room.sending_queue()
room.send_queue()
.send(RoomMessageEventContent::text_plain("Farewell, World!").into())
.await,
Err(RoomSendQueueError::RoomNotJoined)
Expand Down Expand Up @@ -103,10 +101,10 @@ async fn test_nothing_sent_when_disabled() {
let event_id = event_id!("$1");
mock_send_event(event_id).expect(0).mount(&server).await;

client.sending_queue().disable();
client.send_queue().disable();

// A message is queued, but never sent.
room.sending_queue()
room.send_queue()
.send(RoomMessageEventContent::text_plain("Hello, World!").into())
.await
.unwrap();
Expand Down Expand Up @@ -139,7 +137,7 @@ async fn test_smoke() {
)
.await;

let q = room.sending_queue();
let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await;
assert!(local_echoes.is_empty());
Expand Down Expand Up @@ -177,7 +175,7 @@ async fn test_smoke() {
.mount(&server)
.await;

room.sending_queue().send(RoomMessageEventContent::text_plain("1").into()).await.unwrap();
room.send_queue().send(RoomMessageEventContent::text_plain("1").into()).await.unwrap();

assert_let!(
Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
Expand Down Expand Up @@ -216,9 +214,9 @@ async fn test_smoke() {
async fn test_error() {
let (client, server) = logged_in_client_with_server().await;

let mut global_status = client.sending_queue().subscribe_status();
let mut global_status = client.send_queue().subscribe_status();

assert!(client.sending_queue().is_enabled());
assert!(client.send_queue().is_enabled());
assert!(global_status.next_now());

// Mark the room as joined.
Expand All @@ -234,7 +232,7 @@ async fn test_error() {
)
.await;

let q = room.sending_queue();
let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await;
assert!(local_echoes.is_empty());
Expand Down Expand Up @@ -311,7 +309,7 @@ async fn test_error() {
assert!(watch.is_empty());

assert!(!global_status.next().await.unwrap(), "the queue should be disabled next");
assert!(!client.sending_queue().is_enabled());
assert!(!client.send_queue().is_enabled());

server.reset().await;
Mock::given(method("PUT"))
Expand All @@ -325,13 +323,13 @@ async fn test_error() {
.await;

// Re-enabling the queue will re-send the same message in that room.
client.sending_queue().enable();
client.send_queue().enable();

assert!(
global_status.next().await.unwrap(),
"the queue should be re-enabled after the user action"
);
assert!(client.sending_queue().is_enabled());
assert!(client.send_queue().is_enabled());

assert_let!(
Ok(Ok(RoomSendQueueUpdate::SentEvent { event_id, transaction_id: txn3 })) =
Expand Down Expand Up @@ -361,17 +359,17 @@ async fn test_reenabling_queue() {
)
.await;

let mut global_status = client.sending_queue().subscribe_status();
let mut global_status = client.send_queue().subscribe_status();

assert!(global_status.next_now());

// When I start with a disabled send queue,
client.sending_queue().disable();
client.send_queue().disable();

assert!(!client.sending_queue().is_enabled());
assert!(!client.send_queue().is_enabled());
assert!(!global_status.next().await.unwrap());

let q = room.sending_queue();
let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await;

Expand Down Expand Up @@ -426,9 +424,9 @@ async fn test_reenabling_queue() {
.await;

// But when reenabling the queue,
client.sending_queue().enable();
client.send_queue().enable();

assert!(client.sending_queue().is_enabled());
assert!(client.send_queue().is_enabled());
assert!(global_status.next().await.unwrap());

// They're sent, in the same ordering.
Expand Down Expand Up @@ -460,7 +458,7 @@ async fn test_cancellation() {
)
.await;

let q = room.sending_queue();
let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await;

Expand Down Expand Up @@ -651,16 +649,16 @@ async fn test_abort_reenable() {
)
.await;

let mut global_status = client.sending_queue().subscribe_status();
let mut global_status = client.send_queue().subscribe_status();

assert!(global_status.next_now());

// When I start with an enabled sending queue,
client.sending_queue().enable();
client.send_queue().enable();

assert!(client.sending_queue().is_enabled());
assert!(client.send_queue().is_enabled());

let q = room.sending_queue();
let q = room.send_queue();

let (local_echoes, mut watch) = q.subscribe().await;

Expand Down
2 changes: 1 addition & 1 deletion labs/multiverse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl App {
Char('S') => self.sync_service.stop().await?,

Char('Q') => {
let q = self.client.sending_queue();
let q = self.client.send_queue();
let enabled = q.is_enabled();
if enabled {
q.disable();
Expand Down

0 comments on commit 429fe7b

Please sign in to comment.