Skip to content

Commit

Permalink
feat(base,sdk): Client now uses EventCacheStoreLock.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hywan committed Nov 6, 2024
1 parent 8b85ff2 commit 7b3eb0b
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 27 deletions.
15 changes: 7 additions & 8 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "e2e-encryption")]
use std::sync::Arc;
use std::{
collections::{BTreeMap, BTreeSet},
fmt, iter,
ops::Deref,
sync::Arc,
};

use eyeball::{SharedObservable, Subscriber};
Expand Down Expand Up @@ -71,7 +72,7 @@ use crate::RoomMemberships;
use crate::{
deserialized_responses::{RawAnySyncOrStrippedTimelineEvent, SyncTimelineEvent},
error::{Error, Result},
event_cache_store::DynEventCacheStore,
event_cache_store::EventCacheStoreLock,
response_processors::AccountDataProcessor,
rooms::{
normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons},
Expand All @@ -95,7 +96,7 @@ pub struct BaseClient {
pub(crate) store: Store,

/// The store used by the event cache.
event_cache_store: Arc<DynEventCacheStore>,
event_cache_store: EventCacheStoreLock,

/// The store used for encryption.
///
Expand All @@ -114,8 +115,7 @@ pub struct BaseClient {
pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

/// A sender that is used to communicate changes to room information. Each
/// event contains the room and a boolean whether this event should
/// trigger a room list update.
/// tick contains the room ID and the reasons that have generated this tick.
pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

/// The strategy to use for picking recipient devices, when sending an
Expand Down Expand Up @@ -249,14 +249,13 @@ impl BaseClient {
}

/// Get a reference to the store.
#[allow(unknown_lints, clippy::explicit_auto_deref)]
pub fn store(&self) -> &DynStateStore {
self.store.deref()
}

/// Get a reference to the event cache store.
pub fn event_cache_store(&self) -> &DynEventCacheStore {
self.event_cache_store.as_ref()
pub fn event_cache_store(&self) -> &EventCacheStoreLock {
&self.event_cache_store
}

/// Is the client logged in.
Expand Down
3 changes: 3 additions & 0 deletions crates/matrix-sdk-base/src/event_cache_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use self::{
};

/// The high-level public type to represent an `EventCacheStore` lock.
#[derive(Clone)]
pub struct EventCacheStoreLock {
/// The inner cross process lock that is used to lock the `EventCacheStore`.
cross_process_lock: CrossProcessStoreLock<LockableEventCacheStore>,
Expand All @@ -50,6 +51,7 @@ pub struct EventCacheStoreLock {
store: Arc<DynEventCacheStore>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for EventCacheStoreLock {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
Expand Down Expand Up @@ -94,6 +96,7 @@ pub struct EventCacheStoreLockGuard<'a> {
store: &'a DynEventCacheStore,
}

#[cfg(not(tarpaulin_include))]
impl<'a> fmt::Debug for EventCacheStoreLockGuard<'a> {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
Expand Down
19 changes: 13 additions & 6 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::warn;

use crate::{
event_cache_store::{DynEventCacheStore, IntoEventCacheStore},
event_cache_store,
rooms::{normal::RoomInfoNotableUpdate, RoomInfo, RoomState},
MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
};
Expand Down Expand Up @@ -489,7 +489,7 @@ pub struct StoreConfig {
#[cfg(feature = "e2e-encryption")]
pub(crate) crypto_store: Arc<DynCryptoStore>,
pub(crate) state_store: Arc<DynStateStore>,
pub(crate) event_cache_store: Arc<DynEventCacheStore>,
pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
}

#[cfg(not(tarpaulin_include))]
Expand All @@ -507,8 +507,11 @@ impl StoreConfig {
#[cfg(feature = "e2e-encryption")]
crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
state_store: Arc::new(MemoryStore::new()),
event_cache_store: crate::event_cache_store::MemoryStore::new()
.into_event_cache_store(),
event_cache_store: event_cache_store::EventCacheStoreLock::new(
event_cache_store::MemoryStore::new(),
"default-key".to_owned(),
"matrix-sdk-base".to_owned(),
),
}
}

Expand All @@ -528,8 +531,12 @@ impl StoreConfig {
}

/// Set a custom implementation of an `EventCacheStore`.
pub fn event_cache_store(mut self, event_cache_store: impl IntoEventCacheStore) -> Self {
self.event_cache_store = event_cache_store.into_event_cache_store();
pub fn event_cache_store<S>(mut self, event_cache_store: S, key: String, holder: String) -> Self
where
S: event_cache_store::IntoEventCacheStore,
{
self.event_cache_store =
event_cache_store::EventCacheStoreLock::new(event_cache_store, key, holder);
self
}
}
Expand Down
31 changes: 27 additions & 4 deletions crates/matrix-sdk/src/client/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl ClientBuilder {
path: path.as_ref().to_owned(),
cache_path: None,
passphrase: passphrase.map(ToOwned::to_owned),
event_cache_store_lock_holder: "matrix-sdk".to_owned(),
};
self
}
Expand All @@ -225,6 +226,7 @@ impl ClientBuilder {
path: path.as_ref().to_owned(),
cache_path: Some(cache_path.as_ref().to_owned()),
passphrase: passphrase.map(ToOwned::to_owned),
event_cache_store_lock_holder: "matrix-sdk".to_owned(),
};
self
}
Expand All @@ -235,6 +237,7 @@ impl ClientBuilder {
self.store_config = BuilderStoreConfig::IndexedDb {
name: name.to_owned(),
passphrase: passphrase.map(ToOwned::to_owned),
event_cache_store_lock_holder: "matrix-sdk".to_owned(),
};
self
}
Expand Down Expand Up @@ -551,7 +554,12 @@ async fn build_store_config(
#[allow(clippy::infallible_destructuring_match)]
let store_config = match builder_config {
#[cfg(feature = "sqlite")]
BuilderStoreConfig::Sqlite { path, cache_path, passphrase } => {
BuilderStoreConfig::Sqlite {
path,
cache_path,
passphrase,
event_cache_store_lock_holder,
} => {
let store_config = StoreConfig::new()
.state_store(
matrix_sdk_sqlite::SqliteStateStore::open(&path, passphrase.as_deref()).await?,
Expand All @@ -562,6 +570,8 @@ async fn build_store_config(
passphrase.as_deref(),
)
.await?,
"default-key".to_owned(),
event_cache_store_lock_holder,
);

#[cfg(feature = "e2e-encryption")]
Expand All @@ -573,8 +583,13 @@ async fn build_store_config(
}

#[cfg(feature = "indexeddb")]
BuilderStoreConfig::IndexedDb { name, passphrase } => {
build_indexeddb_store_config(&name, passphrase.as_deref()).await?
BuilderStoreConfig::IndexedDb { name, passphrase, event_cache_store_lock_holder } => {
build_indexeddb_store_config(
&name,
passphrase.as_deref(),
event_cache_store_lock_holder,
)
.await?
}

BuilderStoreConfig::Custom(config) => config,
Expand All @@ -588,6 +603,7 @@ async fn build_store_config(
async fn build_indexeddb_store_config(
name: &str,
passphrase: Option<&str>,
event_cache_store_lock_holder: String,
) -> Result<StoreConfig, ClientBuildError> {
#[cfg(feature = "e2e-encryption")]
let store_config = {
Expand All @@ -604,7 +620,11 @@ async fn build_indexeddb_store_config(

let store_config = {
tracing::warn!("The IndexedDB backend does not implement an event cache store, falling back to the in-memory event cache store…");
store_config.event_cache_store(matrix_sdk_base::event_cache_store::MemoryStore::new())
store_config.event_cache_store(
matrix_sdk_base::event_cache_store::MemoryStore::new(),
"default-key".to_owned(),
event_cache_store_lock_holder,
)
};

Ok(store_config)
Expand All @@ -614,6 +634,7 @@ async fn build_indexeddb_store_config(
async fn build_indexeddb_store_config(
_name: &str,
_passphrase: Option<&str>,
_event_cache_store_lock_holder: String,
) -> Result<StoreConfig, ClientBuildError> {
panic!("the IndexedDB is only available on the 'wasm32' arch")
}
Expand Down Expand Up @@ -658,11 +679,13 @@ enum BuilderStoreConfig {
path: std::path::PathBuf,
cache_path: Option<std::path::PathBuf>,
passphrase: Option<String>,
event_cache_store_lock_holder: String,
},
#[cfg(feature = "indexeddb")]
IndexedDb {
name: String,
passphrase: Option<String>,
event_cache_store_lock_holder: String,
},
Custom(StoreConfig),
}
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use imbl::Vector;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_base::crypto::store::LockableCryptoStore;
use matrix_sdk_base::{
event_cache_store::DynEventCacheStore,
event_cache_store::EventCacheStoreLock,
store::{DynStateStore, ServerCapabilities},
sync::{Notification, RoomUpdates},
BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
Expand Down Expand Up @@ -590,7 +590,7 @@ impl Client {
}

/// Get a reference to the event cache store.
pub(crate) fn event_cache_store(&self) -> &DynEventCacheStore {
pub(crate) fn event_cache_store(&self) -> &EventCacheStoreLock {
self.base_client().event_cache_store()
}

Expand Down
13 changes: 9 additions & 4 deletions crates/matrix-sdk/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl Media {
// Read from the cache.
if use_cache {
if let Some(content) =
self.client.event_cache_store().get_media_content(request).await?
self.client.event_cache_store().lock().await?.get_media_content(request).await?
{
return Ok(content);
}
Expand Down Expand Up @@ -477,7 +477,12 @@ impl Media {
};

if use_cache {
self.client.event_cache_store().add_media_content(request, content.clone()).await?;
self.client
.event_cache_store()
.lock()
.await?
.add_media_content(request, content.clone())
.await?;
}

Ok(content)
Expand All @@ -489,7 +494,7 @@ impl Media {
///
/// * `request` - The `MediaRequest` of the content.
pub async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
Ok(self.client.event_cache_store().remove_media_content(request).await?)
Ok(self.client.event_cache_store().lock().await?.remove_media_content(request).await?)
}

/// Delete all the media content corresponding to the given
Expand All @@ -499,7 +504,7 @@ impl Media {
///
/// * `uri` - The `MxcUri` of the files.
pub async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
Ok(self.client.event_cache_store().remove_media_content_for_uri(uri).await?)
Ok(self.client.event_cache_store().lock().await?.remove_media_content_for_uri(uri).await?)
}

/// Get the file of the given media event content.
Expand Down
7 changes: 4 additions & 3 deletions crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1988,14 +1988,15 @@ impl Room {
.await?;

if store_in_cache {
let cache_store = self.client.event_cache_store();
let cache_store_lock_guard = self.client.event_cache_store().lock().await?;

// A failure to cache shouldn't prevent the whole upload from finishing
// properly, so only log errors during caching.

debug!("caching the media");
let request = MediaRequest { source: media_source.clone(), format: MediaFormat::File };
if let Err(err) = cache_store.add_media_content(&request, data).await {

if let Err(err) = cache_store_lock_guard.add_media_content(&request, data).await {
warn!("unable to cache the media after uploading it: {err}");
}

Expand All @@ -2018,7 +2019,7 @@ impl Room {
}),
};

if let Err(err) = cache_store.add_media_content(&request, data).await {
if let Err(err) = cache_store_lock_guard.add_media_content(&request, data).await {
warn!("unable to cache the media after uploading it: {err}");
}
}
Expand Down

0 comments on commit 7b3eb0b

Please sign in to comment.