Skip to content

Commit

Permalink
send queue: put a LocalEcho's event content into a new enum
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Aug 13, 2024
1 parent 5954ee1 commit 9f556ec
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 54 deletions.
60 changes: 32 additions & 28 deletions crates/matrix-sdk-ui/src/timeline/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use matrix_sdk::{
deserialized_responses::SyncTimelineEvent,
event_cache::{paginator::Paginator, RoomEventCache},
pinned_events_cache::PinnedEventCache,
send_queue::{LocalEcho, RoomSendQueueUpdate, SendHandle},
send_queue::{LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle},
Result, Room,
};
#[cfg(test)]
Expand Down Expand Up @@ -1183,34 +1183,38 @@ impl<P: RoomDataProvider> TimelineInner<P> {

/// Handle a room send update that's a new local echo.
pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
let content = match echo.serialized_event.deserialize() {
Ok(d) => d,
Err(err) => {
warn!("error deserializing local echo: {err}");
return;
}
};
match echo.content {
LocalEchoContent::Event { serialized_event, send_handle, is_wedged } => {
let content = match serialized_event.deserialize() {
Ok(d) => d,
Err(err) => {
warn!("error deserializing local echo: {err}");
return;
}
};

self.handle_local_event(
echo.transaction_id.clone(),
TimelineEventKind::Message { content, relations: Default::default() },
Some(echo.send_handle),
)
.await;

if echo.is_wedged {
self.update_event_send_state(
&echo.transaction_id,
EventSendState::SendingFailed {
// Put a dummy error in this case, since we're not persisting the errors that
// occurred in previous sessions.
error: Arc::new(matrix_sdk::Error::UnknownError(Box::new(
MissingLocalEchoFailError,
))),
is_recoverable: false,
},
)
.await;
self.handle_local_event(
echo.transaction_id.clone(),
TimelineEventKind::Message { content, relations: Default::default() },
Some(send_handle),
)
.await;

if is_wedged {
self.update_event_send_state(
&echo.transaction_id,
EventSendState::SendingFailed {
// Put a dummy error in this case, since we're not persisting the errors
// that occurred in previous sessions.
error: Arc::new(matrix_sdk::Error::UnknownError(Box::new(
MissingLocalEchoFailError,
))),
is_recoverable: false,
},
)
.await;
}
}
}
}

Expand Down
50 changes: 34 additions & 16 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ pub struct RoomSendQueue {
inner: Arc<RoomSendQueueInner>,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for RoomSendQueue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RoomSendQueue").finish_non_exhaustive()
Expand Down Expand Up @@ -343,9 +344,14 @@ impl RoomSendQueue {

let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
serialized_event: content,
send_handle: SendHandle { room: self.clone(), transaction_id: transaction_id.clone() },
is_wedged: false,
content: LocalEchoContent::Event {
serialized_event: content,
send_handle: SendHandle {
room: self.clone(),
transaction_id: transaction_id.clone(),
},
is_wedged: false,
},
}));

Ok(SendHandle { transaction_id, room: self.clone() })
Expand Down Expand Up @@ -389,12 +395,14 @@ impl RoomSendQueue {
.into_iter()
.map(|queued| LocalEcho {
transaction_id: queued.transaction_id.clone(),
serialized_event: queued.event,
send_handle: SendHandle {
room: self.clone(),
transaction_id: queued.transaction_id,
content: LocalEchoContent::Event {
serialized_event: queued.event,
send_handle: SendHandle {
room: self.clone(),
transaction_id: queued.transaction_id,
},
is_wedged: queued.is_wedged,
},
is_wedged: queued.is_wedged,
})
.collect();

Expand Down Expand Up @@ -980,19 +988,29 @@ impl QueueStorage {
}
}

/// The content of a local echo.
#[derive(Clone, Debug)]
pub enum LocalEchoContent {
/// The local echo contains an actual event ready to display.
Event {
/// Content of the event itself (along with its type) that we are about
/// to send.
serialized_event: SerializableEventContent,
/// A handle to manipulate the sending of the associated event.
send_handle: SendHandle,
/// Whether trying to send this local echo failed in the past with an
/// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
is_wedged: bool,
},
}

/// An event that has been locally queued for sending, but hasn't been sent yet.
#[derive(Clone, Debug)]
pub struct LocalEcho {
/// Transaction id used to identify this event.
pub transaction_id: OwnedTransactionId,
/// Content of the event itself (along with its type) that we are about to
/// send.
pub serialized_event: SerializableEventContent,
/// A handle to manipulate the sending of the associated event.
pub send_handle: SendHandle,
/// Whether trying to send this local echo failed in the past with an
/// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
pub is_wedged: bool,
/// The content for the local echo.
pub content: LocalEchoContent,
}

/// An update to a room send queue, observable with
Expand Down
22 changes: 12 additions & 10 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use assert_matches2::{assert_let, assert_matches};
use matrix_sdk::{
config::{RequestConfig, StoreConfig},
send_queue::{LocalEcho, RoomSendQueueError, RoomSendQueueUpdate},
send_queue::{LocalEcho, LocalEchoContent, RoomSendQueueError, RoomSendQueueUpdate},
test_utils::{
events::EventFactory, logged_in_client, logged_in_client_with_server, set_client_session,
},
Expand Down Expand Up @@ -64,11 +64,13 @@ macro_rules! assert_update {
($watch:ident => local echo { body = $body:expr }) => {{
assert_let!(
Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
serialized_event,
content: LocalEchoContent::Event {
serialized_event,
send_handle,
// New local echoes should always start as not wedged.
is_wedged: false,
},
transaction_id: txn,
send_handle,
// New local echoes should always start as not wedged.
is_wedged: false,
}))) = timeout(Duration::from_secs(1), $watch.recv()).await
);

Expand Down Expand Up @@ -334,9 +336,8 @@ async fn test_smoke_raw() {

assert_let!(
Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
serialized_event,
content: LocalEchoContent::Event { serialized_event, .. },
transaction_id: txn1,
..
}))) = timeout(Duration::from_secs(1), watch.recv()).await
);

Expand Down Expand Up @@ -787,8 +788,7 @@ async fn test_cancellation() {
let local_echo4 = local_echoes.remove(1);
assert_eq!(local_echo4.transaction_id, txn4, "local echoes: {local_echoes:?}");

let handle4 = local_echo4.send_handle;

let LocalEchoContent::Event { send_handle: handle4, .. } = local_echo4.content;
assert!(handle4.abort().await.unwrap());
assert_update!(watch => cancelled { txn = txn4 });
assert!(watch.is_empty());
Expand Down Expand Up @@ -1016,7 +1016,9 @@ async fn test_edit_while_being_sent_and_fails() {
assert_eq!(local_echoes.len(), 1);
assert_eq!(local_echoes[0].transaction_id, txn1);

let event = local_echoes[0].serialized_event.deserialize().unwrap();
let LocalEchoContent::Event { serialized_event, .. } = &local_echoes[0].content;
let event = serialized_event.deserialize().unwrap();

assert_let!(AnyMessageLikeEventContent::RoomMessage(msg) = event);
assert_eq!(msg.body(), "it's never too late!");
}
Expand Down

0 comments on commit 9f556ec

Please sign in to comment.