Skip to content

Commit

Permalink
send queue: control enabled on a per-room basis in addition to globally
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Jun 10, 2024
1 parent 9c1d62a commit a5a116b
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 118 deletions.
49 changes: 23 additions & 26 deletions bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,12 @@ pub trait ProgressWatcher: Send + Sync {
fn transmission_progress(&self, progress: TransmissionProgress);
}

/// A listener to the global (client-wide) status of the send queue.
/// A listener to the global (client-wide) error reporter of the send queue.
#[uniffi::export(callback_interface)]
pub trait SendQueueStatusListener: Sync + Send {
/// Called every time the send queue has received a new status.
///
/// This can be set automatically (in case of sending failure), or manually
/// via an API call.
fn on_update(&self, new_value: bool);
pub trait SendQueueRoomErrorListener: Sync + Send {
/// Called every time the send queue has ran into an error for a given room,
/// which will disable the send queue for that particular room.
fn on_error(&self, room_id: String, error: ClientError);
}

#[derive(Clone, Copy, uniffi::Record)]
Expand Down Expand Up @@ -315,18 +313,15 @@ impl Client {
Ok(())
}

/// Enables or disables the send queue, according to the given parameter.
/// Enables or disables all the room send queues at once.
///
/// The send queue automatically disables itself whenever sending an
/// event with it failed (e.g., sending an event via the high-level Timeline
/// object), so it's required to manually re-enable it as soon as
/// connectivity is back on the device.
pub fn enable_send_queue(&self, enable: bool) {
if enable {
self.inner.send_queue().enable();
} else {
self.inner.send_queue().disable();
}
/// When connectivity is lost on a device, it is recommended to disable the
/// room sending queues.
///
/// This can be controlled for individual rooms, using
/// [`Room::enable_send_queue`].
pub fn enable_all_send_queues(&self, enable: bool) {
self.inner.send_queue().set_enabled(enable);
}

/// Subscribe to the global enablement status of the send queue, at the
Expand All @@ -336,17 +331,19 @@ impl Client {
/// the enablement status.
pub fn subscribe_to_send_queue_status(
&self,
listener: Box<dyn SendQueueStatusListener>,
listener: Box<dyn SendQueueRoomErrorListener>,
) -> Arc<TaskHandle> {
let mut subscriber = self.inner.send_queue().subscribe_status();
let mut subscriber = self.inner.send_queue().subscribe_errors();

Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
// Call with the initial value.
listener.on_update(subscriber.next_now());

// Call every time the value changes.
while let Some(next_val) = subscriber.next().await {
listener.on_update(next_val);
loop {
match subscriber.recv().await {
Ok(report) => listener
.on_error(report.room_id.to_string(), ClientError::new(report.error)),
Err(err) => {
error!("error when listening to the send queue error reporter: {err}");
}
}
}
})))
}
Expand Down
2 changes: 1 addition & 1 deletion bindings/matrix-sdk-ffi/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub enum ClientError {
}

impl ClientError {
fn new<E: Display>(error: E) -> Self {
pub(crate) fn new<E: Display>(error: E) -> Self {
Self::Generic { msg: error.to_string() }
}
}
Expand Down
11 changes: 11 additions & 0 deletions bindings/matrix-sdk-ffi/src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,17 @@ impl Room {
.await?;
Ok(())
}

/// Returns whether the send queue for that particular room is enabled or
/// not.
pub fn is_send_queue_enabled(&self) -> bool {
self.inner.send_queue().is_enabled()
}

/// Enable or disable the send queue for that particular room.
pub fn enable_send_queue(&self, enable: bool) {
self.inner.send_queue().set_enabled(enable);
}
}

