diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index e439965c416..28c4e90cf4b 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -85,6 +85,8 @@ pub trait StateStoreIntegrationTests { async fn test_display_names_saving(&self); /// Test operations with the send queue. async fn test_send_queue(&self); + /// Test priority of operations with the send queue. + async fn test_send_queue_priority(&self); /// Test operations related to send queue dependents. async fn test_send_queue_dependents(&self); /// Test saving/restoring server capabilities. @@ -1212,7 +1214,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into()) .unwrap(); - self.save_send_queue_request(room_id, txn0.clone(), event0.into()).await.unwrap(); + self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap(); // Reading it will work. let pending = self.load_send_queue_requests(room_id).await.unwrap(); @@ -1236,7 +1238,7 @@ impl StateStoreIntegrationTests for DynStateStore { ) .unwrap(); - self.save_send_queue_request(room_id, txn, event.into()).await.unwrap(); + self.save_send_queue_request(room_id, txn, event.into(), 0).await.unwrap(); } // Reading all the events should work. @@ -1334,7 +1336,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into()) .unwrap(); - self.save_send_queue_request(room_id2, txn.clone(), event.into()).await.unwrap(); + self.save_send_queue_request(room_id2, txn.clone(), event.into(), 0).await.unwrap(); } // Add and remove one event for room3. @@ -1344,7 +1346,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into()) .unwrap(); - self.save_send_queue_request(room_id3, txn.clone(), event.into()).await.unwrap(); + self.save_send_queue_request(room_id3, txn.clone(), event.into(), 0).await.unwrap(); self.remove_send_queue_request(room_id3, &txn).await.unwrap(); } @@ -1357,6 +1359,64 @@ impl StateStoreIntegrationTests for DynStateStore { assert!(outstanding_rooms.iter().any(|room| room == room_id2)); } + async fn test_send_queue_priority(&self) { + let room_id = room_id!("!test_send_queue:localhost"); + + // No queued event in store at first. + let events = self.load_send_queue_requests(room_id).await.unwrap(); + assert!(events.is_empty()); + + // Saving one request should work. + let low0_txn = TransactionId::new(); + let ev0 = + SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into()) + .unwrap(); + self.save_send_queue_request(room_id, low0_txn.clone(), ev0.into(), 2).await.unwrap(); + + // Saving one request with higher priority should work. + let high_txn = TransactionId::new(); + let ev1 = + SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into()) + .unwrap(); + self.save_send_queue_request(room_id, high_txn.clone(), ev1.into(), 10).await.unwrap(); + + // Saving another request with the low priority should work. + let low1_txn = TransactionId::new(); + let ev2 = + SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into()) + .unwrap(); + self.save_send_queue_request(room_id, low1_txn.clone(), ev2.into(), 2).await.unwrap(); + + // The requests should be ordered from higher priority to lower, and when equal, + // should use the insertion order instead. + let pending = self.load_send_queue_requests(room_id).await.unwrap(); + + assert_eq!(pending.len(), 3); + { + assert_eq!(pending[0].transaction_id, high_txn); + + let deserialized = pending[0].as_event().unwrap().deserialize().unwrap(); + assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized); + assert_eq!(content.body(), "high"); + } + + { + assert_eq!(pending[1].transaction_id, low0_txn); + + let deserialized = pending[1].as_event().unwrap().deserialize().unwrap(); + assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized); + assert_eq!(content.body(), "low0"); + } + + { + assert_eq!(pending[2].transaction_id, low1_txn); + + let deserialized = pending[2].as_event().unwrap().deserialize().unwrap(); + assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized); + assert_eq!(content.body(), "low1"); + } + } + async fn test_send_queue_dependents(&self) { let room_id = room_id!("!test_send_queue_dependents:localhost"); @@ -1365,7 +1425,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into()) .unwrap(); - self.save_send_queue_request(room_id, txn0.clone(), event0.into()).await.unwrap(); + self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap(); // No dependents, to start with. assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty()); @@ -1427,7 +1487,7 @@ impl StateStoreIntegrationTests for DynStateStore { let event1 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into()) .unwrap(); - self.save_send_queue_request(room_id, txn1.clone(), event1.into()).await.unwrap(); + self.save_send_queue_request(room_id, txn1.clone(), event1.into(), 0).await.unwrap(); self.save_dependent_queued_request( room_id, @@ -1609,6 +1669,12 @@ macro_rules! statestore_integration_tests { store.test_send_queue().await; } + #[async_test] + async fn test_send_queue_priority() { + let store = get_store().await.expect("creating store failed").into_state_store(); + store.test_send_queue_priority().await; + } + #[async_test] async fn test_send_queue_dependents() { let store = get_store().await.expect("creating store failed").into_state_store(); diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 2ba91d6f200..60701a3e5f6 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -807,13 +807,14 @@ impl StateStore for MemoryStore { room_id: &RoomId, transaction_id: OwnedTransactionId, kind: QueuedRequestKind, + priority: usize, ) -> Result<(), Self::Error> { self.send_queue_events .write() .unwrap() .entry(room_id.to_owned()) .or_default() - .push(QueuedRequest { kind, transaction_id, error: None }); + .push(QueuedRequest { kind, transaction_id, error: None, priority }); Ok(()) } @@ -867,7 +868,11 @@ impl StateStore for MemoryStore { &self, room_id: &RoomId, ) -> Result, Self::Error> { - Ok(self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().clone()) + let mut ret = + self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().clone(); + // Inverted order of priority, use stable sort to keep insertion order. + ret.sort_by(|lhs, rhs| rhs.priority.cmp(&lhs.priority)); + Ok(ret) } async fn update_send_queue_request_status( diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index f5f0ccaa7e0..4d3b4a76bb8 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -125,6 +125,12 @@ pub struct QueuedRequest { /// /// `None` if the request is in the queue, waiting to be sent. pub error: Option, + + /// At which priority should this be handled? + /// + /// The bigger the value, the higher the priority at which this request + /// should be handled. + pub priority: usize, } impl QueuedRequest { diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 4dabd9eefac..f60189dd3d3 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -358,6 +358,7 @@ pub trait StateStore: AsyncTraitDeps { room_id: &RoomId, transaction_id: OwnedTransactionId, request: QueuedRequestKind, + priority: usize, ) -> Result<(), Self::Error>; /// Updates a send queue request with the given content, and resets its @@ -390,6 +391,10 @@ pub trait StateStore: AsyncTraitDeps { ) -> Result; /// Loads all the send queue requests for the given room. + /// + /// The resulting vector of queued requests should be ordered from higher + /// priority to lower priority, and respect the insertion order when + /// priorities are equal. async fn load_send_queue_requests( &self, room_id: &RoomId, @@ -641,8 +646,12 @@ impl StateStore for EraseStateStoreError { room_id: &RoomId, transaction_id: OwnedTransactionId, content: QueuedRequestKind, + priority: usize, ) -> Result<(), Self::Error> { - self.0.save_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into) + self.0 + .save_send_queue_request(room_id, transaction_id, content, priority) + .await + .map_err(Into::into) } async fn update_send_queue_request( diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index 39156c70a25..372a179c9f9 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -437,6 +437,8 @@ struct PersistedQueuedRequest { pub error: Option, + priority: Option, + // Migrated fields: keep these private, they're not used anymore elsewhere in the code base. /// Deprecated (from old format), now replaced with error field. is_wedged: Option, @@ -459,7 +461,10 @@ impl PersistedQueuedRequest { _ => self.error, }; - Some(QueuedRequest { kind, transaction_id: self.transaction_id, error }) + // By default, events without a priority have a priority of 0. + let priority = self.priority.unwrap_or(0); + + Some(QueuedRequest { kind, transaction_id: self.transaction_id, error, priority }) } } @@ -1329,6 +1334,7 @@ impl_state_store!({ room_id: &RoomId, transaction_id: OwnedTransactionId, kind: QueuedRequestKind, + priority: usize, ) -> Result<()> { let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id); @@ -1357,6 +1363,7 @@ impl_state_store!({ error: None, is_wedged: None, event: None, + priority: Some(priority), }); // Save the new vector into db. @@ -1460,11 +1467,14 @@ impl_state_store!({ .get(&encoded_key)? .await?; - let prev = prev.map_or_else( + let mut prev = prev.map_or_else( || Ok(Vec::new()), |val| self.deserialize_value::>(&val), )?; + // Inverted stable ordering on priority. + prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0))); + Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect()) } diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/009_send_queue_priority.sql b/crates/matrix-sdk-sqlite/migrations/state_store/009_send_queue_priority.sql new file mode 100644 index 00000000000..48913b5381c --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/state_store/009_send_queue_priority.sql @@ -0,0 +1,3 @@ +-- Add a priority column, defaulting to 0 for all events in the send queue. +ALTER TABLE "send_queue_events" + ADD COLUMN "priority" INTEGER NOT NULL DEFAULT 0; diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 4f453577651..ab152289059 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -69,7 +69,7 @@ mod keys { /// This is used to figure whether the sqlite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`SqliteStateStore::run_migrations`] function.. -const DATABASE_VERSION: u8 = 9; +const DATABASE_VERSION: u8 = 10; /// A sqlite based cryptostore. #[derive(Clone)] @@ -307,6 +307,17 @@ impl SqliteStateStore { .await?; } + if from < 10 && to >= 10 { + conn.with_transaction(move |txn| { + // Run the migration. + txn.execute_batch(include_str!( + "../migrations/state_store/009_send_queue_priority.sql" + ))?; + txn.set_db_version(10) + }) + .await?; + } + Ok(()) } @@ -1685,6 +1696,7 @@ impl StateStore for SqliteStateStore { room_id: &RoomId, transaction_id: OwnedTransactionId, content: QueuedRequestKind, + priority: usize, ) -> Result<(), Self::Error> { let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id); let room_id_value = self.serialize_value(&room_id.to_owned())?; @@ -1699,7 +1711,7 @@ impl StateStore for SqliteStateStore { self.acquire() .await? .with_transaction(move |txn| { - txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content) VALUES (?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content))?; + txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority) VALUES (?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority))?; Ok(()) }) .await @@ -1761,14 +1773,14 @@ impl StateStore for SqliteStateStore { // Note: ROWID is always present and is an auto-incremented integer counter. We // want to maintain the insertion order, so we can sort using it. // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`. - let res: Vec<(String, Vec, Option>)> = self + let res: Vec<(String, Vec, Option>, usize)> = self .acquire() .await? .prepare( - "SELECT transaction_id, content, wedge_reason FROM send_queue_events WHERE room_id = ? ORDER BY ROWID", + "SELECT transaction_id, content, wedge_reason, priority FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID", |mut stmt| { stmt.query((room_id,))? - .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?))) + .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))) .collect() }, ) @@ -1780,6 +1792,7 @@ impl StateStore for SqliteStateStore { transaction_id: entry.0.into(), kind: self.deserialize_json(&entry.1)?, error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?, + priority: entry.3, }); } diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index fc419b7b4a1..4dfba7918a0 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -837,6 +837,12 @@ struct QueueStorage { } impl QueueStorage { + /// Default priority for a queued request. + const LOW_PRIORITY: usize = 0; + + /// High priority for a queued request that must be handled before others. + const HIGH_PRIORITY: usize = 10; + /// Create a new queue for queuing requests to be sent later. fn new(client: WeakClient, room: OwnedRoomId) -> Self { Self { room_id: room, being_sent: Default::default(), client } @@ -847,7 +853,7 @@ impl QueueStorage { self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown) } - /// Push a new event to be sent in the queue. + /// Push a new event to be sent in the queue, with a default priority of 0. /// /// Returns the transaction id chosen to identify the request. async fn push( @@ -858,7 +864,12 @@ impl QueueStorage { self.client()? .store() - .save_send_queue_request(&self.room_id, transaction_id.clone(), request) + .save_send_queue_request( + &self.room_id, + transaction_id.clone(), + request, + Self::LOW_PRIORITY, + ) .await?; Ok(transaction_id) @@ -1058,6 +1069,7 @@ impl QueueStorage { thumbnail_source: None, // the thumbnail has no thumbnails :) related_to: send_event_txn.clone(), }, + Self::LOW_PRIORITY, ) .await?; @@ -1088,6 +1100,7 @@ impl QueueStorage { thumbnail_source: None, related_to: send_event_txn.clone(), }, + Self::LOW_PRIORITY, ) .await?; @@ -1311,6 +1324,7 @@ impl QueueStorage { &self.room_id, dependent_request.own_transaction_id.into(), serializable.into(), + Self::HIGH_PRIORITY, ) .await .map_err(RoomSendQueueStorageError::StateStoreError)?; @@ -1397,6 +1411,7 @@ impl QueueStorage { &self.room_id, dependent_request.own_transaction_id.into(), serializable.into(), + Self::HIGH_PRIORITY, ) .await .map_err(RoomSendQueueStorageError::StateStoreError)?; diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index 16a3f7fb657..d1259c5e57f 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -350,7 +350,12 @@ impl QueueStorage { client .store() - .save_send_queue_request(&self.room_id, event_txn, new_content.into()) + .save_send_queue_request( + &self.room_id, + event_txn, + new_content.into(), + Self::HIGH_PRIORITY, + ) .await .map_err(RoomSendQueueStorageError::StateStoreError)?; @@ -392,7 +397,7 @@ impl QueueStorage { client .store() - .save_send_queue_request(&self.room_id, next_upload_txn, request) + .save_send_queue_request(&self.room_id, next_upload_txn, request, Self::HIGH_PRIORITY) .await .map_err(RoomSendQueueStorageError::StateStoreError)?; diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index 476f79bcb58..8815a965971 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -900,13 +900,16 @@ async fn test_edit() { // Let the server process the responses. drop(lock_guard); - // Now the server will process the messages in order. + // The queue sends the first event, without the edit. assert_update!(watch => sent { txn = txn1, }); - assert_update!(watch => sent { txn = txn2, }); - // Let a bit of time to process the edit event sent to the server for txn1. + // The queue sends the edit; we can't check the transaction id because it's + // unknown. assert_update!(watch => sent {}); + // The queue sends the second event. + assert_update!(watch => sent { txn = txn2, }); + assert!(watch.is_empty()); }