Skip to content

Commit

Permalink
chore(send queue): adapt to new locks around the event cache store 😎
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Nov 6, 2024
1 parent 77ee02f commit 8d07f36
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 57 deletions.
19 changes: 15 additions & 4 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ use matrix_sdk_base::{
FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
SentMediaInfo, SentRequestKey, SerializableEventContent,
},
store_locks::LockStoreError,
RoomState, StoreError,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
Expand Down Expand Up @@ -693,10 +694,16 @@ impl RoomSendQueue {
})
})?;

let data =
room.client().event_cache_store().get_media_content(&cache_key).await?.ok_or(
crate::Error::SendQueueWedgeError(QueueWedgeError::MissingMediaContent),
)?;
let data = room
.client()
.event_cache_store()
.lock()
.await?
.get_media_content(&cache_key)
.await?
.ok_or(crate::Error::SendQueueWedgeError(
QueueWedgeError::MissingMediaContent,
))?;

#[cfg(feature = "e2e-encryption")]
let media_source = if room.is_encrypted().await? {
Expand Down Expand Up @@ -1676,6 +1683,10 @@ pub enum RoomSendQueueStorageError {
#[error(transparent)]
EventCacheStoreError(#[from] EventCacheStoreError),

/// Error caused when attempting to get a handle on the event cache store.
#[error(transparent)]
LockError(#[from] LockStoreError),

/// Error caused when (de)serializing into/from json.
#[error(transparent)]
JsonSerialization(#[from] serde_json::Error),
Expand Down
116 changes: 63 additions & 53 deletions crates/matrix-sdk/src/send_queue/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,60 +160,66 @@ impl RoomSendQueue {
Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
debug!(filename, %content_type, %upload_file_txn, "sending an attachment");

// Cache the file itself in the cache store.
let file_media_request = make_local_file_media_request(&upload_file_txn);
room.client()
.event_cache_store()
.add_media_content(&file_media_request, data.clone())
.await
.map_err(|err| RoomSendQueueError::StorageError(err.into()))?;

// Process the thumbnail, if it's been provided.
let (upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info) = if let Some(
thumbnail,
) =
config.thumbnail.take()
{
// Normalize information to retrieve the thumbnail in the cache store.
let info = thumbnail.info.as_ref();
let height = info.and_then(|info| info.height).unwrap_or_else(|| {
trace!("thumbnail height is unknown, using 0 for the cache entry");
uint!(0)
});
let width = info.and_then(|info| info.width).unwrap_or_else(|| {
trace!("thumbnail width is unknown, using 0 for the cache entry");
uint!(0)
});

let txn = TransactionId::new();
trace!(upload_thumbnail_txn = %txn, thumbnail_size = ?(height, width), "attachment has a thumbnail");

// Cache thumbnail in the cache store.
let thumbnail_media_request = make_local_thumbnail_media_request(&txn, height, width);
room.client()
let (upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info) = {
let client = room.client();
let cache_store = client
.event_cache_store()
.add_media_content(&thumbnail_media_request, thumbnail.data.clone())
.lock()
.await
.map_err(|err| RoomSendQueueError::StorageError(err.into()))?;

// Create the information required for filling the thumbnail section of the
// media event.
let thumbnail_info =
Box::new(assign!(thumbnail.info.map(ThumbnailInfo::from).unwrap_or_default(), {
mimetype: Some(thumbnail.content_type.as_ref().to_owned())
}));

(
Some(txn.clone()),
Some((thumbnail_media_request.source.clone(), thumbnail_info)),
Some((
FinishUploadThumbnailInfo { txn, width, height },
thumbnail_media_request,
thumbnail.content_type,
)),
)
} else {
Default::default()
.map_err(RoomSendQueueStorageError::LockError)?;

// Cache the file itself in the cache store.
cache_store
.add_media_content(&file_media_request, data.clone())
.await
.map_err(RoomSendQueueStorageError::EventCacheStoreError)?;

// Process the thumbnail, if it's been provided.
if let Some(thumbnail) = config.thumbnail.take() {
// Normalize information to retrieve the thumbnail in the cache store.
let info = thumbnail.info.as_ref();
let height = info.and_then(|info| info.height).unwrap_or_else(|| {
trace!("thumbnail height is unknown, using 0 for the cache entry");
uint!(0)
});
let width = info.and_then(|info| info.width).unwrap_or_else(|| {
trace!("thumbnail width is unknown, using 0 for the cache entry");
uint!(0)
});

let txn = TransactionId::new();
trace!(upload_thumbnail_txn = %txn, thumbnail_size = ?(height, width), "attachment has a thumbnail");

// Cache thumbnail in the cache store.
let thumbnail_media_request =
make_local_thumbnail_media_request(&txn, height, width);
cache_store
.add_media_content(&thumbnail_media_request, thumbnail.data.clone())
.await
.map_err(RoomSendQueueStorageError::EventCacheStoreError)?;

// Create the information required for filling the thumbnail section of the
// media event.
let thumbnail_info = Box::new(
assign!(thumbnail.info.map(ThumbnailInfo::from).unwrap_or_default(), {
mimetype: Some(thumbnail.content_type.as_ref().to_owned())
}),
);

(
Some(txn.clone()),
Some((thumbnail_media_request.source.clone(), thumbnail_info)),
Some((
FinishUploadThumbnailInfo { txn, width, height },
thumbnail_media_request,
thumbnail.content_type,
)),
)
} else {
Default::default()
}
};

// Create the content for the media event.
Expand Down Expand Up @@ -296,8 +302,13 @@ impl QueueStorage {

trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");

client
let cache_store = client
.event_cache_store()
.lock()
.await
.map_err(RoomSendQueueStorageError::LockError)?;

cache_store
.replace_media_key(
&from_req,
&MediaRequestParameters {
Expand All @@ -320,8 +331,7 @@ impl QueueStorage {
// Reuse the same format for the cached thumbnail with the final MXC ID.
let new_format = from_req.format.clone();

client
.event_cache_store()
cache_store
.replace_media_key(
&from_req,
&MediaRequestParameters { source: new_source, format: new_format },
Expand Down

0 comments on commit 8d07f36

Please sign in to comment.