Skip to content

Commit

Permalink
fixup! use Decoration
Browse files Browse the repository at this point in the history
  • Loading branch information
Hywan committed Oct 21, 2024
1 parent 554e6fc commit 713cc08
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 74 deletions.
24 changes: 4 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/matrix-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ async-channel = "2.2.1"
async-stream = { workspace = true }
async-trait = { workspace = true }
axum = { version = "0.7.4", optional = true }
bloomfilter = { version = "1.0.13", default-features = false }
bytes = "1.1.0"
bytesize = "1.1"
chrono = { version = "0.4.23", optional = true }
Expand All @@ -85,6 +84,7 @@ eyeball-im = { workspace = true }
eyre = { version = "0.6.8", optional = true }
futures-core = { workspace = true }
futures-util = { workspace = true }
growable-bloom-filter = { workspace = true }
http = { workspace = true }
imbl = { workspace = true, features = ["serde"] }
indexmap = "2.0.2"
Expand Down
139 changes: 86 additions & 53 deletions crates/matrix-sdk/src/event_cache/deduplicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,55 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::BTreeSet, ops::Not};
use std::collections::BTreeSet;

use bloomfilter::Bloom;
use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use ruma::OwnedEventId;

use super::store::RoomEvents;

pub struct Deduplicator {
bloom_filter: Bloom<OwnedEventId>,
bloom_filter: GrowableBloom,
}

