From 2a79956caac4f72cb66d99beb86ff68340819150 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 11 Jun 2024 18:36:36 +0200 Subject: [PATCH] ffi: call the initial events in the same task that listens to updates This avoids a race condition where the caller hasn't set up the initial items or the listener, and the listener is called *before* the initial items have been used. --- bindings/matrix-sdk-ffi/src/timeline/mod.rs | 32 ++++++++++----------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 84c620069b9..0eec1ca8fa9 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -133,24 +133,28 @@ impl Timeline { #[uniffi::export(async_runtime = "tokio")] impl Timeline { - pub async fn add_listener( - &self, - listener: Box, - ) -> RoomTimelineListenerResult { + pub async fn add_listener(&self, listener: Box) -> Arc { let (timeline_items, timeline_stream) = self.inner.subscribe_batched().await; - let timeline_stream = TaskHandle::new(RUNTIME.spawn(async move { + + Arc::new(TaskHandle::new(RUNTIME.spawn(async move { pin_mut!(timeline_stream); + // It's important that the initial items are passed *before* we forward the + // stream updates, with a guaranteed ordering. Otherwise, it could + // be that the listener be called before the initial items have been + // handled by the caller. See #3535 for details. + + // First, pass all the items as a reset update. + listener.on_update(vec![Arc::new(TimelineDiff::new(VectorDiff::Reset { + values: timeline_items, + }))]); + + // Then forward new items. while let Some(diffs) = timeline_stream.next().await { listener .on_update(diffs.into_iter().map(|d| Arc::new(TimelineDiff::new(d))).collect()); } - })); - - RoomTimelineListenerResult { - items: timeline_items.into_iter().map(TimelineItem::from_arc).collect(), - items_stream: Arc::new(timeline_stream), - } + }))) } pub fn retry_decryption(self: Arc, session_ids: Vec) { @@ -671,12 +675,6 @@ pub enum FocusEventError { Other { msg: String }, } -#[derive(uniffi::Record)] -pub struct RoomTimelineListenerResult { - pub items: Vec>, - pub items_stream: Arc, -} - #[uniffi::export(callback_interface)] pub trait TimelineListener: Sync + Send { fn on_update(&self, diff: Vec>);