Skip to content

Commit

Permalink
feat(sdk): Introduce event_cache::Deduplicator.
Browse files Browse the repository at this point in the history
This patch introduces `Deduplicator`, an efficient type to detect
duplicated events in the event cache. It uses a bloom filter, and
decorates a collection of events with `Decoration`, which an enum that
marks whether an event is unique, duplicated or invalid.
  • Loading branch information
Hywan committed Oct 28, 2024
1 parent 74283ea commit b04c21b
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ eyeball-im = { workspace = true }
eyre = { version = "0.6.8", optional = true }
futures-core = { workspace = true }
futures-util = { workspace = true }
growable-bloom-filter = { workspace = true }
http = { workspace = true }
imbl = { workspace = true, features = ["serde"] }
indexmap = "2.0.2"
Expand Down
266 changes: 266 additions & 0 deletions crates/matrix-sdk/src/event_cache/deduplicator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
//! to learn more.
use std::{collections::BTreeSet, sync::Mutex};

use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};

use super::store::{Event, RoomEvents};

/// `Deduplicator` is an efficient type to find duplicated events.
///
/// It uses a [bloom filter] to provide a fast probabilistic answer to: “has
/// event E been seen already?”. False positives are possible, while false
/// negatives are impossible. In the case of a positive reply, we fallback to a
/// linear (backward) search on all events to check whether it's a false
/// positive or not
///
/// [bloom filter]: https://en.wikipedia.org/wiki/Bloom_filter
pub struct Deduplicator {
bloom_filter: Mutex<GrowableBloom>,
}

impl Deduplicator {
const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 800_000;
const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.001;

/// Create a new `Deduplicator`.
pub fn new() -> Self {
Self {
bloom_filter: Mutex::new(
GrowableBloomBuilder::new()
.estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
.desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
.build(),
),
}
}

/// Scan a collection of events and detect duplications.
///
/// This method takes a collection of events `events_to_scan` and returns a
/// new collection of events, where each event is decorated by a
/// [`Decoration`], so that the caller can decide what to do with these
/// events.
///
/// Each scanned event will update `Self`'s internal state.
///
/// `existing_events` represents all events of a room that already exist.
pub fn scan_and_learn<'a, I>(
&'a self,
events_to_scan: I,
existing_events: &'a RoomEvents,
) -> impl Iterator<Item = Decoration<I::Item>> + 'a
where
I: Iterator<Item = Event> + 'a,
{
let mut already_seen = BTreeSet::new();

events_to_scan.map(move |event| {
let Some(event_id) = event.event_id() else {
// The event has no `event_id`.
return Decoration::Invalid(event);
};

if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
// Bloom filter has false positives. We are NOT sure the event is NOT present.
// Even if the false positive rate is low, we need to iterate over all events to
// ensure it isn't present.

// But first, let's ensure `event` is not a duplicate from `events`, i.e. if the
// iterator itself contains duplicated events! We use a `BTreetSet`, otherwise
// using a bloom filter again may generate false positives.
if already_seen.contains(&event_id) {
// The iterator contains a duplicated `event`.
return Decoration::Duplicated(event);
}

// Now we can iterate over all events to ensure `event` is not present in
// `existing_events`.
let duplicated = existing_events.revents().any(|(_position, other_event)| {
other_event.event_id().as_ref() == Some(&event_id)
});

already_seen.insert(event_id);

if duplicated {
Decoration::Duplicated(event)
} else {
Decoration::Unique(event)
}
} else {
already_seen.insert(event_id);

// Bloom filter has no false negatives. We are sure the event is NOT present: we
// can keep it in the iterator.
Decoration::Unique(event)
}
})
}
}

/// Information about the scanned collection of events.
#[derive(Debug)]
pub enum Decoration<I> {
/// This event is not duplicated.
Unique(I),

/// This event is duplicated.
Duplicated(I),

/// This event is invalid (i.e. not well formed).
Invalid(I),
}

