diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 33baf630b41..fd7657684ef 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -27,7 +27,7 @@ use matrix_sdk_base::{ }; use matrix_sdk_store_encryption::StoreCipher; use ruma::{MilliSecondsSinceUnixEpoch, RoomId}; -use rusqlite::{OptionalExtension, Transaction}; +use rusqlite::{OptionalExtension, Transaction, TransactionBehavior}; use tokio::fs; use tracing::{debug, trace}; @@ -378,9 +378,7 @@ impl EventCacheStore for SqliteEventCacheStore { let room_id = room_id.to_owned(); let this = self.clone(); - self.acquire() - .await? - .with_transaction(move |txn| -> Result<_, Self::Error> { + with_immediate_transaction(self.acquire().await?, move |txn| { for up in updates { match up { Update::NewItemsChunk { previous, new, next } => { @@ -709,6 +707,43 @@ impl EventCacheStore for SqliteEventCacheStore { } } +/// Like `deadpool::managed::Object::with_transaction`, but starts the +/// transaction in immediate (write) mode from the beginning, precluding errors +/// of the kind SQLITE_BUSY from happening, for transactions that may involve +/// both reads and writes, and start with a write. +async fn with_immediate_transaction< + T: Send + 'static, + F: FnOnce(&Transaction<'_>) -> Result + Send + 'static, +>( + conn: SqliteAsyncConn, + f: F, +) -> Result { + conn.interact(move |conn| -> Result { + // Start the transaction in IMMEDIATE mode since all updates may cause writes, + // to avoid read transactions upgrading to write mode and causing + // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions + conn.set_transaction_behavior(TransactionBehavior::Immediate); + + let code = || -> Result { + let txn = conn.transaction()?; + let res = f(&txn)?; + txn.commit()?; + Ok(res) + }; + + let res = code(); + + // Reset the transaction behavior to use Deferred, after this transaction has + // been run, whether it was successful or not. + conn.set_transaction_behavior(TransactionBehavior::Deferred); + + res + }) + .await + // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].` + .unwrap() +} + fn insert_chunk( txn: &Transaction<'_>, room_id: &Key,