From e1b77f629171f90a6efb0d031acde47d6261b4d8 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Thu, 7 Nov 2024 13:39:42 +0100 Subject: [PATCH] feat(send queue): retry uploads if they've failed with transient errors --- crates/matrix-sdk/src/send_queue.rs | 19 ++-- crates/matrix-sdk/src/test_utils/mocks.rs | 14 +++ .../tests/integration/send_queue.rs | 88 +++++++++++++++++++ 3 files changed, 116 insertions(+), 5 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index a842a9e895e..d460affab2d 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -171,7 +171,7 @@ use crate::{ config::RequestConfig, error::RetryKind, room::{edit::EditedContent, WeakRoom}, - Client, Room, + Client, Media, Room, }; mod upload; @@ -709,18 +709,27 @@ impl RoomSendQueue { let media_source = if room.is_encrypted().await? { trace!("upload will be encrypted (encrypted room)"); let mut cursor = std::io::Cursor::new(data); - let encrypted_file = - room.client().upload_encrypted_file(&mime, &mut cursor).await?; + let encrypted_file = room + .client() + .upload_encrypted_file(&mime, &mut cursor) + .with_request_config(RequestConfig::short_retry()) + .await?; MediaSource::Encrypted(Box::new(encrypted_file)) } else { trace!("upload will be in clear text (room without encryption)"); - let res = room.client().media().upload(&mime, data).await?; + let request_config = RequestConfig::short_retry() + .timeout(Media::reasonable_upload_timeout(&data)); + let res = + room.client().media().upload(&mime, data, Some(request_config)).await?; MediaSource::Plain(res.content_uri) }; #[cfg(not(feature = "e2e-encryption"))] let media_source = { - let res = room.client().media().upload(&mime, data).await?; + let request_config = RequestConfig::short_retry() + .timeout(Media::reasonable_upload_timeout(&data)); + let res = + room.client().media().upload(&mime, data, Some(request_config)).await?; MediaSource::Plain(res.content_uri) }; diff --git a/crates/matrix-sdk/src/test_utils/mocks.rs b/crates/matrix-sdk/src/test_utils/mocks.rs index 35f6970e2af..874568eef3f 100644 --- a/crates/matrix-sdk/src/test_utils/mocks.rs +++ b/crates/matrix-sdk/src/test_utils/mocks.rs @@ -293,6 +293,13 @@ impl<'a> MatrixMock<'a> { Self { mock: self.mock.up_to_n_times(1).expect(1), ..self } } + /// Specify an upper limit to the number of times you would like this + /// [`MatrixMock`] to respond to incoming requests that satisfy the + /// conditions imposed by your matchers. + pub fn up_to_n_times(self, num: u64) -> Self { + Self { mock: self.mock.up_to_n_times(num), ..self } + } + /// Mount a [`MatrixMock`] on the attached server. /// /// The [`MatrixMock`] will remain active until the [`MatrixMockServer`] is @@ -518,6 +525,13 @@ impl<'a> MockUpload<'a> { MatrixMock { server: self.server, mock } } + /// Returns a send endpoint that emulates a transient failure, i.e responds + /// with error 500. + pub fn error500(self) -> MatrixMock<'a> { + let mock = self.mock.respond_with(ResponseTemplate::new(500)); + MatrixMock { server: self.server, mock } + } + /// Specify how to respond to a query (viz., like /// [`MockBuilder::respond_with`] does), when other predefined responses /// aren't sufficient. diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index c8ee9496090..ef7bd06a6cd 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -1932,3 +1932,91 @@ async fn test_media_uploads() { // That's all, folks! assert!(watch.is_empty()); } + +#[async_test] +async fn test_media_upload_retry() { + let mock = MatrixMockServer::new().await; + + // Mark the room as joined. + let room_id = room_id!("!a:b.c"); + let client = mock.make_client().await; + let room = mock.sync_joined_room(&client, room_id).await; + + let q = room.send_queue(); + let (local_echoes, mut watch) = q.subscribe().await.unwrap(); + assert!(local_echoes.is_empty()); + + // Create the media to send (no thumbnails). + let filename = "surprise.jpeg.exe"; + let content_type = mime::IMAGE_JPEG; + let data = b"hello world".to_vec(); + + let config = AttachmentConfig::new().info(AttachmentInfo::Image(BaseImageInfo { + height: Some(uint!(13)), + width: Some(uint!(37)), + size: Some(uint!(42)), + blurhash: None, + })); + + // Prepare endpoints. + mock.mock_room_state_encryption().plain().mount().await; + + // Fail for the first three attempts. + mock.mock_upload() + .expect_mime_type("image/jpeg") + .error500() + .up_to_n_times(3) + .expect(3) + .mount() + .await; + + // Send the media. + assert!(watch.is_empty()); + q.send_attachment(filename, content_type, data, config) + .await + .expect("queuing the attachment works"); + + // Observe the local echo. + let (event_txn, _send_handle, content) = assert_update!(watch => local echo event); + assert_let!(MessageType::Image(img_content) = content.msgtype); + assert_eq!(img_content.body, filename); + + // Let the upload stumble and the queue disable itself. + let error = assert_update!(watch => error { recoverable=true, txn=event_txn }); + let error = error.as_client_api_error().unwrap(); + assert_eq!(error.status_code, 500); + assert!(q.is_enabled().not()); + + // Mount the mock for the upload and sending the event. + mock.mock_upload() + .expect_mime_type("image/jpeg") + .ok(mxc_uri!("mxc://sdk.rs/media")) + .mock_once() + .mount() + .await; + mock.mock_room_send().ok(event_id!("$1")).mock_once().mount().await; + + // Restart the send queue. + q.set_enabled(true); + + assert_update!(watch => uploaded { + related_to = event_txn, + mxc = mxc_uri!("mxc://sdk.rs/media") + }); + + let edit_msg = assert_update!(watch => edit local echo { + txn = event_txn + }); + assert_let!(MessageType::Image(new_content) = edit_msg.msgtype); + assert_let!(MediaSource::Plain(new_uri) = &new_content.source); + assert_eq!(new_uri, mxc_uri!("mxc://sdk.rs/media")); + + // The event is sent, at some point. + assert_update!(watch => sent { + txn = event_txn, + event_id = event_id!("$1") + }); + + // That's all, folks! + assert!(watch.is_empty()); +}