#[cfg(test)]
mod tests {
use assert_matches2::assert_let;
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use matrix_sdk_test::{EventBuilder, ALICE};
use ruma::{events::room::message::RoomMessageEventContent, owned_event_id, EventId};

use super::*;

fn sync_timeline_event(event_builder: &EventBuilder, event_id: &EventId) -> SyncTimelineEvent {
SyncTimelineEvent::new(event_builder.make_sync_message_event_with_id(
*ALICE,
event_id,
RoomMessageEventContent::text_plain("foo"),
))
}

#[test]
fn test_filter_no_duplicate() {
let event_builder = EventBuilder::new();

let event_id_0 = owned_event_id!("$ev0");
let event_id_1 = owned_event_id!("$ev1");
let event_id_2 = owned_event_id!("$ev2");

let event_0 = sync_timeline_event(&event_builder, &event_id_0);
let event_1 = sync_timeline_event(&event_builder, &event_id_1);
let event_2 = sync_timeline_event(&event_builder, &event_id_2);

let deduplicator = Deduplicator::new();
let existing_events = RoomEvents::new();

let mut events =
deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events);

assert_let!(Some(Decoration::Unique(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_0));

assert_let!(Some(Decoration::Unique(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_1));

assert_let!(Some(Decoration::Unique(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_2));

assert!(events.next().is_none());
}

#[test]
fn test_filter_duplicates_in_new_events() {
let event_builder = EventBuilder::new();

let event_id_0 = owned_event_id!("$ev0");
let event_id_1 = owned_event_id!("$ev1");

let event_0 = sync_timeline_event(&event_builder, &event_id_0);
let event_1 = sync_timeline_event(&event_builder, &event_id_1);

let deduplicator = Deduplicator::new();
let existing_events = RoomEvents::new();

let mut events = deduplicator.scan_and_learn(
[
event_0.clone(), // OK
event_0, // Not OK
event_1, // OK
]
.into_iter(),
&existing_events,
);

assert_let!(Some(Decoration::Unique(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_0.clone()));

assert_let!(Some(Decoration::Duplicated(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_0));

assert_let!(Some(Decoration::Unique(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_1));

assert!(events.next().is_none());
}

#[test]
fn test_filter_duplicates_with_existing_events() {
let event_builder = EventBuilder::new();

let event_id_0 = owned_event_id!("$ev0");
let event_id_1 = owned_event_id!("$ev1");
let event_id_2 = owned_event_id!("$ev2");

let event_0 = sync_timeline_event(&event_builder, &event_id_0);
let event_1 = sync_timeline_event(&event_builder, &event_id_1);
let event_2 = sync_timeline_event(&event_builder, &event_id_2);

let deduplicator = Deduplicator::new();
let mut existing_events = RoomEvents::new();

// Simulate `event_1` is inserted inside `existing_events`.
{
let mut events =
deduplicator.scan_and_learn([event_1.clone()].into_iter(), &existing_events);

assert_let!(Some(Decoration::Unique(event_1)) = events.next());
assert_eq!(event_1.event_id(), Some(event_id_1.clone()));

assert!(events.next().is_none());

drop(events); // make the borrow checker happy.

// Now we can push `event_1` inside `existing_events`.
existing_events.push_events([event_1]);
}

// `event_1` will be duplicated.
{
let mut events = deduplicator.scan_and_learn(
[
event_0, // OK
event_1, // Not OK
event_2, // Ok
]
.into_iter(),
&existing_events,
);

assert_let!(Some(Decoration::Unique(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_0));

assert_let!(Some(Decoration::Duplicated(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_1));

assert_let!(Some(Decoration::Unique(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_2));

assert!(events.next().is_none());
}
}
}
1 change: 1 addition & 0 deletions crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use self::{
};
use crate::{client::WeakClient, room::WeakRoom, Client};

mod deduplicator;
mod linked_chunk;
mod pagination;
mod store;
Expand Down

0 comments on commit b04c21b

Please sign in to comment.