Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Implement event_cache::Deduplicator #3580

Closed
wants to merge 14 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore(sdk): Add an Event type alias for the sake of convenience.
This patch adds an `Event` type alias to `SyncTimelineEvent` to (i) make
the code shorter, (ii) remove some cognitive effort, (iii) make things
more convenient.
  • Loading branch information
Hywan committed Oct 28, 2024
commit 946274a128cb650fcf4647e5063554939df93c69
20 changes: 14 additions & 6 deletions crates/matrix-sdk/src/event_cache/deduplicator.rs
Original file line number Diff line number Diff line change
@@ -15,11 +15,18 @@
use std::{collections::BTreeSet, sync::Mutex};

use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;

use super::store::RoomEvents;

pub struct Deduplicator {
use super::store::{Event, RoomEvents};

/// Use `Deduplicator` to find duplicates.
///
/// This type uses a [bloom filter] to efficiently detect duplicates. Every time
/// [`Self::scan_and_learn`] is called, the bloom filter is updated (hence the
/// _learn_ part). Because a bloom filter has (rare) false positives, it is
/// still necessary to provide all existing events to apply a linear search.
///
/// [bloom filter]: https://en.wikipedia.org/wiki/Bloom_filter
pub(super) struct Deduplicator {
bloom_filter: Mutex<GrowableBloom>,
}

@@ -55,7 +62,7 @@ impl Deduplicator {
existing_events: &'a RoomEvents,
) -> impl Iterator<Item = Decoration<I::Item>> + 'a
where
I: Iterator<Item = SyncTimelineEvent> + 'a,
I: Iterator<Item = Event> + 'a,
{
let mut already_seen = BTreeSet::new();

@@ -104,7 +111,7 @@ impl Deduplicator {

/// Information about the scanned collection of events.
#[derive(Debug)]
pub enum Decoration<I> {
pub(super) enum Decoration<I> {
/// This event is not duplicated.
Unique(I),

@@ -118,6 +125,7 @@ pub enum Decoration<I> {
#[cfg(test)]
mod tests {
use assert_matches2::assert_let;
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use matrix_sdk_test::{EventBuilder, ALICE};
use ruma::{events::room::message::RoomMessageEventContent, owned_event_id, EventId};

25 changes: 14 additions & 11 deletions crates/matrix-sdk/src/event_cache/store.rs
Original file line number Diff line number Diff line change
@@ -22,6 +22,9 @@ use super::{
linked_chunk::{Chunk, ChunkIdentifier, Error, Iter, LinkedChunk, Position},
};

/// An alias for the real event type.
pub type Event = SyncTimelineEvent;

#[derive(Clone, Debug)]
pub struct Gap {
/// The token to use in the query, extracted from a previous "from" /
@@ -32,7 +35,7 @@ pub struct Gap {
const DEFAULT_CHUNK_CAPACITY: usize = 128;

pub struct RoomEvents {
chunks: LinkedChunk<DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>,
chunks: LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>,
deduplicator: Deduplicator,
}

@@ -53,9 +56,9 @@ impl RoomEvents {
}

/// Deduplicate `events` considering all events in `Self::chunks`.
fn deduplicate<'a, I>(&'a self, events: I) -> impl Iterator<Item = SyncTimelineEvent> + 'a
fn deduplicate<'a, I>(&'a self, events: I) -> impl Iterator<Item = Event> + 'a
where
I: Iterator<Item = SyncTimelineEvent> + 'a,
I: Iterator<Item = Event> + 'a,
{
self.deduplicator.scan_and_learn(events, self).map(
|decorated_event| match decorated_event {
@@ -79,7 +82,7 @@ impl RoomEvents {
/// The last event in `events` is the most recent one.
pub fn push_events<I>(&mut self, events: I)
where
I: IntoIterator<Item = SyncTimelineEvent>,
I: IntoIterator<Item = Event>,
{
let events = self.deduplicate(events.into_iter()).collect::<Vec<_>>();

@@ -94,7 +97,7 @@ impl RoomEvents {
/// Insert events at a specified position.
pub fn insert_events_at<I>(&mut self, events: I, position: Position) -> Result<(), Error>
where
I: IntoIterator<Item = SyncTimelineEvent>,
I: IntoIterator<Item = Event>,
{
let events = self.deduplicate(events.into_iter()).collect::<Vec<_>>();

@@ -117,9 +120,9 @@ impl RoomEvents {
&mut self,
events: I,
gap_identifier: ChunkIdentifier,
) -> Result<&Chunk<DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, Error>
) -> Result<&Chunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Error>
where
I: IntoIterator<Item = SyncTimelineEvent>,
I: IntoIterator<Item = Event>,
{
let events = self.deduplicate(events.into_iter()).collect::<Vec<_>>();

@@ -129,29 +132,29 @@ impl RoomEvents {
/// Search for a chunk, and return its identifier.
pub fn chunk_identifier<'a, P>(&'a self, predicate: P) -> Option<ChunkIdentifier>
where
P: FnMut(&'a Chunk<DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>) -> bool,
P: FnMut(&'a Chunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>) -> bool,
{
self.chunks.chunk_identifier(predicate)
}

/// Iterate over the chunks, forward.
///
/// The oldest chunk comes first.
pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> {
pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, Event, Gap> {
self.chunks.chunks()
}

/// Iterate over the events, backward.
///
/// The most recent event comes first.
pub fn revents(&self) -> impl Iterator<Item = (Position, &SyncTimelineEvent)> {
pub fn revents(&self) -> impl Iterator<Item = (Position, &Event)> {
self.chunks.ritems()
}

/// Iterate over the events, forward.
///
/// The oldest event comes first.
pub fn events(&self) -> impl Iterator<Item = (Position, &SyncTimelineEvent)> {
pub fn events(&self) -> impl Iterator<Item = (Position, &Event)> {
self.chunks.items()
}
}