Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timeline/sendq: add a Timeline::redact() method and tweak local echoes behavior #3488

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
db658ad
sdk: add an optional `RequestConfig` to the SendMessageLikeEvent futures
bnjbvr May 24, 2024
6c2834a
sdk: add a sending queue for handling sending room events in the back…
bnjbvr May 20, 2024
e094cca
sending queue: add a global way to observe the enablement state
bnjbvr May 27, 2024
c8b3eed
sending queue: remove spurious `async`
bnjbvr May 27, 2024
185d692
sending queue: add better logging for debug purposes
bnjbvr May 28, 2024
3bca0f0
clippy: declare victory over clippy
bnjbvr May 27, 2024
420c579
timeline: use the new sending queue mechanism to send and receive loc…
bnjbvr May 24, 2024
adca0ee
timeline: only spawn the local echo listener if we're a live timeline
bnjbvr May 27, 2024
3754089
timeline: clarify in some names that events are remote
bnjbvr May 27, 2024
961d60b
integration tests: update reactions test to check if an event is a lo…
bnjbvr May 27, 2024
2e3f07d
ffi: expose the global sending queue primitives
bnjbvr May 27, 2024
5737b7e
ffi: expose the abort handle after sending an event
bnjbvr May 28, 2024
f69f4ee
multiverse: add support for sending messages and {en|dis}abling the s…
bnjbvr May 28, 2024
e70e0e1
fixup! ffi: expose the abort handle after sending an event
bnjbvr May 30, 2024
8c115c7
send queue: add an abort handle to all local echoes
bnjbvr May 31, 2024
bf9df0f
timeline: add a redact method to get rid of local or remote events
bnjbvr May 31, 2024
4b4e440
timeline: get rid of the Cancelled state
bnjbvr May 31, 2024
84695ad
timeline: mark local echoes as not editable
bnjbvr May 31, 2024
bd9a5d1
fixup! timeline: add a redact method to get rid of local or remote ev…
bnjbvr Jun 3, 2024
64a2f21
fixup! timeline: add a redact method to get rid of local or remote ev…
bnjbvr Jun 3, 2024
e13a29a
ffi: add new method to retrieve a timeline item by transaction id
bnjbvr Jun 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions bindings/matrix-sdk-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ pub trait ProgressWatcher: Send + Sync {
fn transmission_progress(&self, progress: TransmissionProgress);
}

/// A listener to the global (client-wide) status of the sending queue.
#[uniffi::export(callback_interface)]
pub trait SendingQueueStatusListener: Sync + Send {
/// Called every time the sending queue has received a new status.
///
/// This can be set automatically (in case of sending failure), or manually
/// via an API call.
fn on_value(&self, new_value: bool);
}

#[derive(Clone, Copy, uniffi::Record)]
pub struct TransmissionProgress {
pub current: u64,
Expand Down Expand Up @@ -307,6 +317,42 @@ impl Client {

Ok(())
}

/// Enables or disables the sending queue, according to the given parameter.
///
/// The sending queue automatically disables itself whenever sending an
/// event with it failed (e.g., sending an event via the high-level Timeline
/// object), so it's required to manually re-enable it as soon as
/// connectivity is back on the device.
pub fn enable_sending_queue(&self, val: bool) {
if val {
self.inner.sending_queue().enable();
} else {
self.inner.sending_queue().disable();
}
}

/// Subscribe to the global enablement status of the sending queue, at the
/// client-wide level.
///
/// The given listener will be immediately called with the initial value of
/// the enablement status.
pub fn subscribe_to_sending_queue_status(
&self,
listener: Box<dyn SendingQueueStatusListener>,
) -> Arc<TaskHandle> {
let mut subscriber = self.inner.sending_queue().subscribe_status();

Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
// Call with the initial value.
listener.on_value(subscriber.next_now());

// Call every time the value changes.
while let Some(next_val) = subscriber.next().await {
listener.on_value(next_val);
}
})))
}
}

impl Client {
Expand Down
135 changes: 104 additions & 31 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ use ruma::{
},
AnyMessageLikeEventContent,
},
EventId,
EventId, OwnedTransactionId,
};
use tokio::{
sync::Mutex,
task::{AbortHandle, JoinHandle},
};
use tracing::{error, info, warn};
use tracing::{error, warn};
use uuid::Uuid;

use self::content::{Reaction, ReactionSenderData, TimelineItemContent};
Expand Down Expand Up @@ -219,10 +219,22 @@ impl Timeline {
Ok(())
}

