Skip to content

Commit

Permalink
feat(sdk): EventCacheStore doesn't store invalid events.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hywan committed Mar 5, 2025
1 parent e65267f commit aed17bc
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,22 @@ CREATE TABLE "events" (
-- Which chunk does this event refer to? Corresponds to a `ChunkIdentifier`.
"chunk_id" INTEGER NOT NULL,

-- `OwnedEventId` for events, can be null if malformed.
"event_id" TEXT,
-- `OwnedEventId` for events.
"event_id" BLOB NOT NULL,
-- JSON serialized `TimelineEvent` (encrypted value).
"content" BLOB NOT NULL,
-- Position (index) in the chunk.
"position" INTEGER NOT NULL,

-- Primary key cannot be the event ID because it can be null. However, we
-- can use the room ID, chunk ID and position to get a nice unique value.
PRIMARY KEY (room_id, chunk_id, position),
-- Primary key is the event ID.
PRIMARY KEY (event_id),

-- However, we need a uniqueness constraint over the `room_id`, `chunk_id`
-- and `position` tupl because (i) they must be unique, (ii) it dramatically
-- improves the performance.
UNIQUE (room_id, chunk_id, position),

-- If the owning chunk gets deleted, delete the entry too.
FOREIGN KEY (room_id, chunk_id) REFERENCES linked_chunks(room_id, id) ON DELETE CASCADE
)
WITHOUT ROWID;

CREATE INDEX "events_event_id_and_room_id" ON events (room_id, event_id);
32 changes: 21 additions & 11 deletions crates/matrix-sdk-sqlite/src/event_cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use matrix_sdk_store_encryption::StoreCipher;
use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, trace};
use tracing::{debug, error, trace};

use crate::{
error::{Error, Result},
Expand Down Expand Up @@ -507,11 +507,22 @@ impl EventCacheStore for SqliteEventCacheStore {
"INSERT INTO events(chunk_id, room_id, event_id, content, position) VALUES (?, ?, ?, ?, ?)"
)?;

for (i, event) in items.into_iter().enumerate() {
let items = items
.into_iter()
.filter_map(|event| {
let Some(event_id) = event.event_id() else {
error!(%room_id, "Trying to push an event with no ID");
return None;
};

Some((event_id.to_string(), event))
})
.enumerate();

for (i, (event_id, event)) in items {
let serialized = serde_json::to_vec(&event)?;
let content = this.encode_value(serialized)?;

let event_id = event.event_id().map(|event_id| event_id.to_string());
let index = at.index() + i;

statement.execute((chunk_id, &hashed_room_id, event_id, content, index))?;
Expand All @@ -528,7 +539,10 @@ impl EventCacheStore for SqliteEventCacheStore {
let content = this.encode_value(serialized)?;

// The event id should be the same, but just in case it changed…
let event_id = event.event_id().map(|event_id| event_id.to_string());
let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
error!(%room_id, "Trying to replace an event with a new one that has no ID");
continue;
};

txn.execute(
r#"
Expand Down Expand Up @@ -837,22 +851,18 @@ impl EventCacheStore for SqliteEventCacheStore {
.prepare(&query)?
.query_map(parameters, |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, String>(0)?,
row.get::<_, u64>(1)?,
row.get::<_, usize>(2)?
))
})?
{
let (duplicated_event, chunk_identifier, index) = duplicated_event?;

let Some(duplicated_event) = duplicated_event else {
// Event ID is malformed, let's skip it.
continue;
};

let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
// Normally unreachable, but the event ID has been stored even if it is
// malformed, let's skip it.
error!(%duplicated_event, %room_id, "Reading an malformed event ID");
continue;
};

Expand Down
27 changes: 11 additions & 16 deletions crates/matrix-sdk/src/event_cache/deduplicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{collections::BTreeSet, fmt, sync::Mutex};
use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use matrix_sdk_base::{event_cache::store::EventCacheStoreLock, linked_chunk::Position};
use ruma::{OwnedEventId, OwnedRoomId};
use tracing::{debug, warn};
use tracing::{debug, error};

use super::{
room::events::{Event, RoomEvents},
Expand Down Expand Up @@ -211,20 +211,16 @@ impl BloomFilterDeduplicator {

let event_id = event
.event_id()
// SAFETY: An event with no ID is decorated as
// `Decoration::Invalid`. Thus, it's
// safe to unwrap the `Option<OwnedEventId>` here.
// SAFETY: An event with no ID is not possible, as invalid events are
// already filtered out. Thus, it's safe to unwrap the
// `Option<OwnedEventId>` here.
.expect("The event has no ID");

duplicated_event_ids.push((event_id, position));

// Keep the new event!
Some(event)
}
Decoration::Invalid(event) => {
warn!(?event, "Found an event with no ID");
None
}
})
.collect::<Vec<_>>();

Expand Down Expand Up @@ -253,13 +249,15 @@ impl BloomFilterDeduplicator {
where
I: Iterator<Item = Event> + 'a,
{
new_events_to_scan.map(move |event| {
new_events_to_scan.filter_map(move |event| {
let Some(event_id) = event.event_id() else {
// The event has no `event_id`.
return Decoration::Invalid(event);
// The event has no `event_id`. This is normally unreachable as event with no ID
// are already filtered out.
error!(?event, "Found an event with no ID");
return None;
};

if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
Some(if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
// Oh oh, it looks like we have found a duplicate!
//
// However, bloom filters have false positives. We are NOT sure the event is NOT
Expand All @@ -282,7 +280,7 @@ impl BloomFilterDeduplicator {
// Bloom filter has no false negatives. We are sure the event is NOT present: we
// can keep it in the iterator.
Decoration::Unique(event)
}
})
})
}
}
Expand All @@ -295,9 +293,6 @@ enum Decoration<I> {

/// This event is duplicated.
Duplicated((I, Position)),

/// This event is invalid (i.e. not well formed).
Invalid(I),
}

pub(super) struct DeduplicationOutcome {
Expand Down

0 comments on commit aed17bc

Please sign in to comment.