Skip to content

Commit

Permalink
task(sdk): Make the code more robust around event removals.
Browse files Browse the repository at this point in the history
This patch makes the code more robust around event removals. Sorting
events by their position is no longer done in the `Deduplicator` but in
a new `RoomEventCacheState::remove_events` method, which removes events
both from the store and from the `RoomEvents`. Because this single
method is responsible for sorting the events before removing them, the
code is less fragile.
  • Loading branch information
Hywan committed Feb 24, 2025
1 parent c18c21c commit c1d8e6a
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 104 deletions.
68 changes: 8 additions & 60 deletions crates/matrix-sdk/src/event_cache/deduplicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,12 @@ impl Deduplicator {
});
}

let mut outcome = match self {
match self {
Deduplicator::InMemory(dedup) => Ok(dedup.filter_duplicate_events(events, room_events)),
Deduplicator::PersistentStore(dedup) => {
dedup.filter_duplicate_events(events, room_events).await
}
}?;

sort_events_by_position_descending(&mut outcome.in_memory_duplicated_event_ids);
sort_events_by_position_descending(&mut outcome.in_store_duplicated_event_ids);

Ok(outcome)
}
}
}

Expand Down Expand Up @@ -329,23 +324,6 @@ pub(super) struct DeduplicationOutcome {
pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
}

/// Sort events so that they can be removed safely without messing their
/// position.
///
/// Entries are ordered by their position: greatest position index first, so
/// that removing one event never invalidates the positions of the events
/// still to be removed from the same chunk. Ordering by chunk identifier
/// (also descending) is not required for correctness, but it makes the
/// output easier to read when debugging.
fn sort_events_by_position_descending(event_ids: &mut [(OwnedEventId, Position)]) {
    event_ids.sort_by(|(_, lhs), (_, rhs)| {
        // Descending on both keys: compare `rhs` to `lhs`.
        rhs.chunk_identifier()
            .cmp(&lhs.chunk_identifier())
            .then_with(|| rhs.index().cmp(&lhs.index()))
    });
}

