Skip to content

Commit

Permalink
ffi: call the initial events in the same task that listens to updates
Browse files Browse the repository at this point in the history
This avoids a race condition where the caller hasn't set up the initial
items or the listener, and the listener is called *before* the initial
items have been used.
  • Loading branch information
bnjbvr committed Jun 12, 2024
1 parent 871116e commit 2a79956
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,28 @@ impl Timeline {

#[uniffi::export(async_runtime = "tokio")]
impl Timeline {
pub async fn add_listener(
&self,
listener: Box<dyn TimelineListener>,
) -> RoomTimelineListenerResult {
pub async fn add_listener(&self, listener: Box<dyn TimelineListener>) -> Arc<TaskHandle> {
let (timeline_items, timeline_stream) = self.inner.subscribe_batched().await;
let timeline_stream = TaskHandle::new(RUNTIME.spawn(async move {

Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
pin_mut!(timeline_stream);

// It's important that the initial items are passed *before* we forward the
// stream updates, with a guaranteed ordering. Otherwise, it could
// be that the listener be called before the initial items have been
// handled by the caller. See #3535 for details.

// First, pass all the items as a reset update.
listener.on_update(vec![Arc::new(TimelineDiff::new(VectorDiff::Reset {
values: timeline_items,
}))]);

// Then forward new items.
while let Some(diffs) = timeline_stream.next().await {
listener
.on_update(diffs.into_iter().map(|d| Arc::new(TimelineDiff::new(d))).collect());
}
}));

RoomTimelineListenerResult {
items: timeline_items.into_iter().map(TimelineItem::from_arc).collect(),
items_stream: Arc::new(timeline_stream),
}
})))
}

pub fn retry_decryption(self: Arc<Self>, session_ids: Vec<String>) {
Expand Down Expand Up @@ -671,12 +675,6 @@ pub enum FocusEventError {
Other { msg: String },
}

#[derive(uniffi::Record)]
pub struct RoomTimelineListenerResult {
pub items: Vec<Arc<TimelineItem>>,
pub items_stream: Arc<TaskHandle>,
}

#[uniffi::export(callback_interface)]
pub trait TimelineListener: Sync + Send {
fn on_update(&self, diff: Vec<Arc<TimelineDiff>>);
Expand Down

0 comments on commit 2a79956

Please sign in to comment.