impl Deduplicator {
const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 800_000;
const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.001;
const SEED_FOR_HASHER: &'static [u8; 32] = b"matrix_sdk_event_cache_deduptor!";

/// Create a new `Self`.
pub fn new() -> Self {
Self {
bloom_filter: Bloom::new_for_fp_rate_with_seed(
Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS,
Self::DESIRED_FALSE_POSITIVE_RATE,
Self::SEED_FOR_HASHER,
),
bloom_filter: GrowableBloomBuilder::new()
.estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
.desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
.build(),
}
}

pub fn filter_and_learn<'a, I>(
/// Scan a collection of events and detect duplications.
///
/// This method takes a collection of events `events_to_scan` and returns a
/// new collection of events, where each event is decorated by a
/// [`Decoration`], so that the caller can decide what to do with these
/// events.
///
/// Each scanned event will update `Self`'s internal state.
///
/// `existing_events` represents all events of a room that already exist.
pub fn scan_and_learn<'a, I>(
&'a mut self,
events: I,
room_events: &'a RoomEvents,
) -> impl Iterator<Item = I::Item> + 'a
events_to_scan: I,
existing_events: &'a RoomEvents,
) -> impl Iterator<Item = Decoration<I::Item>> + 'a
where
I: Iterator<Item = SyncTimelineEvent> + 'a,
{
let mut already_seen = BTreeSet::new();

events.filter(move |event| {
events_to_scan.map(move |event| {
let Some(event_id) = event.event_id() else {
// The event has no `event_id`. Safe path: filter it out.
return false;
// The event has no `event_id`.
return Decoration::Invalid(event);
};

if self.bloom_filter.check_and_set(&event_id) {
Expand All @@ -64,33 +72,47 @@ impl Deduplicator {
// iterator itself contains duplicated events! We use a `BTreetSet`, otherwise
// using a bloom filter again may generate false positives.
if already_seen.contains(&event_id) {
// The iterator contains a duplicated `event`. Let's filter it out.
return false;
// The iterator contains a duplicated `event`.
return Decoration::Duplicated(event);
}

// Now we can iterate over all events to ensure `event` is not present in
// `room_events`.
let result = room_events
.revents()
.any(|(_position, other_event)| {
other_event.event_id().as_ref() == Some(&event_id)
})
.not();
// `existing_events`.
let duplicated = existing_events.revents().any(|(_position, other_event)| {
other_event.event_id().as_ref() == Some(&event_id)
});

already_seen.insert(event_id);

result
if duplicated {
Decoration::Duplicated(event)
} else {
Decoration::Ok(event)
}
} else {
already_seen.insert(event_id);

// Bloom filter has no false negatives. We are sure the event is NOT present: we
// can keep it in the iterator.
true
Decoration::Ok(event)
}
})
}
}

/// Information about the scanned collection of events.
#[derive(Debug)]
pub enum Decoration<I> {
/// This event is not duplicated.
Ok(I),

/// This event is duplicated.
Duplicated(I),

/// This event is invalid (i.e. not well formed).
Invalid(I),
}

#[cfg(test)]
mod tests {
use assert_matches2::assert_let;
Expand All @@ -101,8 +123,8 @@ mod tests {

fn sync_timeline_event(event_builder: &EventBuilder, event_id: &EventId) -> SyncTimelineEvent {
SyncTimelineEvent::new(event_builder.make_sync_message_event_with_id(
&*ALICE,
&event_id,
*ALICE,
event_id,
RoomMessageEventContent::text_plain("foo"),
))
}
Expand All @@ -120,18 +142,18 @@ mod tests {
let event_2 = sync_timeline_event(&event_builder, &event_id_2);

let mut deduplicator = Deduplicator::new();
let room_events = RoomEvents::new();
let existing_events = RoomEvents::new();

let mut events =
deduplicator.filter_and_learn([event_0, event_1, event_2].into_iter(), &room_events);
deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events);

assert_let!(Some(event) = events.next());
assert_let!(Some(Decoration::Ok(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_0));

assert_let!(Some(event) = events.next());
assert_let!(Some(Decoration::Ok(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_1));

assert_let!(Some(event) = events.next());
assert_let!(Some(Decoration::Ok(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_2));

assert!(events.next().is_none());
Expand All @@ -148,22 +170,25 @@ mod tests {
let event_1 = sync_timeline_event(&event_builder, &event_id_1);

let mut deduplicator = Deduplicator::new();
let room_events = RoomEvents::new();
let existing_events = RoomEvents::new();

let mut events = deduplicator.filter_and_learn(
let mut events = deduplicator.scan_and_learn(
[
event_0.clone(), // OK
event_0, // Not OK
event_1, // OK
]
.into_iter(),
&room_events,
&existing_events,
);

assert_let!(Some(event) = events.next());
assert_let!(Some(Decoration::Ok(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_0.clone()));

assert_let!(Some(Decoration::Duplicated(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_0));

assert_let!(Some(event) = events.next());
assert_let!(Some(Decoration::Ok(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_1));

assert!(events.next().is_none());
Expand All @@ -182,35 +207,43 @@ mod tests {
let event_2 = sync_timeline_event(&event_builder, &event_id_2);

let mut deduplicator = Deduplicator::new();
let mut room_events = RoomEvents::new();
let mut existing_events = RoomEvents::new();

// Simulate `event_1` is inserted inside `room_events`.
// Simulate `event_1` is inserted inside `existing_events`.
{
let mut events =
deduplicator.filter_and_learn([event_1.clone()].into_iter(), &room_events);
deduplicator.scan_and_learn([event_1.clone()].into_iter(), &existing_events);

assert_let!(Some(event_1) = events.next());
assert_eq!(event_1.event_id(), Some(event_id_1));
assert_let!(Some(Decoration::Ok(event_1)) = events.next());
assert_eq!(event_1.event_id(), Some(event_id_1.clone()));

assert!(events.next().is_none());

drop(events); // make the borrow checker happy.

// Now we can push `event_1` inside `room_events`.
room_events.push_event(event_1);
// Now we can push `event_1` inside `existing_events`.
existing_events.push_events([event_1]);
}

// `event_1` will be duplicated.
{
let mut events = deduplicator
.filter_and_learn([event_0, event_1, event_2].into_iter(), &room_events);

assert_let!(Some(event) = events.next());
let mut events = deduplicator.scan_and_learn(
[
event_0, // OK
event_1, // Not OK
event_2, // Ok
]
.into_iter(),
&existing_events,
);

assert_let!(Some(Decoration::Ok(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_0));

// `event_1` is missing.
assert_let!(Some(Decoration::Duplicated(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_1));

assert_let!(Some(event) = events.next());
assert_let!(Some(Decoration::Ok(event)) = events.next());
assert_eq!(event.event_id(), Some(event_id_2));

assert!(events.next().is_none());
Expand Down

0 comments on commit 713cc08

Please sign in to comment.