Skip to content

Commit

Permalink
feat(base,sdk): Client now uses EventCacheStoreLock.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hywan committed Nov 5, 2024
1 parent dcb18c3 commit f2ef774
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 23 deletions.
14 changes: 7 additions & 7 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "e2e-encryption")]
use std::sync::Arc;
use std::{
collections::{BTreeMap, BTreeSet},
fmt, iter,
ops::Deref,
sync::Arc,
};

use eyeball::{SharedObservable, Subscriber};
Expand Down Expand Up @@ -71,7 +72,7 @@ use crate::RoomMemberships;
use crate::{
deserialized_responses::{RawAnySyncOrStrippedTimelineEvent, SyncTimelineEvent},
error::{Error, Result},
event_cache_store::DynEventCacheStore,
event_cache_store::EventCacheStoreLock,
response_processors::AccountDataProcessor,
rooms::{
normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons},
Expand All @@ -95,7 +96,7 @@ pub struct BaseClient {
pub(crate) store: Store,

/// The store used by the event cache.
event_cache_store: Arc<DynEventCacheStore>,
event_cache_store: EventCacheStoreLock,

/// The store used for encryption.
///
Expand All @@ -114,8 +115,7 @@ pub struct BaseClient {
pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,

/// A sender that is used to communicate changes to room information. Each
/// event contains the room and a boolean whether this event should
/// trigger a room list update.
/// tick contains the room ID and the reasons that have generated this tick.
pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

/// The strategy to use for picking recipient devices, when sending an
Expand Down Expand Up @@ -255,8 +255,8 @@ impl BaseClient {
}

/// Get a reference to the event cache store.
pub fn event_cache_store(&self) -> &DynEventCacheStore {
self.event_cache_store.as_ref()
pub fn event_cache_store(&self) -> &EventCacheStoreLock {
&self.event_cache_store
}

/// Is the client logged in.
Expand Down
1 change: 1 addition & 0 deletions crates/matrix-sdk-base/src/event_cache_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use self::{
};

/// The high-level public type to represent an `EventCacheStore` lock.
#[derive(Clone)]
pub struct EventCacheStoreLock {
/// The inner cross process lock that is used to lock the `EventCacheStore`.
cross_process_lock: CrossProcessStoreLock<LockableEventCacheStore>,
Expand Down
19 changes: 13 additions & 6 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use tokio::sync::{broadcast, Mutex, RwLock};
use tracing::warn;

use crate::{
event_cache_store::{DynEventCacheStore, IntoEventCacheStore},
event_cache_store,
rooms::{normal::RoomInfoNotableUpdate, RoomInfo, RoomState},
MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
};
Expand Down Expand Up @@ -489,7 +489,7 @@ pub struct StoreConfig {
#[cfg(feature = "e2e-encryption")]
pub(crate) crypto_store: Arc<DynCryptoStore>,
pub(crate) state_store: Arc<DynStateStore>,
pub(crate) event_cache_store: Arc<DynEventCacheStore>,
pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
}

#[cfg(not(tarpaulin_include))]
Expand All @@ -507,8 +507,11 @@ impl StoreConfig {
#[cfg(feature = "e2e-encryption")]
crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
state_store: Arc::new(MemoryStore::new()),
event_cache_store: crate::event_cache_store::MemoryStore::new()
.into_event_cache_store(),
event_cache_store: event_cache_store::EventCacheStoreLock::new(
event_cache_store::MemoryStore::new(),
"default-key".to_owned(),
"matrix-sdk-base".to_owned(),
),
}
}

Expand All @@ -528,8 +531,12 @@ impl StoreConfig {
}

/// Set a custom implementation of an `EventCacheStore`.
pub fn event_cache_store(mut self, event_cache_store: impl IntoEventCacheStore) -> Self {
self.event_cache_store = event_cache_store.into_event_cache_store();
pub fn event_cache_store<S>(mut self, event_cache_store: S, key: String, holder: String) -> Self
where
S: event_cache_store::IntoEventCacheStore,
{
self.event_cache_store =
event_cache_store::EventCacheStoreLock::new(event_cache_store, key, holder);
self
}
}
Expand Down
12 changes: 11 additions & 1 deletion crates/matrix-sdk/src/client/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl ClientBuilder {
path: path.as_ref().to_owned(),
cache_path: None,
passphrase: passphrase.map(ToOwned::to_owned),
event_cache_store_lock_holder: "matrix-sdk".to_owned(),
};
self
}
Expand All @@ -225,6 +226,7 @@ impl ClientBuilder {
path: path.as_ref().to_owned(),
cache_path: Some(cache_path.as_ref().to_owned()),
passphrase: passphrase.map(ToOwned::to_owned),
event_cache_store_lock_holder: "matrix-sdk".to_owned(),
};
self
}
Expand Down Expand Up @@ -551,7 +553,12 @@ async fn build_store_config(
#[allow(clippy::infallible_destructuring_match)]
let store_config = match builder_config {
#[cfg(feature = "sqlite")]
BuilderStoreConfig::Sqlite { path, cache_path, passphrase } => {
BuilderStoreConfig::Sqlite {
path,
cache_path,
passphrase,
event_cache_store_lock_holder,
} => {
let store_config = StoreConfig::new()
.state_store(
matrix_sdk_sqlite::SqliteStateStore::open(&path, passphrase.as_deref()).await?,
Expand All @@ -562,6 +569,8 @@ async fn build_store_config(
passphrase.as_deref(),
)
.await?,
"default-key".to_owned(),
event_cache_store_lock_holder,
);

#[cfg(feature = "e2e-encryption")]
Expand Down Expand Up @@ -658,6 +667,7 @@ enum BuilderStoreConfig {
path: std::path::PathBuf,
cache_path: Option<std::path::PathBuf>,
passphrase: Option<String>,
event_cache_store_lock_holder: String,
},
#[cfg(feature = "indexeddb")]
IndexedDb {
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use imbl::Vector;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_base::crypto::store::LockableCryptoStore;
use matrix_sdk_base::{
event_cache_store::DynEventCacheStore,
event_cache_store::EventCacheStoreLock,
store::{DynStateStore, ServerCapabilities},
sync::{Notification, RoomUpdates},
BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
Expand Down Expand Up @@ -590,7 +590,7 @@ impl Client {
}

/// Get a reference to the event cache store.
pub(crate) fn event_cache_store(&self) -> &DynEventCacheStore {
pub(crate) fn event_cache_store(&self) -> &EventCacheStoreLock {
self.base_client().event_cache_store()
}

Expand Down
13 changes: 9 additions & 4 deletions crates/matrix-sdk/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl Media {
// Read from the cache.
if use_cache {
if let Some(content) =
self.client.event_cache_store().get_media_content(request).await?
self.client.event_cache_store().lock().await?.get_media_content(request).await?
{
return Ok(content);
}
Expand Down Expand Up @@ -477,7 +477,12 @@ impl Media {
};

if use_cache {
self.client.event_cache_store().add_media_content(request, content.clone()).await?;
self.client
.event_cache_store()
.lock()
.await?
.add_media_content(request, content.clone())
.await?;
}

Ok(content)
Expand All @@ -489,7 +494,7 @@ impl Media {
///
/// * `request` - The `MediaRequest` of the content.
pub async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
Ok(self.client.event_cache_store().remove_media_content(request).await?)
Ok(self.client.event_cache_store().lock().await?.remove_media_content(request).await?)
}

/// Delete all the media content corresponding to the given
Expand All @@ -499,7 +504,7 @@ impl Media {
///
/// * `uri` - The `MxcUri` of the files.
pub async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
Ok(self.client.event_cache_store().remove_media_content_for_uri(uri).await?)
Ok(self.client.event_cache_store().lock().await?.remove_media_content_for_uri(uri).await?)
}

/// Get the file of the given media event content.
Expand Down
7 changes: 4 additions & 3 deletions crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1986,14 +1986,15 @@ impl Room {
.await?;

if store_in_cache {
let cache_store = self.client.event_cache_store();
let cache_store_lock_guard = self.client.event_cache_store().lock().await?;

// A failure to cache shouldn't prevent the whole upload from finishing
// properly, so only log errors during caching.

debug!("caching the media");
let request = MediaRequest { source: media_source.clone(), format: MediaFormat::File };
if let Err(err) = cache_store.add_media_content(&request, data).await {

if let Err(err) = cache_store_lock_guard.add_media_content(&request, data).await {
warn!("unable to cache the media after uploading it: {err}");
}

Expand All @@ -2016,7 +2017,7 @@ impl Room {
}),
};

if let Err(err) = cache_store.add_media_content(&request, data).await {
if let Err(err) = cache_store_lock_guard.add_media_content(&request, data).await {
warn!("unable to cache the media after uploading it: {err}");
}
}
Expand Down

0 comments on commit f2ef774

Please sign in to comment.