fix(sqlite): SqliteEventCacheStore is 35 times faster #4739

Merged on Mar 5, 2025 (3 commits). Changes shown are from all commits.
4 changes: 4 additions & 0 deletions benchmarks/Cargo.toml
@@ -29,6 +29,10 @@ pprof = { version = "0.14.0", features = ["flamegraph", "criterion"] }
 name = "crypto_bench"
 harness = false
 
+[[bench]]
+name = "linked_chunk"
+harness = false
+
 [[bench]]
 name = "store_bench"
 harness = false
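
(For context: `harness = false` opts a `[[bench]]` target out of the default libtest harness, so that Criterion's `criterion_main!`, seen at the end of the new benchmark file below, can supply the entry point.)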
156 changes: 156 additions & 0 deletions benchmarks/benches/linked_chunk.rs
@@ -0,0 +1,156 @@
use std::{sync::Arc, time::Duration};

use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use matrix_sdk::{
    linked_chunk::{LinkedChunk, Update},
    SqliteEventCacheStore,
};
use matrix_sdk_base::event_cache::{
    store::{DynEventCacheStore, IntoEventCacheStore, MemoryStore, DEFAULT_CHUNK_CAPACITY},
    Event, Gap,
};
use matrix_sdk_test::{event_factory::EventFactory, ALICE};
use ruma::{room_id, EventId};
use tempfile::tempdir;
use tokio::runtime::Builder;

#[derive(Clone, Debug)]
enum Operation {
    PushItemsBack(Vec<Event>),
    PushGapBack(Gap),
}

pub fn writing(c: &mut Criterion) {
    // Create a new asynchronous runtime.
    let runtime = Builder::new_multi_thread()
        .enable_time()
        .enable_io()
        .build()
        .expect("Failed to create an asynchronous runtime");

    let room_id = room_id!("!foo:bar.baz");
    let event_factory = EventFactory::new().room(room_id).sender(&ALICE);

    let mut group = c.benchmark_group("writing");
    group.sample_size(10).measurement_time(Duration::from_secs(30));

    for number_of_events in [10, 100, 1000, 10_000, 100_000] {
        let sqlite_temp_dir = tempdir().unwrap();

        // Declare new stores for this set of events.
        let stores: [(&str, Option<Arc<DynEventCacheStore>>); 3] = [
            ("none", None),
            ("memory store", Some(MemoryStore::default().into_event_cache_store())),
            (
                "sqlite store",
                runtime.block_on(async {
                    Some(
                        SqliteEventCacheStore::open(sqlite_temp_dir.path().join("bench"), None)
                            .await
                            .unwrap()
                            .into_event_cache_store(),
                    )
                }),
            ),
        ];

        for (store_name, store) in stores {
            // Create the operations we want to bench.
            let mut operations = Vec::new();

            {
                let mut events = (0..number_of_events)
                    .map(|nth| {
                        event_factory
                            .text_msg("foo")
                            .event_id(&EventId::parse(format!("$ev{nth}")).unwrap())
                            .into_event()
                    })
                    .peekable();

                let mut gap_nth = 0;

                while events.peek().is_some() {
                    {
                        let events_to_push_back = events.by_ref().take(80).collect::<Vec<_>>();

                        if events_to_push_back.is_empty() {
                            break;
                        }

                        operations.push(Operation::PushItemsBack(events_to_push_back));
                    }

                    {
                        operations.push(Operation::PushGapBack(Gap {
                            prev_token: format!("gap{gap_nth}"),
                        }));
                        gap_nth += 1;
                    }
                }
            }

            // Define the throughput.
            group.throughput(Throughput::Elements(number_of_events));

            // Get a bencher.
            group.bench_with_input(
                BenchmarkId::new(store_name, number_of_events),
                &operations,
                |bencher, operations| {
                    // Bench the routine.
                    bencher.to_async(&runtime).iter_batched(
                        || operations.clone(),
                        |operations| async {
                            // The routine to bench!
                            let mut linked_chunk =
                                LinkedChunk::<DEFAULT_CHUNK_CAPACITY, Event, Gap>::new_with_update_history();

                            for operation in operations {
                                match operation {
                                    Operation::PushItemsBack(events) => linked_chunk.push_items_back(events),
                                    Operation::PushGapBack(gap) => linked_chunk.push_gap_back(gap),
                                }
                            }

                            if let Some(store) = &store {
                                let updates = linked_chunk.updates().unwrap().take();
                                store.handle_linked_chunk_updates(room_id, updates).await.unwrap();
                                // Empty the store.
                                store.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await.unwrap();
                            }
                        },
                        BatchSize::SmallInput
                    )
                },
            );

            {
                let _guard = runtime.enter();
                drop(store);
            }
        }
    }

    group.finish()
}

fn criterion() -> Criterion {
    #[cfg(target_os = "linux")]
    let criterion = Criterion::default().with_profiler(pprof::criterion::PProfProfiler::new(
        100,
        pprof::criterion::Output::Flamegraph(None),
    ));
    #[cfg(not(target_os = "linux"))]
    let criterion = Criterion::default();

    criterion
}

criterion_group! {
    name = event_cache;
    config = criterion();
    targets = writing,
}
criterion_main!(event_cache);
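
The benchmark never writes to a store per event; it replays all mutations through `LinkedChunk`'s update history and flushes them in a single `handle_linked_chunk_updates` call. A minimal sketch of that mechanism, using `char` items and unit gaps instead of real events (the generic parameters and method names are taken from the code above; using `()` as the gap type is an assumption for illustration):

use matrix_sdk::linked_chunk::LinkedChunk;
use matrix_sdk_base::event_cache::store::DEFAULT_CHUNK_CAPACITY;

fn update_history_sketch() {
    // Record every mutation as an `Update`.
    let mut chunk = LinkedChunk::<DEFAULT_CHUNK_CAPACITY, char, ()>::new_with_update_history();

    chunk.push_items_back(vec!['a', 'b', 'c']);
    chunk.push_gap_back(());

    // Draining the accumulated updates yields what the bench then feeds to
    // `EventCacheStore::handle_linked_chunk_updates`.
    let updates = chunk.updates().unwrap().take();
    assert!(!updates.is_empty());
}

Assuming the usual Criterion setup, the benchmark itself should run with `cargo bench --bench linked_chunk` from the benchmarks crate.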
69 changes: 69 additions & 0 deletions crates/matrix-sdk-sqlite/migrations/event_cache_store/006_events.sql
@@ -0,0 +1,69 @@
DROP INDEX "linked_chunks_id_and_room_id";
DROP INDEX "linked_chunks_event_id_and_room_id";
DROP TABLE "events";
DROP TABLE "gaps";
DROP TABLE "linked_chunks";

CREATE TABLE "linked_chunks" (
    -- Which room does this chunk belong to? (hashed key shared with the two other tables)
    "room_id" BLOB NOT NULL,
    -- Identifier of the chunk, unique per room. Corresponds to a `ChunkIdentifier`.
    "id" INTEGER NOT NULL,

    -- Previous chunk in the linked list. Corresponds to a `ChunkIdentifier`.
    "previous" INTEGER,
    -- Next chunk in the linked list. Corresponds to a `ChunkIdentifier`.
    "next" INTEGER,
    -- Type of underlying entries: E for events, G for gaps.
    "type" TEXT CHECK("type" IN ('E', 'G')) NOT NULL,

    -- The primary key is composed of the room ID and the chunk identifier.
    -- Such pairs must be unique.
    PRIMARY KEY (room_id, id)
)
WITHOUT ROWID;

CREATE TABLE "gaps" (
    -- Which room does this gap belong to? (hashed key shared with `linked_chunks`)
    "room_id" BLOB NOT NULL,
    -- Which chunk does this gap refer to? Corresponds to a `ChunkIdentifier`.
    "chunk_id" INTEGER NOT NULL,

    -- The previous batch token of a gap (encrypted value).
    "prev_token" BLOB NOT NULL,

    -- The primary key is composed of the room ID and the chunk identifier.
    -- Such pairs must be unique.
    PRIMARY KEY (room_id, chunk_id),

    -- If the owning chunk gets deleted, delete the entry too.
    FOREIGN KEY (chunk_id, room_id) REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE
)
WITHOUT ROWID;

-- Items for an event chunk.
CREATE TABLE "events" (
    -- Which room does this event belong to? (hashed key shared with `linked_chunks`)
    "room_id" BLOB NOT NULL,
    -- Which chunk does this event refer to? Corresponds to a `ChunkIdentifier`.
    "chunk_id" INTEGER NOT NULL,

    -- `OwnedEventId` for events.
    "event_id" BLOB NOT NULL,
    -- JSON-serialized `TimelineEvent` (encrypted value).
    "content" BLOB NOT NULL,
    -- Position (index) in the chunk.
    "position" INTEGER NOT NULL,

    -- The primary key is the event ID.
    PRIMARY KEY (event_id),

    -- We need a uniqueness constraint over the (`room_id`, `chunk_id`,
    -- `position`) tuple because (i) it must be unique and (ii) it dramatically
    -- improves performance.
    UNIQUE (room_id, chunk_id, position),

    -- If the owning chunk gets deleted, delete the entry too.
    FOREIGN KEY (room_id, chunk_id) REFERENCES linked_chunks(room_id, id) ON DELETE CASCADE
)
WITHOUT ROWID;
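
Two things are worth noting about this migration. First, it drops and recreates the tables, so previously cached content is discarded. Second, the `UNIQUE (room_id, chunk_id, position)` constraint doubles as an index for the natural read pattern. A hypothetical read path against this schema (standalone rusqlite; the function name and bindings are invented for illustration):

use rusqlite::{Connection, Result};

fn load_chunk_events(
    conn: &Connection,
    hashed_room_id: &[u8],
    chunk_id: u64,
) -> Result<Vec<Vec<u8>>> {
    // Both the WHERE clause and the ORDER BY are answered by the index backing
    // UNIQUE (room_id, chunk_id, position): no table scan, no sort step.
    let mut statement = conn.prepare(
        "SELECT content FROM events WHERE room_id = ? AND chunk_id = ? ORDER BY position",
    )?;

    statement
        .query_map((hashed_room_id, chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        .collect()
}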
57 changes: 38 additions & 19 deletions crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -19,6 +19,7 @@ use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};
 use async_trait::async_trait;
 use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
 use matrix_sdk_base::{
+    deserialized_responses::TimelineEvent,
     event_cache::{
         store::{
             media::{
@@ -38,7 +39,7 @@ use matrix_sdk_store_encryption::StoreCipher;
 use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
 use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
 use tokio::fs;
-use tracing::{debug, trace};
+use tracing::{debug, error, trace};
 
 use crate::{
     error::{Error, Result},
@@ -64,7 +65,7 @@ mod keys {
 /// This is used to figure out whether the SQLite database requires a migration.
 /// Every new SQL migration should imply a bump of this number, and changes in
 /// the [`run_migrations`] function.
-const DATABASE_VERSION: u8 = 5;
+const DATABASE_VERSION: u8 = 6;
 
 /// The string used to identify a chunk of type events, in the `type` field in
 /// the database.
@@ -351,6 +352,14 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
         .await?;
     }
 
+    if version < 6 {
+        conn.with_transaction(|txn| {
+            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
+            txn.set_db_version(6)
+        })
+        .await?;
+    }
+
     Ok(())
 }

@@ -486,24 +495,35 @@ impl EventCacheStore for SqliteEventCacheStore {
                 }
 
                 Update::PushItems { at, items } => {
+                    if items.is_empty() {
+                        // Should never happen, but better be safe.
+                        continue;
+                    }
+
                     let chunk_id = at.chunk_identifier().index();
 
                     trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());
 
-                    for (i, event) in items.into_iter().enumerate() {
+                    let mut statement = txn.prepare(
+                        "INSERT INTO events(chunk_id, room_id, event_id, content, position) VALUES (?, ?, ?, ?, ?)"
+                    )?;
+
+                    let invalid_event = |event: TimelineEvent| {
+                        let Some(event_id) = event.event_id() else {
+                            error!(%room_id, "Trying to push an event with no ID");
+                            return None;
+                        };
+
+                        Some((event_id.to_string(), event))
+                    };
+
+                    for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
                         let serialized = serde_json::to_vec(&event)?;
                         let content = this.encode_value(serialized)?;
 
-                        let event_id = event.event_id().map(|event_id| event_id.to_string());
                         let index = at.index() + i;
 
-                        txn.execute(
-                            r#"
-                                INSERT INTO events(chunk_id, room_id, event_id, content, position)
-                                VALUES (?, ?, ?, ?, ?)
-                            "#,
-                            (chunk_id, &hashed_room_id, event_id, content, index),
-                        )?;
+                        statement.execute((chunk_id, &hashed_room_id, event_id, content, index))?;
                     }
                 }
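
This hunk carries the headline optimization: the `INSERT` is prepared once per `PushItems` update and executed once per event, instead of the SQL being re-parsed on every iteration; events without an ID are filtered out up front. A standalone sketch of the prepare-once, execute-many pattern (plain rusqlite; the table and names are hypothetical):

use rusqlite::{Connection, Result};

fn insert_many(conn: &Connection, rows: &[(i64, String)]) -> Result<()> {
    let txn = conn.unchecked_transaction()?;
    {
        // Parsed and planned by SQLite once…
        let mut statement = txn.prepare("INSERT INTO items(id, value) VALUES (?, ?)")?;

        for (id, value) in rows {
            // …then executed many times with fresh bindings.
            statement.execute((id, value))?;
        }
    }
    txn.commit()
}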

@@ -517,7 +537,10 @@
                     let content = this.encode_value(serialized)?;
 
                     // The event id should be the same, but just in case it changed…
-                    let event_id = event.event_id().map(|event_id| event_id.to_string());
+                    let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
+                        error!(%room_id, "Trying to replace an event with a new one that has no ID");
+                        continue;
+                    };
 
                     txn.execute(
                         r#"
@@ -826,22 +849,18 @@
                     .prepare(&query)?
                     .query_map(parameters, |row| {
                         Ok((
-                            row.get::<_, Option<String>>(0)?,
+                            row.get::<_, String>(0)?,
                             row.get::<_, u64>(1)?,
                             row.get::<_, usize>(2)?
                         ))
                     })?
                 {
                     let (duplicated_event, chunk_identifier, index) = duplicated_event?;
 
-                    let Some(duplicated_event) = duplicated_event else {
-                        // Event ID is malformed, let's skip it.
-                        continue;
-                    };
-
-                    let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
+                    let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                         // Normally unreachable, but the event ID has been stored even if it is
                         // malformed, let's skip it.
+                        error!(%duplicated_event, %room_id, "Reading a malformed event ID");
                         continue;
                     };
 