fix(event cache store): start RemoveChunk transactions in write mode
This is unfortunate, but we're hitting a bad case of a read transaction
being upgraded to a write transaction, which can simply fail if any other
write transaction is already happening in the background.

To work around this, we need to start the transaction in write mode, so we
begin with the update statements themselves. This means some work is
duplicated (fetching a linked chunk), but that should be fine, since there's
an index on the (chunk id, room id) pair.
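
For illustration, here is a minimal, self-contained rusqlite sketch of the pattern this message describes. It is not the crate's actual code: the table layout is simplified to linked_chunks(id, room_id, previous, next) and the function name is made up. The point is that every statement in the transaction is a write, with the SELECT folded into the UPDATE, so SQLite never has to upgrade a read transaction part-way through. It assumes SQLite 3.33 or newer, which introduced UPDATE ... FROM.

    use rusqlite::{Connection, Result};

    /// Unlink and delete one chunk, issuing only write statements so the
    /// transaction starts (and stays) in write mode.
    fn remove_chunk(conn: &mut Connection, room_id: &str, chunk_id: i64) -> Result<()> {
        let txn = conn.transaction()?;

        // Point the previous chunk's `next` at the removed chunk's own `next`.
        // The inner SELECT replaces the standalone read the old code performed.
        txn.execute(
            "UPDATE linked_chunks AS lc
             SET next = cur.next
             FROM (SELECT id, next FROM linked_chunks WHERE id = ?1 AND room_id = ?2) AS cur
             WHERE lc.next = cur.id AND lc.room_id = ?2",
            (chunk_id, room_id),
        )?;

        // Symmetrically, point the next chunk's `previous` at the removed
        // chunk's own `previous`.
        txn.execute(
            "UPDATE linked_chunks AS lc
             SET previous = cur.previous
             FROM (SELECT id, previous FROM linked_chunks WHERE id = ?1 AND room_id = ?2) AS cur
             WHERE lc.previous = cur.id AND lc.room_id = ?2",
            (chunk_id, room_id),
        )?;

        // Finally, delete the chunk itself; foreign keys with ON DELETE CASCADE
        // clean up dependent rows (such as the gaps table checked in the tests below).
        txn.execute(
            "DELETE FROM linked_chunks WHERE id = ?1 AND room_id = ?2",
            (chunk_id, room_id),
        )?;

        txn.commit()
    }

With this ordering, a plain deferred transaction takes the write lock on its very first statement; the old SELECT-then-UPDATE ordering instead started out as a read transaction and could fail (typically with SQLITE_BUSY) when trying to upgrade while another writer was active.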
bnjbvr committed Dec 12, 2024
1 parent 0264e49 commit 5b8c751
Showing 1 changed file with 76 additions and 19 deletions.
crates/matrix-sdk-sqlite/src/event_cache_store.rs (76 additions, 19 deletions)
@@ -441,22 +441,44 @@ impl EventCacheStore for SqliteEventCacheStore {
 
                         trace!(%room_id, "removing chunk @ {chunk_id}");
 
-                        // Find chunk to delete.
-                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
-                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
-                            (chunk_id, &hashed_room_id),
-                            |row| Ok((row.get(0)?, row.get(1)?))
-                        )?;
+                        // Note: previously, we used a single select query to retrieve the
+                        // previous/next values before doing two UPDATE. Unfortunately, this
+                        // means that a read-only transaction may be upgraded to a write
+                        // transaction, which fails if there's another write transaction
+                        // happening elsewhere.
+                        //
+                        // To work-around this, we include the select statement in the update
+                        // statement, so the transaction immediately starts in write mode.
+                        //
+                        // The inner SELECT retrieves the current chunk's next, and it's stored
+                        // as cur.next.
 
-                        // Replace its previous' next to its own next.
-                        if let Some(previous) = previous {
-                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
-                        }
+                        txn.execute(r#"
+                            UPDATE linked_chunks AS lc
+                            SET next = cur.next
+                            FROM
+                                (SELECT id, next
+                                 FROM linked_chunks
+                                 WHERE id = ? AND room_id = ?
+                                )
+                                AS cur
+                            WHERE lc.next = cur.id AND lc.room_id = ?
+                        "#, (chunk_id, &hashed_room_id, &hashed_room_id))?;
 
-                        // Replace its next' previous to its own previous.
-                        if let Some(next) = next {
-                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
-                        }
+                        txn.execute(r#"
+                            UPDATE linked_chunks AS lc
+                            SET previous = cur.previous
+                            FROM
+                                (SELECT id, previous
+                                 FROM linked_chunks
+                                 WHERE id = ? AND room_id = ?
+                                )
+                                AS cur
+                            WHERE lc.previous = cur.id AND lc.room_id = ?
+                        "#, (chunk_id, &hashed_room_id, &hashed_room_id))?;
 
                         // Now delete it, and let cascading delete corresponding entries in the
                         // other data tables.
@@ -779,7 +801,7 @@ mod tests {
     use tempfile::{tempdir, TempDir};
 
     use super::SqliteEventCacheStore;
-    use crate::utils::SqliteAsyncConnExt;
+    use crate::{event_cache_store::keys, utils::SqliteAsyncConnExt};
 
     static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);
@@ -971,11 +993,12 @@ mod tests {
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");
 
-        let room_id = &DEFAULT_TEST_ROOM_ID;
+        let room_id = &*DEFAULT_TEST_ROOM_ID;
+        let room_id2 = room_id!("!a:b.c");
 
        store
            .handle_linked_chunk_updates(
-                room_id,
+                &room_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
@@ -1001,7 +1024,36 @@
            .await
            .unwrap();
 
-        let mut chunks = store.reload_linked_chunk(room_id).await.unwrap();
+        // Add some updates to another room using the same IDs, to make sure the query
+        // only affects a single room.
+        store
+            .handle_linked_chunk_updates(
+                room_id2,
+                vec![
+                    Update::NewGapChunk {
+                        previous: None,
+                        new: ChunkIdentifier::new(42),
+                        next: None,
+                        gap: Gap { prev_token: "raclette".to_owned() },
+                    },
+                    Update::NewGapChunk {
+                        previous: Some(ChunkIdentifier::new(42)),
+                        new: ChunkIdentifier::new(43),
+                        next: None,
+                        gap: Gap { prev_token: "fondue".to_owned() },
+                    },
+                    Update::NewGapChunk {
+                        previous: Some(ChunkIdentifier::new(43)),
+                        new: ChunkIdentifier::new(44),
+                        next: None,
+                        gap: Gap { prev_token: "tartiflette".to_owned() },
+                    },
+                ],
+            )
+            .await
+            .unwrap();
+
+        let mut chunks = store.reload_linked_chunk(&room_id).await.unwrap();
 
        assert_eq!(chunks.len(), 2);
 
@@ -1023,15 +1075,17 @@
        });
 
        // Check that cascading worked. Yes, sqlite, I doubt you.
+        let store2 = store.clone();
        let gaps = store
            .acquire()
            .await
            .unwrap()
-            .with_transaction(|txn| -> rusqlite::Result<_> {
+            .with_transaction(move |txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
+                let hashed_room_id = store2.encode_key(keys::LINKED_CHUNKS, *room_id);
                for data in txn
-                    .prepare("SELECT chunk_id FROM gaps ORDER BY chunk_id")?
-                    .query_map((), |row| row.get::<_, u64>(0))?
+                    .prepare("SELECT chunk_id FROM gaps WHERE room_id = ? ORDER BY chunk_id")?
+                    .query_map((&hashed_room_id,), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
@@ -1041,6 +1095,9 @@
            .unwrap();
 
        assert_eq!(gaps, vec![42, 44]);
+
+        // Sanity-check: chunks from the second room haven't been touched.
+        assert_eq!(store.reload_linked_chunk(room_id2).await.unwrap().len(), 3);
    }
 
    #[async_test]