/// Generates a `matrix.to` permalink to the given room alias.
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk-ui/tests/integration/timeline/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async fn test_retry_failed() {

mock_encryption_state(&server, false).await;

client.send_queue().enable();
client.send_queue().set_enabled(true);

let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline().await.unwrap());
Expand Down Expand Up @@ -175,7 +175,7 @@ async fn test_retry_failed() {

assert!(!client.send_queue().is_enabled());

client.send_queue().enable();
client.send_queue().set_enabled(true);

// Let the send queue handle the event.
tokio::time::sleep(Duration::from_millis(300)).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-ui/tests/integration/timeline/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async fn test_retry_order() {
.await;

// Retry the second message first
client.send_queue().enable();
client.send_queue().set_enabled(true);

// Wait 200ms for the first msg, 100ms for the second, 300ms for overhead
sleep(Duration::from_millis(600)).await;
Expand Down
133 changes: 86 additions & 47 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::{
},
};

use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_base::RoomState;
use matrix_sdk_common::executor::{spawn, JoinHandle};
use ruma::{
Expand Down Expand Up @@ -51,8 +50,13 @@ impl SendQueue {
Self { client }
}

#[inline(always)]
fn data(&self) -> &SendQueueData {
&self.client.inner.send_queue_data
}

fn for_room(&self, room: Room) -> RoomSendQueue {
let data = &self.client.inner.send_queue_data;
let data = self.data();

let mut map = data.rooms.write().unwrap();

Expand All @@ -63,7 +67,8 @@ impl SendQueue {

let owned_room_id = room_id.to_owned();
let room_q = RoomSendQueue::new(
data.globally_enabled.clone(),
data.globally_enabled.load(Ordering::SeqCst),
data.error_reporter.clone(),
data.is_dropping.clone(),
&self.client,
owned_room_id.clone(),
Expand All @@ -72,51 +77,49 @@ impl SendQueue {
room_q
}

/// Enable the send queue for the entire client, i.e. all rooms.
/// Enable or disable the send queue for the entire client, i.e. all rooms.
///
/// If we're disabling the queue, and requests were being sent, they're not
/// aborted, and will continue until a status resolves (error responses
/// will keep the events in the buffer of events to send later). The
/// disablement will happen before the next event is sent.
///
/// This may wake up background tasks and resume sending of events in the
/// background.
pub fn enable(&self) {
if self.client.inner.send_queue_data.globally_enabled.set_if_not_eq(true).is_some() {
debug!("globally enabling send queue");
let rooms = self.client.inner.send_queue_data.rooms.read().unwrap();
// Wake up the rooms, in case events have been queued in the meanwhile.
for room in rooms.values() {
room.inner.notifier.notify_one();
}
}
}
pub fn set_enabled(&self, enabled: bool) {
debug!(?enabled, "setting global send queue enablement");

/// Disable the send queue for the entire client, i.e. all rooms.
///
/// If requests were being sent, they're not aborted, and will continue
/// until a status resolves (error responses will keep the events in the
/// buffer of events to send later). The disablement will happen before
/// the next event is sent.
pub fn disable(&self) {
// Note: it's not required to wake the tasks just to let them know they're
// disabled:
// - either they were busy, will continue to the next iteration and realize
// the queue is now disabled,
// - or they were not, and it's not worth it waking them to let them they're
// disabled, which causes them to go to sleep again.
debug!("globally disabling send queue");
self.client.inner.send_queue_data.globally_enabled.set(false);
self.data().globally_enabled.store(enabled, Ordering::SeqCst);

let rooms = self.data().rooms.read().unwrap();
for room in rooms.values() {
room.set_enabled(enabled);
}
}

/// Returns whether the send queue is enabled, at a client-wide
/// granularity.
pub fn is_enabled(&self) -> bool {
self.client.inner.send_queue_data.globally_enabled.get()
self.data().globally_enabled.load(Ordering::SeqCst)
}

/// A subscriber to the enablement status (enabled or disabled) of the
/// send queue.
pub fn subscribe_status(&self) -> Subscriber<bool> {
self.client.inner.send_queue_data.globally_enabled.subscribe()
pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
self.data().error_reporter.subscribe()
}
}

/// A specific room ran into an error, and has disabled itself.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
/// Which room is failing?
pub room_id: OwnedRoomId,

/// The error the room has ran into, when trying to send an event.
pub error: Arc<crate::Error>,
}

impl Client {
/// Returns a [`SendQueue`] that handles sending, retrying and not
/// forgetting about messages that are to be sent.
Expand All @@ -130,7 +133,13 @@ pub(super) struct SendQueueData {
rooms: SyncRwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

/// Is the whole mechanism enabled or disabled?
globally_enabled: SharedObservable<bool>,
///
/// This is only kept in memory to initialize new room queues with an
/// initial enablement state.
globally_enabled: AtomicBool,

/// Global error updates for the send queue.
error_reporter: broadcast::Sender<SendQueueRoomError>,

/// Are we currently dropping the Client?
is_dropping: Arc<AtomicBool>,
Expand All @@ -139,9 +148,12 @@ pub(super) struct SendQueueData {
impl SendQueueData {
/// Create the data for a send queue, in the given enabled state.
pub fn new(globally_enabled: bool) -> Self {
let (sender, _) = broadcast::channel(32);

Self {
rooms: Default::default(),
globally_enabled: SharedObservable::new(globally_enabled),
globally_enabled: AtomicBool::new(globally_enabled),
error_reporter: sender,
is_dropping: Arc::new(false.into()),
}
}
Expand Down Expand Up @@ -184,7 +196,8 @@ impl std::fmt::Debug for RoomSendQueue {

impl RoomSendQueue {
fn new(
globally_enabled: SharedObservable<bool>,
globally_enabled: bool,
global_error_reporter: broadcast::Sender<SendQueueRoomError>,
is_dropping: Arc<AtomicBool>,
client: &Client,
room_id: OwnedRoomId,
Expand All @@ -195,13 +208,15 @@ impl RoomSendQueue {
let notifier = Arc::new(Notify::new());

let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));

let task = spawn(Self::sending_task(
weak_room.clone(),
queue.clone(),
notifier.clone(),
updates_sender.clone(),
globally_enabled,
locally_enabled.clone(),
global_error_reporter,
is_dropping,
));

Expand All @@ -212,6 +227,7 @@ impl RoomSendQueue {
_task: task,
queue,
notifier,
locally_enabled,
}),
}
}
Expand Down Expand Up @@ -282,7 +298,8 @@ impl RoomSendQueue {
queue: QueueStorage,
notifier: Arc<Notify>,
updates: broadcast::Sender<RoomSendQueueUpdate>,
globally_enabled: SharedObservable<bool>,
locally_enabled: Arc<AtomicBool>,
global_error_reporter: broadcast::Sender<SendQueueRoomError>,
is_dropping: Arc<AtomicBool>,
) {
info!("spawned the sending task");
Expand All @@ -294,7 +311,7 @@ impl RoomSendQueue {
break;
}

if !globally_enabled.get() {
if !locally_enabled.load(Ordering::SeqCst) {
trace!("not enabled, sleeping");
// Wait for an explicit wakeup.
notifier.notified().await;
Expand Down Expand Up @@ -347,20 +364,42 @@ impl RoomSendQueue {
// try to remove an item, while it's still marked as being sent, resulting in a
// cancellation failure.

// Disable the queue after an error.
// See comment in [`SendQueue::disable()`].
globally_enabled.set(false);
// Disable the queue for this room after an error.
locally_enabled.store(false, Ordering::SeqCst);

let error = Arc::new(err);

let _ = global_error_reporter.send(SendQueueRoomError {
room_id: room.room_id().to_owned(),
error: error.clone(),
});

let _ = updates.send(RoomSendQueueUpdate::SendError {
transaction_id: queued_event.transaction_id,
error: Arc::new(err),
error,
});
}
}
}

info!("exited sending task");
}

/// Returns whether the room is enabled, at the room level.
pub fn is_enabled(&self) -> bool {
self.inner.locally_enabled.load(Ordering::SeqCst)
}

/// Set the locally enabled flag for this room queue.
pub fn set_enabled(&self, enabled: bool) {
self.inner.locally_enabled.store(enabled, Ordering::SeqCst);

// No need to wake a task to tell it it's been disabled, so only notify if we're
// re-enabling the queue.
if enabled {
self.inner.notifier.notify_one();
}
}
}

struct RoomSendQueueInner {
Expand Down Expand Up @@ -389,6 +428,10 @@ struct RoomSendQueueInner {
/// enabled statuses), or the associated room [`QueueStorage`].
notifier: Arc<Notify>,

/// Should the room process new events or not (because e.g. it might be
/// running off the network)?
locally_enabled: Arc<AtomicBool>,

/// Handle to the actual sending task. Unused, but kept alive along this
/// data structure.
_task: JoinHandle<()>,
Expand Down Expand Up @@ -624,11 +667,7 @@ mod tests {

let _watcher = q.subscribe().await;

if enabled {
client.send_queue().enable();
} else {
client.send_queue().disable();
}
client.send_queue().set_enabled(enabled);
}

drop(client);
Expand Down
Loading

0 comments on commit a5a116b

Please sign in to comment.