Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(send queue): use a specialized mutex for locking access to the state store and being_sent #4268

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 93 additions & 53 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ use std::{
str::FromStr as _,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock as SyncRwLock,
Arc, RwLock,
},
};

Expand Down Expand Up @@ -161,7 +161,7 @@ use ruma::{
serde::Raw,
OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
};
use tokio::sync::{broadcast, oneshot, Notify, RwLock};
use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
use tracing::{debug, error, info, instrument, trace, warn};

#[cfg(feature = "e2e-encryption")]
Expand Down Expand Up @@ -310,7 +310,7 @@ impl Client {

pub(super) struct SendQueueData {
/// Mapping of room to their unique send queue.
rooms: SyncRwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

/// Is the whole mechanism enabled or disabled?
///
Expand Down Expand Up @@ -877,19 +877,56 @@ impl BeingSentInfo {
}
}

/// A specialized lock that guards access to both the state store and the
/// [`Self::being_sent`] data.
#[derive(Clone)]
struct QueueStorage {
struct StoreLock {
/// Reference to the client, to get access to the underlying store.
client: WeakClient,

/// To which room is this storage related.
room_id: OwnedRoomId,

/// The one queued request that is being sent at the moment, along with
/// associated data that can be useful to act upon it.
///
/// It also serves as an internal lock on the storage backend.
being_sent: Arc<RwLock<Option<BeingSentInfo>>>,
/// Also used as the lock to access the state store.
being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
}

impl StoreLock {
/// Gets a hold of the locked store and [`Self::being_sent`] pair.
async fn lock(&self) -> StoreLockGuard {
StoreLockGuard {
client: self.client.clone(),
being_sent: self.being_sent.clone().lock_owned().await,
}
}
}

/// A lock guard obtained through locking with [`StoreLock`].
///
/// It gives access to both the client (and thus the underlying store) and
/// the `being_sent` data, for as long as the guard is held.
struct StoreLockGuard {
/// Reference to the client, to get access to the underlying store.
client: WeakClient,

/// The one queued request that is being sent at the moment, along with
/// associated data that can be useful to act upon it.
///
/// This is the owned mutex guard itself: the lock is held until this
/// value is dropped.
being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
}

impl StoreLockGuard {
    /// Upgrades the weak client reference held by this guard into a strong
    /// [`Client`], which callers use to reach the store.
    ///
    /// # Errors
    ///
    /// Returns [`RoomSendQueueStorageError::ClientShuttingDown`] when the weak
    /// reference can no longer be upgraded.
    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
        match self.client.get() {
            Some(client) => Ok(client),
            None => Err(RoomSendQueueStorageError::ClientShuttingDown),
        }
    }
}

/// Storage handle for the queued requests of a single room.
///
/// All store accesses go through the [`StoreLock`] held in `store`.
#[derive(Clone)]
struct QueueStorage {
/// A lock to make sure the state store is only accessed by one caller at a
/// time, to make some store operations atomic.
store: StoreLock,

/// To which room is this storage related.
room_id: OwnedRoomId,
}

impl QueueStorage {
Expand All @@ -901,12 +938,7 @@ impl QueueStorage {

/// Create a new queue for queuing requests to be sent later.
fn new(client: WeakClient, room: OwnedRoomId) -> Self {
Self { room_id: room, being_sent: Default::default(), client }
}

/// Small helper to get a strong Client from the weak one.
fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
}

/// Push a new event to be sent in the queue, with a default priority of 0.
Expand All @@ -918,7 +950,10 @@ impl QueueStorage {
) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
let transaction_id = TransactionId::new();

self.client()?
self.store
.lock()
.await
.client()?
.store()
.save_send_queue_request(
&self.room_id,
Expand All @@ -939,11 +974,9 @@ impl QueueStorage {
&self,
) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
{
// Keep the lock until we're done touching the storage.
let mut being_sent = self.being_sent.write().await;

let mut guard = self.store.lock().await;
let queued_requests =
self.client()?.store().load_send_queue_requests(&self.room_id).await?;
guard.client()?.store().load_send_queue_requests(&self.room_id).await?;

if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
let (cancel_upload_tx, cancel_upload_rx) =
Expand All @@ -954,7 +987,7 @@ impl QueueStorage {
Default::default()
};

let prev = being_sent.replace(BeingSentInfo {
let prev = guard.being_sent.replace(BeingSentInfo {
transaction_id: request.transaction_id.clone(),
cancel_upload: cancel_upload_tx,
});
Expand All @@ -976,7 +1009,7 @@ impl QueueStorage {
/// with the given transaction id as not being sent anymore, so it can
/// be removed from the queue later.
async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
let was_being_sent = self.being_sent.write().await.take();
let was_being_sent = self.store.lock().await.being_sent.take();

let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
if prev_txn != Some(transaction_id) {
Expand All @@ -993,15 +1026,15 @@ impl QueueStorage {
reason: QueueWedgeError,
) -> Result<(), RoomSendQueueStorageError> {
// Keep the lock until we're done touching the storage.
let mut being_sent = self.being_sent.write().await;
let was_being_sent = being_sent.take();
let mut guard = self.store.lock().await;
let was_being_sent = guard.being_sent.take();

let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
if prev_txn != Some(transaction_id) {
error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after permanent error)");
}

Ok(self
Ok(guard
.client()?
.store()
.update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
Expand All @@ -1015,6 +1048,9 @@ impl QueueStorage {
transaction_id: &TransactionId,
) -> Result<(), RoomSendQueueStorageError> {
Ok(self
.store
.lock()
.await
.client()?
.store()
.update_send_queue_request_status(&self.room_id, transaction_id, None)
Expand All @@ -1029,15 +1065,15 @@ impl QueueStorage {
parent_key: SentRequestKey,
) -> Result<(), RoomSendQueueStorageError> {
// Keep the lock until we're done touching the storage.
let mut being_sent = self.being_sent.write().await;
let was_being_sent = being_sent.take();
let mut guard = self.store.lock().await;
let was_being_sent = guard.being_sent.take();

let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
if prev_txn != Some(transaction_id) {
error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after successful send");
}

let client = self.client()?;
let client = guard.client()?;
let store = client.store();

// Update all dependent requests.
Expand All @@ -1062,12 +1098,14 @@ impl QueueStorage {
&self,
transaction_id: &TransactionId,
) -> Result<bool, RoomSendQueueStorageError> {
// Keep the lock until we're done touching the storage.
let being_sent = self.being_sent.read().await;
let guard = self.store.lock().await;

if being_sent.as_ref().map(|info| info.transaction_id.as_ref()) == Some(transaction_id) {
if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
== Some(transaction_id)
{
// Save the intent to redact the event.
self.client()?
guard
.client()?
.store()
.save_dependent_queued_request(
&self.room_id,
Expand All @@ -1080,8 +1118,11 @@ impl QueueStorage {
return Ok(true);
}

let removed =
self.client()?.store().remove_send_queue_request(&self.room_id, transaction_id).await?;
let removed = guard
.client()?
.store()
.remove_send_queue_request(&self.room_id, transaction_id)
.await?;

Ok(removed)
}
Expand All @@ -1097,12 +1138,14 @@ impl QueueStorage {
transaction_id: &TransactionId,
serializable: SerializableEventContent,
) -> Result<bool, RoomSendQueueStorageError> {
// Keep the lock until we're done touching the storage.
let being_sent = self.being_sent.read().await;
let guard = self.store.lock().await;

if being_sent.as_ref().map(|info| info.transaction_id.as_ref()) == Some(transaction_id) {
if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
== Some(transaction_id)
{
// Save the intent to edit the associated event.
self.client()?
guard
.client()?
.store()
.save_dependent_queued_request(
&self.room_id,
Expand All @@ -1115,7 +1158,7 @@ impl QueueStorage {
return Ok(true);
}

let edited = self
let edited = guard
.client()?
.store()
.update_send_queue_request(&self.room_id, transaction_id, serializable.into())
Expand All @@ -1136,12 +1179,8 @@ impl QueueStorage {
file_media_request: MediaRequestParameters,
thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
) -> Result<(), RoomSendQueueStorageError> {
// Keep the lock until we're done touching the storage.
// TODO refactor to make the relationship between being_sent and the store more
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// obvious.
let _guard = self.being_sent.read().await;

let client = self.client()?;
let guard = self.store.lock().await;
let client = guard.client()?;
let store = client.store();

let thumbnail_info =
Expand Down Expand Up @@ -1223,7 +1262,8 @@ impl QueueStorage {
transaction_id: &TransactionId,
key: String,
) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
let client = self.client()?;
let guard = self.store.lock().await;
let client = guard.client()?;
let store = client.store();

let requests = store.load_send_queue_requests(&self.room_id).await?;
Expand Down Expand Up @@ -1253,7 +1293,8 @@ impl QueueStorage {
&self,
room: &RoomSendQueue,
) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
let client = self.client()?;
let guard = self.store.lock().await;
let client = guard.client()?;
let store = client.store();

let local_requests =
Expand Down Expand Up @@ -1563,10 +1604,9 @@ impl QueueStorage {
&self,
new_updates: &mut Vec<RoomSendQueueUpdate>,
) -> Result<(), RoomSendQueueError> {
// Keep the lock until we're done touching the storage.
let _being_sent = self.being_sent.read().await;
let guard = self.store.lock().await;

let client = self.client()?;
let client = guard.client()?;
let store = client.store();

let dependent_requests = store
Expand Down Expand Up @@ -1637,10 +1677,10 @@ impl QueueStorage {
&self,
dependent_event_id: &ChildTransactionId,
) -> Result<bool, RoomSendQueueStorageError> {
// Keep the lock until we're done touching the storage.
let _being_sent = self.being_sent.read().await;

Ok(self
.store
.lock()
.await
.client()?
.store()
.remove_dependent_queued_request(&self.room_id, dependent_event_id)
Expand Down
12 changes: 6 additions & 6 deletions crates/matrix-sdk/src/send_queue/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,10 @@ impl QueueStorage {
event_txn: &TransactionId,
handles: &MediaHandles,
) -> Result<bool, RoomSendQueueStorageError> {
let client = self.client()?;
let mut guard = self.store.lock().await;
let client = guard.client()?;

// Keep the lock until we're done touching the storage.
let mut being_sent = self.being_sent.write().await;
debug!("trying to abort an upload");

let store = client.store();
Expand All @@ -450,10 +450,10 @@ impl QueueStorage {
trace!("could remove thumbnail request, removing 2 dependent requests now");

// 1. Try to abort sending using the being_sent info, in case it was active.
if let Some(info) = being_sent.as_ref() {
if let Some(info) = guard.being_sent.as_ref() {
if info.transaction_id == *thumbnail_txn {
// SAFETY: we knew it was Some(), two lines above.
let info = being_sent.take().unwrap();
let info = guard.being_sent.take().unwrap();
if info.cancel_upload() {
trace!("aborted ongoing thumbnail upload");
}
Expand Down Expand Up @@ -492,10 +492,10 @@ impl QueueStorage {
trace!("could remove file upload request, removing 1 dependent request");

// 1. Try to abort sending using the being_sent info, in case it was active.
if let Some(info) = being_sent.as_ref() {
if let Some(info) = guard.being_sent.as_ref() {
if info.transaction_id == handles.upload_file_txn {
// SAFETY: we knew it was Some(), two lines above.
let info = being_sent.take().unwrap();
let info = guard.being_sent.take().unwrap();
if info.cancel_upload() {
trace!("aborted ongoing file upload");
}
Expand Down
Loading