#[cfg(test)]
mod tests {
use assert_matches2::{assert_let, assert_matches};
Expand All @@ -363,36 +341,6 @@ mod tests {
.into_event()
}

#[test]
fn test_sort_events_by_position_descending() {
    let ev1 = owned_event_id!("$ev1");
    let ev2 = owned_event_id!("$ev2");
    let ev3 = owned_event_id!("$ev3");
    let ev4 = owned_event_id!("$ev4");
    let ev5 = owned_event_id!("$ev5");

    // Deliberately unsorted input, interleaving chunks and indices.
    let mut event_ids = vec![
        (ev1.clone(), Position::new(ChunkIdentifier::new(2), 1)),
        (ev2.clone(), Position::new(ChunkIdentifier::new(1), 0)),
        (ev3.clone(), Position::new(ChunkIdentifier::new(2), 0)),
        (ev4.clone(), Position::new(ChunkIdentifier::new(1), 1)),
        (ev5.clone(), Position::new(ChunkIdentifier::new(0), 0)),
    ];

    sort_events_by_position_descending(&mut event_ids);

    // Expected order: chunk identifiers descending, then indices descending
    // within the same chunk.
    let expected = [
        (ev1, Position::new(ChunkIdentifier::new(2), 1)),
        (ev3, Position::new(ChunkIdentifier::new(2), 0)),
        (ev4, Position::new(ChunkIdentifier::new(1), 1)),
        (ev2, Position::new(ChunkIdentifier::new(1), 0)),
        (ev5, Position::new(ChunkIdentifier::new(0), 0)),
    ];

    assert_eq!(event_ids, &expected);
}

#[async_test]
async fn test_filter_find_duplicates_in_the_input() {
let event_id_0 = owned_event_id!("$ev0");
Expand Down Expand Up @@ -587,11 +535,11 @@ mod tests {
assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
assert_eq!(
outcome.in_memory_duplicated_event_ids[0],
(event_id_3, Position::new(ChunkIdentifier::new(0), 1))
(event_id_2, Position::new(ChunkIdentifier::new(0), 0))
);
assert_eq!(
outcome.in_memory_duplicated_event_ids[1],
(event_id_2, Position::new(ChunkIdentifier::new(0), 0))
(event_id_3, Position::new(ChunkIdentifier::new(0), 1))
);

// From these 4 events, 2 are duplicated and live in the store only, they have
Expand All @@ -601,11 +549,11 @@ mod tests {
assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
assert_eq!(
outcome.in_store_duplicated_event_ids[0],
(event_id_1, Position::new(ChunkIdentifier::new(42), 1))
(event_id_0, Position::new(ChunkIdentifier::new(42), 0))
);
assert_eq!(
outcome.in_store_duplicated_event_ids[1],
(event_id_0, Position::new(ChunkIdentifier::new(42), 0))
(event_id_1, Position::new(ChunkIdentifier::new(42), 1))
);
}

Expand Down Expand Up @@ -755,11 +703,11 @@ mod tests {
assert_eq!(in_store_duplicated_event_ids.len(), 2);
assert_eq!(
in_store_duplicated_event_ids[0],
(eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
(eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
);
assert_eq!(
in_store_duplicated_event_ids[1],
(eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
(eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
);
}
}
66 changes: 49 additions & 17 deletions crates/matrix-sdk/src/event_cache/room/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,10 @@ impl RoomEvents {

/// Remove some events from the linked chunk.
///
/// Empty chunks are going to be removed.
///
/// Events **must** be sorted by their position (descending, i.e. from
/// newest to oldest).
pub fn remove_events_by_position<P>(&mut self, positions: P) -> Result<(), Error>
where
P: Iterator<Item = Position>,
{
/// If a chunk becomes empty, it's going to be removed.
pub fn remove_events_by_position(&mut self, mut positions: Vec<Position>) -> Result<(), Error> {
sort_positions_descending(&mut positions);

for position in positions {
self.chunks.remove_item_at(
position,
Expand Down Expand Up @@ -363,6 +359,21 @@ fn chunk_debug_string(content: &ChunkContent<Event, Gap>) -> String {
}
}

/// Sort positions of events so that events can be removed safely without
/// messing their position.
///
/// Positions are ordered from the greatest index to the lowest, so that all
/// remaining positions inside a chunk stay valid while events are removed
/// one by one. Ordering by chunk identifier (also descending) is not
/// required for correctness, but it makes debugging easier.
pub(super) fn sort_positions_descending(positions: &mut [Position]) {
    positions.sort_by(|lhs, rhs| {
        // Descending on both keys: compare `rhs` to `lhs`.
        rhs.chunk_identifier()
            .cmp(&lhs.chunk_identifier())
            .then_with(|| rhs.index().cmp(&lhs.index()))
    });
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
Expand Down Expand Up @@ -636,13 +647,10 @@ mod tests {

// Remove some events.
room_events
.remove_events_by_position(
[
Position::new(ChunkIdentifier::new(2), 1),
Position::new(ChunkIdentifier::new(0), 1),
]
.into_iter(),
)
.remove_events_by_position(vec![
Position::new(ChunkIdentifier::new(2), 1),
Position::new(ChunkIdentifier::new(0), 1),
])
.unwrap();

assert_events_eq!(
Expand All @@ -655,7 +663,7 @@ mod tests {

// Ensure chunks are removed once empty.
room_events
.remove_events_by_position([Position::new(ChunkIdentifier::new(2), 0)].into_iter())
.remove_events_by_position(vec![Position::new(ChunkIdentifier::new(2), 0)])
.unwrap();

assert_events_eq!(
Expand All @@ -677,7 +685,7 @@ mod tests {
// Remove one undefined event.
// An error is expected.
room_events
.remove_events_by_position([Position::new(ChunkIdentifier::new(42), 153)].into_iter())
.remove_events_by_position(vec![Position::new(ChunkIdentifier::new(42), 153)])
.unwrap_err();

assert_events_eq!(room_events.events(), []);
Expand Down Expand Up @@ -759,4 +767,28 @@ mod tests {
assert_eq!(&output[0], "chunk #0: events[$12345678, $2]");
assert_eq!(&output[1], "chunk #1: gap['raclette']");
}

#[test]
fn test_sort_positions_descending() {
    // Small helper to keep the position literals compact.
    let at = |chunk, index| Position::new(ChunkIdentifier::new(chunk), index);

    // Deliberately unsorted input, interleaving chunks and indices.
    let mut positions = vec![at(2, 1), at(1, 0), at(2, 0), at(1, 1), at(0, 0)];

    sort_positions_descending(&mut positions);

    // Expected order: chunk identifiers descending, then indices descending
    // within the same chunk.
    assert_eq!(positions, &[at(2, 1), at(2, 0), at(1, 1), at(1, 0), at(0, 0)]);
}
}
90 changes: 63 additions & 27 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
use std::{collections::BTreeMap, fmt, sync::Arc};

use events::Gap;
use events::{sort_positions_descending, Gap};
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, TimelineEvent},
linked_chunk::Update,
sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
Expand Down Expand Up @@ -441,36 +440,23 @@ impl RoomEventCacheInner {
// No new events, thus no need to change the room events.
vec![]
} else {
state
.send_updates_to_store(
in_store_duplicated_event_ids
.into_iter()
.map(|(_event_id, position)| Update::RemoveItem { at: position })
.collect(),
)
// Remove the old duplicated events.
//
// We don't have to worry the removals can change the position of the
// existing events, because we are pushing all _new_
// `events` at the back.
let mut sync_timeline_events_diffs = state
.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
.await?;

// Add the previous back-pagination token (if present), followed by the timeline
// events themselves.
let (_, sync_timeline_events_diffs) = state
let (_, sync_timeline_events_diffs_next) = state
.with_events_mut(|room_events| {
if let Some(prev_token) = &prev_batch {
room_events.push_gap(Gap { prev_token: prev_token.clone() });
}

// Remove the old duplicated events.
//
// We don't have to worry the removals can change the position of the
// existing events, because we are pushing all _new_
// `events` at the back.
room_events
.remove_events_by_position(
in_memory_duplicated_event_ids
.into_iter()
.map(|(_event_id, position)| position),
)
.unwrap();

// Push the new events.
room_events.push_events(events.clone());

Expand All @@ -481,6 +467,7 @@ impl RoomEventCacheInner {
{
// Fill the AllEventsCache.
let mut all_events = self.all_events.write().await;

for sync_timeline_event in sync_timeline_events {
if let Some(event_id) = sync_timeline_event.event_id() {
all_events.append_related_event(&sync_timeline_event);
Expand All @@ -492,6 +479,8 @@ impl RoomEventCacheInner {
}
}

sync_timeline_events_diffs.extend(sync_timeline_events_diffs_next);

sync_timeline_events_diffs
};

Expand Down Expand Up @@ -547,11 +536,11 @@ mod private {
use matrix_sdk_base::{
deserialized_responses::{TimelineEvent, TimelineEventKind},
event_cache::{store::EventCacheStoreLock, Event, Gap},
linked_chunk::{lazy_loader, ChunkContent, Update},
linked_chunk::{lazy_loader, ChunkContent, Position, Update},
};
use matrix_sdk_common::executor::spawn;
use once_cell::sync::OnceCell;
use ruma::{serde::Raw, OwnedRoomId};
use ruma::{serde::Raw, OwnedEventId, OwnedRoomId};
use tracing::{error, instrument, trace};

use super::{
Expand All @@ -560,7 +549,7 @@ mod private {
EventCacheError,
},
events::RoomEvents,
LoadMoreEventsBackwardsOutcome,
sort_positions_descending, LoadMoreEventsBackwardsOutcome,
};

/// State for a single room's event cache.
Expand Down Expand Up @@ -812,6 +801,53 @@ mod private {
}
}

/// Remove events by their position, in `RoomEvents` and in
/// `EventCacheStore`.
///
/// Removal happens in two steps, in this order:
///
/// 1. `in_store_events` are removed from the store by sending
///    `Update::RemoveItem` updates;
/// 2. `in_memory_events` are removed from the in-memory `RoomEvents`
///    linked chunk.
///
/// In both steps, positions are sorted descending before removal, so that
/// removing one event cannot invalidate the positions of the events still
/// to be removed from the same chunk.
///
/// This method is purposely isolated because it must ensure that
/// positions are sorted appropriately or it can be disastrous.
///
/// Returns the `VectorDiff`s produced by the in-memory removals; the
/// caller must propagate them (hence the `#[must_use]`).
#[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
pub(super) async fn remove_events(
    &mut self,
    in_memory_events: Vec<(OwnedEventId, Position)>,
    in_store_events: Vec<(OwnedEventId, Position)>,
) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
    // In-store events: drop the event ids, keep only the positions, and
    // sort them descending before turning them into removal updates.
    {
        let mut positions = in_store_events
            .into_iter()
            .map(|(_event_id, position)| position)
            .collect::<Vec<_>>();

        sort_positions_descending(&mut positions);

        self.send_updates_to_store(
            positions
                .into_iter()
                .map(|position| Update::RemoveItem { at: position })
                .collect(),
        )
        .await?;
    }

    // In-memory events.
    let ((), timeline_event_diffs) = self
        .with_events_mut(|room_events| {
            // `remove_events_by_position` sorts the positions by itself.
            room_events
                .remove_events_by_position(
                    in_memory_events
                        .into_iter()
                        .map(|(_event_id, position)| position)
                        .collect(),
                )
                // The positions come from the deduplicator, so they are
                // expected to exist; a failure here is a bug, not a
                // recoverable error.
                .expect("failed to remove an event")
        })
        .await?;

    Ok(timeline_event_diffs)
}

/// Propagate changes to the underlying storage.
#[instrument(skip_all)]
async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
Expand All @@ -820,7 +856,7 @@ mod private {
self.send_updates_to_store(updates).await
}

pub(super) async fn send_updates_to_store(
pub async fn send_updates_to_store(
&mut self,
mut updates: Vec<Update<TimelineEvent, Gap>>,
) -> Result<(), EventCacheError> {
Expand Down

0 comments on commit c1d8e6a

Please sign in to comment.