diff --git a/crates/matrix-sdk-sqlite/migrations/event_cache_store/002_lease_locks.sql b/crates/matrix-sdk-sqlite/migrations/event_cache_store/002_lease_locks.sql new file mode 100644 index 00000000000..e1e32d2c2ad --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/event_cache_store/002_lease_locks.sql @@ -0,0 +1,5 @@ +CREATE TABLE "lease_locks" ( + "key" TEXT PRIMARY KEY NOT NULL, + "holder" TEXT NOT NULL, + "expiration" REAL NOT NULL +); diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 2b760731125..a3567caf8fa 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -7,6 +7,7 @@ use matrix_sdk_base::{ media::{MediaRequest, UniqueKey}, }; use matrix_sdk_store_encryption::StoreCipher; +use ruma::MilliSecondsSinceUnixEpoch; use rusqlite::OptionalExtension; use tokio::fs; use tracing::debug; @@ -26,8 +27,8 @@ mod keys { /// /// This is used to figure whether the SQLite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in -/// the [`SqliteEventCacheStore::run_migrations`] function. -const DATABASE_VERSION: u8 = 1; +/// the [`run_migrations`] function. +const DATABASE_VERSION: u8 = 2; /// A SQLite-based event cache store. #[derive(Clone)] @@ -133,6 +134,14 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 2 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?; + txn.set_db_version(2) + }) + .await?; + } + Ok(()) } @@ -145,8 +154,32 @@ impl EventCacheStore for SqliteEventCacheStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { - todo!() + ) -> Result { + let key = key.to_owned(); + let holder = holder.to_owned(); + + let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); + let expiration = now + lease_duration_ms as u64; + + let num_touched = self + .acquire() + .await? + .with_transaction(move |txn| { + txn.execute( + "INSERT INTO lease_locks (key, holder, expiration) + VALUES (?1, ?2, ?3) + ON CONFLICT (key) + DO + UPDATE SET holder = ?2, expiration = ?3 + WHERE holder = ?2 + OR expiration < ?4 + ", + (key, holder, u64::from(expiration), u64::from(now)), + ) + }) + .await?; + + Ok(num_touched == 1) } async fn add_media_content(&self, request: &MediaRequest, content: Vec) -> Result<()> {