Skip to content

Commit

Permalink
feat(base): add a way to update a dependent send queue request
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Nov 18, 2024
1 parent 2b9a2a2 commit 01c7d19
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 0 deletions.
56 changes: 56 additions & 0 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub trait StateStoreIntegrationTests {
async fn test_send_queue_priority(&self);
/// Test operations related to send queue dependents.
async fn test_send_queue_dependents(&self);
/// Test an update to a send queue dependent request.
async fn test_update_send_queue_dependent(&self);
/// Test saving/restoring server capabilities.
async fn test_server_capabilities_saving(&self);
}
Expand Down Expand Up @@ -1522,6 +1524,54 @@ impl StateStoreIntegrationTests for DynStateStore {
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 2);
}

async fn test_update_send_queue_dependent(&self) {
let room_id = room_id!("!test_send_queue_dependents:localhost");

let txn = TransactionId::new();

// Save a dependent redaction for an event.
let child_txn = ChildTransactionId::new();

self.save_dependent_queued_request(
room_id,
&txn,
child_txn.clone(),
DependentQueuedRequestKind::RedactEvent,
)
.await
.unwrap();

// It worked.
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].parent_transaction_id, txn);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert!(dependents[0].parent_key.is_none());
assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);

// Make it a reaction, instead of a redaction.
self.update_dependent_queued_request(
room_id,
&child_txn,
DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
)
.await
.unwrap();

// It worked.
let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
assert_eq!(dependents.len(), 1);
assert_eq!(dependents[0].parent_transaction_id, txn);
assert_eq!(dependents[0].own_transaction_id, child_txn);
assert!(dependents[0].parent_key.is_none());
assert_matches!(
&dependents[0].kind,
DependentQueuedRequestKind::ReactEvent { key } => {
assert_eq!(key, "👍");
}
);
}
}

/// Macro building to allow your StateStore implementation to run the entire
Expand Down Expand Up @@ -1680,6 +1730,12 @@ macro_rules! statestore_integration_tests {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_send_queue_dependents().await;
}

#[async_test]
async fn test_update_send_queue_dependent() {
let store = get_store().await.expect("creating store failed").into_state_store();
store.test_update_send_queue_dependent().await;
}
}
};
}
Expand Down
17 changes: 17 additions & 0 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,23 @@ impl StateStore for MemoryStore {
Ok(num_updated)
}

async fn update_dependent_queued_request(
&self,
room: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error> {
let mut dependent_send_queue_events = self.dependent_send_queue_events.write().unwrap();
let dependents = dependent_send_queue_events.entry(room.to_owned()).or_default();
for d in dependents.iter_mut() {
if d.own_transaction_id == *own_transaction_id {
d.kind = new_content;
return Ok(true);
}
}
Ok(false)
}

async fn remove_dependent_queued_request(
&self,
room: &RoomId,
Expand Down
22 changes: 22 additions & 0 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,16 @@ pub trait StateStore: AsyncTraitDeps {
sent_parent_key: SentRequestKey,
) -> Result<usize, Self::Error>;

/// Update a dependent send queue request with the new content.
///
/// Returns true if the request was found and could be updated.
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error>;

/// Remove a specific dependent send queue request by id.
///
/// Returns true if the dependent send queue request has been indeed
Expand Down Expand Up @@ -733,6 +743,18 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
}

async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error> {
self.0
.update_dependent_queued_request(room_id, own_transaction_id, new_content)
.await
.map_err(Into::into)
}
}

/// Convenience functionality for state stores.
Expand Down
42 changes: 42 additions & 0 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,48 @@ impl_state_store!({
Ok(())
}

async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);

let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;

let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;

// We store an encoded vector of the dependent requests.
// Reload the previous vector for this room, or create an empty one.
let prev = obj.get(&encoded_key)?.await?;

let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
)?;

// Modify the dependent request, if found.
let mut found = false;
for entry in prev.iter_mut() {
if entry.own_transaction_id == *own_transaction_id {
found = true;
entry.kind = new_content;
break;
}
}

if found {
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
}

Ok(found)
}

async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,
Expand Down
29 changes: 29 additions & 0 deletions crates/matrix-sdk-sqlite/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,35 @@ impl StateStore for SqliteStateStore {
.await
}

async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool> {
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
let content = self.serialize_json(&new_content)?;

// See comment in `save_send_queue_event`.
let own_txn_id = own_transaction_id.to_string();

let num_updated = self
.acquire()
.await?
.with_transaction(move |txn| {
txn.prepare_cached(
r#"UPDATE dependent_send_queue_events
SET content = ?
WHERE own_transaction_id = ?
AND room_id = ?"#,
)?
.execute((content, own_txn_id, room_id))
})
.await?;

Ok(num_updated == 1)
}

async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,
Expand Down

0 comments on commit 01c7d19

Please sign in to comment.