pub fn send(self: Arc<Self>, msg: Arc<RoomMessageEventContentWithoutRelation>) {
RUNTIME.spawn(async move {
self.inner.send((*msg).to_owned().with_relation(None).into()).await;
});
/// Queues an event in the room's sending queue so it's processed for
/// sending later.
///
/// Returns an abort handle that allows to abort sending, if it hasn't
/// happened yet.
pub async fn send(
self: Arc<Self>,
msg: Arc<RoomMessageEventContentWithoutRelation>,
) -> Result<Arc<AbortSendHandle>, ClientError> {
match self.inner.send((*msg).to_owned().with_relation(None).into()).await {
Ok(handle) => Ok(Arc::new(AbortSendHandle { inner: Mutex::new(Some(handle)) })),
Err(err) => {
error!("error when sending a message: {err}");
Err(anyhow::anyhow!(err).into())
}
}
}

pub fn send_image(
Expand Down Expand Up @@ -390,7 +402,9 @@ impl Timeline {
AnyMessageLikeEventContent::UnstablePollStart(poll_start_event_content.into());

RUNTIME.spawn(async move {
self.inner.send(event_content).await;
if let Err(err) = self.inner.send(event_content).await {
error!("unable to start poll: {err}");
}
});

Ok(())
Expand All @@ -409,7 +423,9 @@ impl Timeline {
AnyMessageLikeEventContent::UnstablePollResponse(poll_response_event_content);

RUNTIME.spawn(async move {
self.inner.send(event_content).await;
if let Err(err) = self.inner.send(event_content).await {
error!("unable to send poll response: {err}");
}
});

Ok(())
Expand All @@ -426,7 +442,9 @@ impl Timeline {
let event_content = AnyMessageLikeEventContent::UnstablePollEnd(poll_end_event_content);

RUNTIME.spawn(async move {
self.inner.send(event_content).await;
if let Err(err) = self.inner.send(event_content).await {
error!("unable to end poll: {err}");
}
});

Ok(())
Expand Down Expand Up @@ -472,7 +490,7 @@ impl Timeline {
Ok(())
}

pub fn send_location(
pub async fn send_location(
self: Arc<Self>,
body: String,
geo_uri: String,
Expand All @@ -496,7 +514,8 @@ impl Timeline {
let room_message_event_content = RoomMessageEventContentWithoutRelation::new(
MessageType::Location(location_event_message_content),
);
self.send(Arc::new(room_message_event_content))
// Errors are logged in `Self::send` already.
let _ = self.send(Arc::new(room_message_event_content)).await;
}

pub async fn toggle_reaction(&self, event_id: String, key: String) -> Result<(), ClientError> {
Expand All @@ -511,22 +530,14 @@ impl Timeline {
Ok(())
}

pub fn retry_send(self: Arc<Self>, txn_id: String) {
RUNTIME.spawn(async move {
if let Err(e) = self.inner.retry_send(txn_id.as_str().into()).await {
error!(txn_id, "Failed to retry sending: {e}");
}
});
}

pub fn cancel_send(self: Arc<Self>, txn_id: String) {
RUNTIME.spawn(async move {
if !self.inner.cancel_send(txn_id.as_str().into()).await {
info!(txn_id, "Failed to discard local echo: Not found");
}
});
}

/// Get the current timeline item for the given event ID, if any.
///
/// Will return a remote event, *or* a local echo that has been sent but not
/// yet replaced by a remote echo.
///
/// It's preferable to store the timeline items in the model for your UI, if
/// possible, instead of just storing IDs and coming back to the timeline
/// object to look up items.
pub async fn get_event_timeline_item_by_event_id(
&self,
event_id: String,
Expand All @@ -540,6 +551,26 @@ impl Timeline {
Ok(Arc::new(EventTimelineItem(item)))
}

/// Get the current timeline item for the given transaction ID, if any.
///
/// This will always return a local echo, if found.
///
/// It's preferable to store the timeline items in the model for your UI, if
/// possible, instead of just storing IDs and coming back to the timeline
/// object to look up items.
pub async fn get_event_timeline_item_by_transaction_id(
&self,
transaction_id: String,
) -> Result<Arc<EventTimelineItem>, ClientError> {
let transaction_id: OwnedTransactionId = transaction_id.into();
let item = self
.inner
.item_by_transaction_id(&transaction_id)
.await
.context("Item with given transaction ID not found")?;
Ok(Arc::new(EventTimelineItem(item)))
}

pub async fn get_timeline_event_content_by_event_id(
&self,
event_id: String,
Expand Down Expand Up @@ -567,6 +598,52 @@ impl Timeline {

latest_event.map(|item| Arc::new(EventTimelineItem(item)))
}

/// Redacts an event from the timeline.
///
/// Only works for events that exist as timeline items.
///
/// If it was a local event, this will *try* to cancel it, if it was not
/// being sent already. If the event was a remote event, then it will be
/// redacted by sending a redaction request to the server.
///
/// Returns whether the redaction did happen. It can only return false for
/// local events that are being processed.
pub async fn redact_event(
&self,
item: Arc<EventTimelineItem>,
reason: Option<String>,
) -> Result<bool, ClientError> {
let removed = self
.inner
.redact(&item.0, reason.as_deref())
.await
.map_err(|err| anyhow::anyhow!(err))?;

Ok(removed)
}
}

#[derive(uniffi::Object)]
pub struct AbortSendHandle {
inner: Mutex<Option<matrix_sdk::send_queue::AbortSendHandle>>,
}

#[uniffi::export(async_runtime = "tokio")]
impl AbortSendHandle {
/// Try to abort the sending of the current event.
///
/// If this returns `true`, then the sending could be aborted, because the
/// event hasn't been sent yet. Otherwise, if this returns `false`, the
/// event had already been sent and could not be aborted.
async fn abort(self: Arc<Self>) -> bool {
if let Some(inner) = self.inner.lock().await.take() {
inner.abort().await
} else {
warn!("trying to abort an send handle that's already been actioned");
false
}
}
}

#[derive(Debug, thiserror::Error, uniffi::Error)]
Expand Down Expand Up @@ -771,9 +848,6 @@ pub enum EventSendState {
/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
SendingFailed { error: String },
/// Sending has been cancelled because an earlier event in the
/// message-sending queue failed.
Cancelled,
/// The local event has been sent successfully to the server.
Sent { event_id: String },
}
Expand All @@ -785,7 +859,6 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState {
match value {
NotSentYet => Self::NotSentYet,
SendingFailed { error } => Self::SendingFailed { error: error.to_string() },
Cancelled => Self::Cancelled,
Sent { event_id } => Self::Sent { event_id: event_id.to_string() },
}
}
Expand Down
Loading
Loading