fix(sqlite): Design a new schema to get faster insertions.
This patch is twofold. First, it provides a new schema that improves the
insertion performance of `SqliteEventCacheStore` for 100_000 events from
6.7k events/sec to 284k events/sec on my machine.

Second, it now assumes that `EventCacheStore` does NOT store invalid
events. That was already the case in practice, but the SQLite schema did
not reject invalid events if any were handed to it. It is now explicitly
forbidden.
Hywan committed Mar 5, 2025
1 parent 36888b0 commit 16a5401
Showing 3 changed files with 111 additions and 31 deletions.
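As a back-of-the-envelope check of the throughput claim above, a minimal benchmark could look like the following sketch. It is hypothetical and not part of the patch: the table definition is condensed from the migration below, and the loop exercises the access pattern the new schema favors, namely one prepared statement executed many times inside a single transaction.

// Hypothetical benchmark sketch (not part of this commit). The `events`
// table is a condensed copy of the one created by the migration below.
use std::time::Instant;

use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let mut conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE events (
            room_id BLOB NOT NULL,
            chunk_id INTEGER NOT NULL,
            event_id BLOB NOT NULL,
            content BLOB NOT NULL,
            position INTEGER NOT NULL,
            PRIMARY KEY (event_id),
            UNIQUE (room_id, chunk_id, position)
        ) WITHOUT ROWID;",
    )?;

    let start = Instant::now();

    let txn = conn.transaction()?;
    {
        // One prepared statement, reused for every row.
        let mut statement = txn.prepare(
            "INSERT INTO events (room_id, chunk_id, event_id, content, position) \
             VALUES (?, ?, ?, ?, ?)",
        )?;

        for i in 0..100_000 {
            statement.execute(("!room:example.org", 0, format!("$event_{i}"), "{}", i))?;
        }
    }
    txn.commit()?;

    let elapsed = start.elapsed().as_secs_f64();
    println!("{:.0} events/sec", 100_000.0 / elapsed);

    Ok(())
}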
69 changes: 69 additions & 0 deletions crates/matrix-sdk-sqlite/migrations/event_cache_store/006_events.sql
@@ -0,0 +1,69 @@
DROP INDEX "linked_chunks_id_and_room_id";
DROP INDEX "linked_chunks_event_id_and_room_id";
DROP TABLE "events";
DROP TABLE "gaps";
DROP TABLE "linked_chunks";

CREATE TABLE "linked_chunks" (
-- Which room does this chunk belong to? (hashed key shared with the two other tables)
"room_id" BLOB NOT NULL,
-- Identifier of the chunk, unique per room. Corresponds to a `ChunkIdentifier`.
"id" INTEGER NOT NULL,

-- Previous chunk in the linked list. Corresponds to a `ChunkIdentifier`.
"previous" INTEGER,
-- Next chunk in the linked list. Corresponds to a `ChunkIdentifier`.
"next" INTEGER,
-- Type of underlying entries: E for events, G for gaps
"type" TEXT CHECK("type" IN ('E', 'G')) NOT NULL,

-- Primary key is composed of the room ID and the chunk identifier.
-- Such pairs must be unique.
PRIMARY KEY (room_id, id)
)
WITHOUT ROWID;

CREATE TABLE "gaps" (
-- Which room does this gap belong to? (hashed key shared with linked_chunks)
"room_id" BLOB NOT NULL,
-- Which chunk does this gap refer to? Corresponds to a `ChunkIdentifier`.
"chunk_id" INTEGER NOT NULL,

-- The previous batch token of a gap (encrypted value).
"prev_token" BLOB NOT NULL,

-- Primary key is composed of the room ID and the chunk identifier.
-- Such pairs must be unique.
PRIMARY KEY (room_id, chunk_id),

-- If the owning chunk gets deleted, delete the entry too.
FOREIGN KEY (chunk_id, room_id) REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE
)
WITHOUT ROWID;

-- Items for an event chunk.
CREATE TABLE "events" (
-- Which room does this event belong to? (hashed key shared with linked_chunks)
"room_id" BLOB NOT NULL,
-- Which chunk does this event refer to? Corresponds to a `ChunkIdentifier`.
"chunk_id" INTEGER NOT NULL,

-- `OwnedEventId` for events.
"event_id" BLOB NOT NULL,
-- JSON serialized `TimelineEvent` (encrypted value).
"content" BLOB NOT NULL,
-- Position (index) in the chunk.
"position" INTEGER NOT NULL,

-- Primary key is the event ID.
PRIMARY KEY (event_id),

-- We need a uniqueness constraint over the `room_id`, `chunk_id` and
-- `position` tuple because (i) a given position in a chunk can hold at
-- most one event, and (ii) the backing index dramatically improves
-- insertion and lookup performance.
UNIQUE (room_id, chunk_id, position),

-- If the owning chunk gets deleted, delete the entry too.
FOREIGN KEY (room_id, chunk_id) REFERENCES linked_chunks(room_id, id) ON DELETE CASCADE
)
WITHOUT ROWID;
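The two ON DELETE CASCADE clauses are what keep `gaps` and `events` consistent with `linked_chunks` when chunks are removed. A quick, hypothetical way to see the behavior (note that rusqlite does not enable SQLite's `foreign_keys` pragma by default, so the sketch turns it on explicitly):

// Hypothetical check (not part of this commit): deleting a chunk row
// cascades to its event rows through the foreign key declared above.
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;

    // SQLite only enforces foreign keys once this pragma is enabled.
    conn.execute_batch("PRAGMA foreign_keys = ON;")?;

    // Condensed copies of the `linked_chunks` and `events` tables above.
    conn.execute_batch(
        "CREATE TABLE linked_chunks (
            room_id BLOB NOT NULL,
            id INTEGER NOT NULL,
            previous INTEGER,
            next INTEGER,
            type TEXT CHECK(type IN ('E', 'G')) NOT NULL,
            PRIMARY KEY (room_id, id)
        ) WITHOUT ROWID;
        CREATE TABLE events (
            room_id BLOB NOT NULL,
            chunk_id INTEGER NOT NULL,
            event_id BLOB NOT NULL,
            content BLOB NOT NULL,
            position INTEGER NOT NULL,
            PRIMARY KEY (event_id),
            UNIQUE (room_id, chunk_id, position),
            FOREIGN KEY (room_id, chunk_id)
                REFERENCES linked_chunks(room_id, id) ON DELETE CASCADE
        ) WITHOUT ROWID;",
    )?;

    conn.execute(
        "INSERT INTO linked_chunks (room_id, id, previous, next, type)
         VALUES (?, ?, NULL, NULL, 'E')",
        ("!room:example.org", 0),
    )?;
    conn.execute(
        "INSERT INTO events (room_id, chunk_id, event_id, content, position)
         VALUES (?, ?, ?, ?, ?)",
        ("!room:example.org", 0, "$event_0", "{}", 0),
    )?;

    // Deleting the chunk must delete its events too.
    conn.execute(
        "DELETE FROM linked_chunks WHERE room_id = ? AND id = ?",
        ("!room:example.org", 0),
    )?;

    let remaining: u64 = conn.query_row("SELECT COUNT(*) FROM events", (), |row| row.get(0))?;
    assert_eq!(remaining, 0);

    Ok(())
}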
40 changes: 28 additions & 12 deletions crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -19,6 +19,7 @@ use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};
use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
deserialized_responses::TimelineEvent,
event_cache::{
store::{
media::{
@@ -38,7 +39,7 @@ use matrix_sdk_store_encryption::StoreCipher;
use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, trace};
use tracing::{debug, error, trace};

use crate::{
error::{Error, Result},
@@ -64,7 +65,7 @@ mod keys {
/// This is used to figure out whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 5;
const DATABASE_VERSION: u8 = 6;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
@@ -351,6 +352,14 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
.await?;
}

if version < 6 {
conn.with_transaction(|txn| {
txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
txn.set_db_version(6)
})
.await?;
}

Ok(())
}
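The guard above follows the store's usual migration pattern: read the recorded schema version, apply the SQL batch inside a transaction, and bump the version in that same transaction. A condensed, hypothetical sketch of the shape, using SQLite's built-in `user_version` pragma in place of the crate's own version helpers:

// Hypothetical, simplified variant of the version-gated migration pattern.
use rusqlite::Connection;

fn run_migrations(conn: &mut Connection) -> rusqlite::Result<()> {
    let version: u8 = conn.query_row("PRAGMA user_version", (), |row| row.get(0))?;

    if version < 6 {
        // The batch and the version bump commit (or roll back) together.
        let txn = conn.transaction()?;
        txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
        txn.execute_batch("PRAGMA user_version = 6;")?;
        txn.commit()?;
    }

    Ok(())
}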

@@ -499,11 +508,19 @@ impl EventCacheStore for SqliteEventCacheStore {
"INSERT INTO events(chunk_id, room_id, event_id, content, position) VALUES (?, ?, ?, ?, ?)"
)?;

for (i, event) in items.into_iter().enumerate() {
let invalid_event = |event: TimelineEvent| {
let Some(event_id) = event.event_id() else {
error!(%room_id, "Trying to push an event with no ID");
return None;
};

Some((event_id.to_string(), event))
};

for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
let serialized = serde_json::to_vec(&event)?;
let content = this.encode_value(serialized)?;

let event_id = event.event_id().map(|event_id| event_id.to_string());
let index = at.index() + i;

statement.execute((chunk_id, &hashed_room_id, event_id, content, index))?;
@@ -520,7 +537,10 @@ impl EventCacheStore for SqliteEventCacheStore {
let content = this.encode_value(serialized)?;

// The event id should be the same, but just in case it changed…
let event_id = event.event_id().map(|event_id| event_id.to_string());
let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
error!(%room_id, "Trying to replace an event with a new one that has no ID");
continue;
};

txn.execute(
r#"
@@ -829,22 +849,18 @@ impl EventCacheStore for SqliteEventCacheStore {
.prepare(&query)?
.query_map(parameters, |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, String>(0)?,
row.get::<_, u64>(1)?,
row.get::<_, usize>(2)?
))
})?
{
let (duplicated_event, chunk_identifier, index) = duplicated_event?;

let Some(duplicated_event) = duplicated_event else {
// Event ID is malformed, let's skip it.
continue;
};

let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
// Normally unreachable, but a malformed event ID may have been
// stored; let's skip it.
error!(%duplicated_event, %room_id, "Reading a malformed event ID");
continue;
};

33 changes: 14 additions & 19 deletions crates/matrix-sdk/src/event_cache/deduplicator.rs
Expand Up @@ -20,7 +20,7 @@ use std::{collections::BTreeSet, fmt, sync::Mutex};
use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use matrix_sdk_base::{event_cache::store::EventCacheStoreLock, linked_chunk::Position};
use ruma::{OwnedEventId, OwnedRoomId};
use tracing::{debug, warn};
use tracing::{debug, error};

use super::{
room::events::{Event, RoomEvents},
@@ -204,26 +204,22 @@ impl BloomFilterDeduplicator {

let events = self
.scan_and_learn(events.into_iter(), room_events)
.filter_map(|decorated_event| match decorated_event {
Decoration::Unique(event) => Some(event),
.map(|decorated_event| match decorated_event {
Decoration::Unique(event) => event,
Decoration::Duplicated((event, position)) => {
debug!(event_id = ?event.event_id(), "Found a duplicated event");

let event_id = event
.event_id()
// SAFETY: An event with no ID is decorated as
// `Decoration::Invalid`. Thus, it's
// safe to unwrap the `Option<OwnedEventId>` here.
// SAFETY: An event with no ID is not possible, as invalid events are
// already filtered out. Thus, it's safe to unwrap the
// `Option<OwnedEventId>` here.
.expect("The event has no ID");

duplicated_event_ids.push((event_id, position));

// Keep the new event!
Some(event)
}
Decoration::Invalid(event) => {
warn!(?event, "Found an event with no ID");
None
event
}
})
.collect::<Vec<_>>();
@@ -253,13 +249,15 @@
where
I: Iterator<Item = Event> + 'a,
{
new_events_to_scan.map(move |event| {
new_events_to_scan.filter_map(move |event| {
let Some(event_id) = event.event_id() else {
// The event has no `event_id`.
return Decoration::Invalid(event);
// The event has no `event_id`. This is normally unreachable, as events
// with no ID are already filtered out.
error!(?event, "Found an event with no ID");
return None;
};

if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
Some(if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
// Oh oh, it looks like we have found a duplicate!
//
// However, bloom filters have false positives. We are NOT sure the event is NOT
Expand All @@ -282,7 +280,7 @@ impl BloomFilterDeduplicator {
// Bloom filter has no false negatives. We are sure the event is NOT present: we
// can keep it in the iterator.
Decoration::Unique(event)
}
})
})
}
}
@@ -295,9 +293,6 @@ enum Decoration<I> {

/// This event is duplicated.
Duplicated((I, Position)),

/// This event is invalid (i.e. not well formed).
Invalid(I),
}

pub(super) struct DeduplicationOutcome {
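The comments above rely on a bloom-filter asymmetry: false positives are possible, false negatives are not. A minimal sketch with the `growable_bloom_filter` crate used by this module (the builder parameters are illustrative):

// Minimal sketch of the deduplication idea; parameters are illustrative.
use growable_bloom_filter::GrowableBloomBuilder;

fn main() {
    let mut bloom_filter = GrowableBloomBuilder::new()
        .estimated_insertions(1_000)
        .desired_error_ratio(0.01)
        .build();

    // The first call returns `false`: the event is definitely new (bloom
    // filters have no false negatives), and it is now recorded.
    assert!(!bloom_filter.check_and_set("$event_0"));

    // The second call returns `true`. For *other* events this answer can
    // be a false positive, which is why the deduplicator confirms
    // candidate duplicates against the store before trusting it.
    assert!(bloom_filter.check_and_set("$event_0"));
}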
