Skip to content

Commit cba7224

Browse files
committed
feat!(send queue): add a priority field to maintain ordering of sending
Prior to this patch, the send queue would not maintain the ordering of sending a media *then* a text, because it would push back a dependent request graduating into a queued request. The solution implemented here consists in adding a new priority column to the send queue, defaulting to 0 for existing events, and use higher priorities for the media uploads, so they're considered before other requests. A high priority is also used for aggregation events that are sent late, so they're sent as soon as possible, before other subsequent events.
1 parent b4fe6b4 commit cba7224

File tree

10 files changed

+158
-23
lines changed

10 files changed

+158
-23
lines changed

crates/matrix-sdk-base/src/store/integration_tests.rs

+72-6
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ pub trait StateStoreIntegrationTests {
8585
async fn test_display_names_saving(&self);
8686
/// Test operations with the send queue.
8787
async fn test_send_queue(&self);
88+
/// Test priority of operations with the send queue.
89+
async fn test_send_queue_priority(&self);
8890
/// Test operations related to send queue dependents.
8991
async fn test_send_queue_dependents(&self);
9092
/// Test saving/restoring server capabilities.
@@ -1212,7 +1214,7 @@ impl StateStoreIntegrationTests for DynStateStore {
12121214
let event0 =
12131215
SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
12141216
.unwrap();
1215-
self.save_send_queue_request(room_id, txn0.clone(), event0.into()).await.unwrap();
1217+
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();
12161218

12171219
// Reading it will work.
12181220
let pending = self.load_send_queue_requests(room_id).await.unwrap();
@@ -1236,7 +1238,7 @@ impl StateStoreIntegrationTests for DynStateStore {
12361238
)
12371239
.unwrap();
12381240

1239-
self.save_send_queue_request(room_id, txn, event.into()).await.unwrap();
1241+
self.save_send_queue_request(room_id, txn, event.into(), 0).await.unwrap();
12401242
}
12411243

12421244
// Reading all the events should work.
@@ -1334,7 +1336,7 @@ impl StateStoreIntegrationTests for DynStateStore {
13341336
let event =
13351337
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
13361338
.unwrap();
1337-
self.save_send_queue_request(room_id2, txn.clone(), event.into()).await.unwrap();
1339+
self.save_send_queue_request(room_id2, txn.clone(), event.into(), 0).await.unwrap();
13381340
}
13391341

13401342
// Add and remove one event for room3.
@@ -1344,7 +1346,7 @@ impl StateStoreIntegrationTests for DynStateStore {
13441346
let event =
13451347
SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
13461348
.unwrap();
1347-
self.save_send_queue_request(room_id3, txn.clone(), event.into()).await.unwrap();
1349+
self.save_send_queue_request(room_id3, txn.clone(), event.into(), 0).await.unwrap();
13481350

13491351
self.remove_send_queue_request(room_id3, &txn).await.unwrap();
13501352
}
@@ -1357,6 +1359,64 @@ impl StateStoreIntegrationTests for DynStateStore {
13571359
assert!(outstanding_rooms.iter().any(|room| room == room_id2));
13581360
}
13591361

1362+
async fn test_send_queue_priority(&self) {
1363+
let room_id = room_id!("!test_send_queue:localhost");
1364+
1365+
// No queued event in store at first.
1366+
let events = self.load_send_queue_requests(room_id).await.unwrap();
1367+
assert!(events.is_empty());
1368+
1369+
// Saving one request should work.
1370+
let low0_txn = TransactionId::new();
1371+
let ev0 =
1372+
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())
1373+
.unwrap();
1374+
self.save_send_queue_request(room_id, low0_txn.clone(), ev0.into(), 2).await.unwrap();
1375+
1376+
// Saving one request with higher priority should work.
1377+
let high_txn = TransactionId::new();
1378+
let ev1 =
1379+
SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())
1380+
.unwrap();
1381+
self.save_send_queue_request(room_id, high_txn.clone(), ev1.into(), 10).await.unwrap();
1382+
1383+
// Saving another request with the low priority should work.
1384+
let low1_txn = TransactionId::new();
1385+
let ev2 =
1386+
SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())
1387+
.unwrap();
1388+
self.save_send_queue_request(room_id, low1_txn.clone(), ev2.into(), 2).await.unwrap();
1389+
1390+
// The requests should be ordered from higher priority to lower, and when equal,
1391+
// should use the insertion order instead.
1392+
let pending = self.load_send_queue_requests(room_id).await.unwrap();
1393+
1394+
assert_eq!(pending.len(), 3);
1395+
{
1396+
assert_eq!(pending[0].transaction_id, high_txn);
1397+
1398+
let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
1399+
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1400+
assert_eq!(content.body(), "high");
1401+
}
1402+
1403+
{
1404+
assert_eq!(pending[1].transaction_id, low0_txn);
1405+
1406+
let deserialized = pending[1].as_event().unwrap().deserialize().unwrap();
1407+
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1408+
assert_eq!(content.body(), "low0");
1409+
}
1410+
1411+
{
1412+
assert_eq!(pending[2].transaction_id, low1_txn);
1413+
1414+
let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
1415+
assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1416+
assert_eq!(content.body(), "low1");
1417+
}
1418+
}
1419+
13601420
async fn test_send_queue_dependents(&self) {
13611421
let room_id = room_id!("!test_send_queue_dependents:localhost");
13621422

@@ -1365,7 +1425,7 @@ impl StateStoreIntegrationTests for DynStateStore {
13651425
let event0 =
13661426
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
13671427
.unwrap();
1368-
self.save_send_queue_request(room_id, txn0.clone(), event0.into()).await.unwrap();
1428+
self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap();
13691429

13701430
// No dependents, to start with.
13711431
assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
@@ -1427,7 +1487,7 @@ impl StateStoreIntegrationTests for DynStateStore {
14271487
let event1 =
14281488
SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
14291489
.unwrap();
1430-
self.save_send_queue_request(room_id, txn1.clone(), event1.into()).await.unwrap();
1490+
self.save_send_queue_request(room_id, txn1.clone(), event1.into(), 0).await.unwrap();
14311491

14321492
self.save_dependent_queued_request(
14331493
room_id,
@@ -1609,6 +1669,12 @@ macro_rules! statestore_integration_tests {
16091669
store.test_send_queue().await;
16101670
}
16111671

1672+
#[async_test]
1673+
async fn test_send_queue_priority() {
1674+
let store = get_store().await.expect("creating store failed").into_state_store();
1675+
store.test_send_queue_priority().await;
1676+
}
1677+
16121678
#[async_test]
16131679
async fn test_send_queue_dependents() {
16141680
let store = get_store().await.expect("creating store failed").into_state_store();

crates/matrix-sdk-base/src/store/memory_store.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -807,13 +807,14 @@ impl StateStore for MemoryStore {
807807
room_id: &RoomId,
808808
transaction_id: OwnedTransactionId,
809809
kind: QueuedRequestKind,
810+
priority: usize,
810811
) -> Result<(), Self::Error> {
811812
self.send_queue_events
812813
.write()
813814
.unwrap()
814815
.entry(room_id.to_owned())
815816
.or_default()
816-
.push(QueuedRequest { kind, transaction_id, error: None });
817+
.push(QueuedRequest { kind, transaction_id, error: None, priority });
817818
Ok(())
818819
}
819820

@@ -867,7 +868,11 @@ impl StateStore for MemoryStore {
867868
&self,
868869
room_id: &RoomId,
869870
) -> Result<Vec<QueuedRequest>, Self::Error> {
870-
Ok(self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().clone())
871+
let mut ret =
872+
self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().clone();
873+
// Inverted order of priority, use stable sort to keep insertion order.
874+
ret.sort_by(|lhs, rhs| rhs.priority.cmp(&lhs.priority));
875+
Ok(ret)
871876
}
872877

873878
async fn update_send_queue_request_status(

crates/matrix-sdk-base/src/store/send_queue.rs

+6
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ pub struct QueuedRequest {
125125
///
126126
/// `None` if the request is in the queue, waiting to be sent.
127127
pub error: Option<QueueWedgeError>,
128+
129+
/// At which priority should this be handled?
130+
///
131+
/// The bigger the value, the higher the priority at which this request
132+
/// should be handled.
133+
pub priority: usize,
128134
}
129135

130136
impl QueuedRequest {

crates/matrix-sdk-base/src/store/traits.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ pub trait StateStore: AsyncTraitDeps {
358358
room_id: &RoomId,
359359
transaction_id: OwnedTransactionId,
360360
request: QueuedRequestKind,
361+
priority: usize,
361362
) -> Result<(), Self::Error>;
362363

363364
/// Updates a send queue request with the given content, and resets its
@@ -390,6 +391,10 @@ pub trait StateStore: AsyncTraitDeps {
390391
) -> Result<bool, Self::Error>;
391392

392393
/// Loads all the send queue requests for the given room.
394+
///
395+
/// The resulting vector of queued requests should be ordered from higher
396+
/// priority to lower priority, and respect the insertion order when
397+
/// priorities are equal.
393398
async fn load_send_queue_requests(
394399
&self,
395400
room_id: &RoomId,
@@ -641,8 +646,12 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
641646
room_id: &RoomId,
642647
transaction_id: OwnedTransactionId,
643648
content: QueuedRequestKind,
649+
priority: usize,
644650
) -> Result<(), Self::Error> {
645-
self.0.save_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
651+
self.0
652+
.save_send_queue_request(room_id, transaction_id, content, priority)
653+
.await
654+
.map_err(Into::into)
646655
}
647656

648657
async fn update_send_queue_request(

crates/matrix-sdk-indexeddb/src/state_store/mod.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,8 @@ struct PersistedQueuedRequest {
437437

438438
pub error: Option<QueueWedgeError>,
439439

440+
priority: Option<usize>,
441+
440442
// Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
441443
/// Deprecated (from old format), now replaced with error field.
442444
is_wedged: Option<bool>,
@@ -459,7 +461,10 @@ impl PersistedQueuedRequest {
459461
_ => self.error,
460462
};
461463

462-
Some(QueuedRequest { kind, transaction_id: self.transaction_id, error })
464+
// By default, events without a priority have a priority of 0.
465+
let priority = self.priority.unwrap_or(0);
466+
467+
Some(QueuedRequest { kind, transaction_id: self.transaction_id, error, priority })
463468
}
464469
}
465470

@@ -1329,6 +1334,7 @@ impl_state_store!({
13291334
room_id: &RoomId,
13301335
transaction_id: OwnedTransactionId,
13311336
kind: QueuedRequestKind,
1337+
priority: usize,
13321338
) -> Result<()> {
13331339
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
13341340

@@ -1357,6 +1363,7 @@ impl_state_store!({
13571363
error: None,
13581364
is_wedged: None,
13591365
event: None,
1366+
priority: Some(priority),
13601367
});
13611368

13621369
// Save the new vector into db.
@@ -1460,11 +1467,14 @@ impl_state_store!({
14601467
.get(&encoded_key)?
14611468
.await?;
14621469

1463-
let prev = prev.map_or_else(
1470+
let mut prev = prev.map_or_else(
14641471
|| Ok(Vec::new()),
14651472
|val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
14661473
)?;
14671474

1475+
// Inverted stable ordering on priority.
1476+
prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1477+
14681478
Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
14691479
}
14701480

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- Add a priority column, defaulting to 0 for all events in the send queue.
2+
ALTER TABLE "send_queue_events"
3+
ADD COLUMN "priority" INTEGER NOT NULL DEFAULT 0;

crates/matrix-sdk-sqlite/src/state_store.rs

+18-5
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ mod keys {
6969
/// This is used to figure whether the sqlite database requires a migration.
7070
/// Every new SQL migration should imply a bump of this number, and changes in
7171
/// the [`SqliteStateStore::run_migrations`] function..
72-
const DATABASE_VERSION: u8 = 9;
72+
const DATABASE_VERSION: u8 = 10;
7373

7474
/// A sqlite based cryptostore.
7575
#[derive(Clone)]
@@ -307,6 +307,17 @@ impl SqliteStateStore {
307307
.await?;
308308
}
309309

310+
if from < 10 && to >= 10 {
311+
conn.with_transaction(move |txn| {
312+
// Run the migration.
313+
txn.execute_batch(include_str!(
314+
"../migrations/state_store/009_send_queue_priority.sql"
315+
))?;
316+
txn.set_db_version(10)
317+
})
318+
.await?;
319+
}
320+
310321
Ok(())
311322
}
312323

@@ -1685,6 +1696,7 @@ impl StateStore for SqliteStateStore {
16851696
room_id: &RoomId,
16861697
transaction_id: OwnedTransactionId,
16871698
content: QueuedRequestKind,
1699+
priority: usize,
16881700
) -> Result<(), Self::Error> {
16891701
let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
16901702
let room_id_value = self.serialize_value(&room_id.to_owned())?;
@@ -1699,7 +1711,7 @@ impl StateStore for SqliteStateStore {
16991711
self.acquire()
17001712
.await?
17011713
.with_transaction(move |txn| {
1702-
txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content) VALUES (?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content))?;
1714+
txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority) VALUES (?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority))?;
17031715
Ok(())
17041716
})
17051717
.await
@@ -1761,14 +1773,14 @@ impl StateStore for SqliteStateStore {
17611773
// Note: ROWID is always present and is an auto-incremented integer counter. We
17621774
// want to maintain the insertion order, so we can sort using it.
17631775
// Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1764-
let res: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = self
1776+
let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize)> = self
17651777
.acquire()
17661778
.await?
17671779
.prepare(
1768-
"SELECT transaction_id, content, wedge_reason FROM send_queue_events WHERE room_id = ? ORDER BY ROWID",
1780+
"SELECT transaction_id, content, wedge_reason, priority FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
17691781
|mut stmt| {
17701782
stmt.query((room_id,))?
1771-
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
1783+
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)))
17721784
.collect()
17731785
},
17741786
)
@@ -1780,6 +1792,7 @@ impl StateStore for SqliteStateStore {
17801792
transaction_id: entry.0.into(),
17811793
kind: self.deserialize_json(&entry.1)?,
17821794
error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1795+
priority: entry.3,
17831796
});
17841797
}
17851798

0 commit comments

Comments
 (0)