Skip to content

Commit

Permalink
feat(send queue): retry uploads if they've failed with transient errors
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Nov 12, 2024
1 parent 9f43946 commit e1b77f6
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 5 deletions.
19 changes: 14 additions & 5 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ use crate::{
config::RequestConfig,
error::RetryKind,
room::{edit::EditedContent, WeakRoom},
Client, Room,
Client, Media, Room,
};

mod upload;
Expand Down Expand Up @@ -709,18 +709,27 @@ impl RoomSendQueue {
let media_source = if room.is_encrypted().await? {
trace!("upload will be encrypted (encrypted room)");
let mut cursor = std::io::Cursor::new(data);
let encrypted_file =
room.client().upload_encrypted_file(&mime, &mut cursor).await?;
let encrypted_file = room
.client()
.upload_encrypted_file(&mime, &mut cursor)
.with_request_config(RequestConfig::short_retry())
.await?;
MediaSource::Encrypted(Box::new(encrypted_file))
} else {
trace!("upload will be in clear text (room without encryption)");
let res = room.client().media().upload(&mime, data).await?;
let request_config = RequestConfig::short_retry()
.timeout(Media::reasonable_upload_timeout(&data));
let res =
room.client().media().upload(&mime, data, Some(request_config)).await?;
MediaSource::Plain(res.content_uri)
};

#[cfg(not(feature = "e2e-encryption"))]
let media_source = {
let res = room.client().media().upload(&mime, data).await?;
let request_config = RequestConfig::short_retry()
.timeout(Media::reasonable_upload_timeout(&data));
let res =
room.client().media().upload(&mime, data, Some(request_config)).await?;
MediaSource::Plain(res.content_uri)
};

Expand Down
14 changes: 14 additions & 0 deletions crates/matrix-sdk/src/test_utils/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ impl<'a> MatrixMock<'a> {
Self { mock: self.mock.up_to_n_times(1).expect(1), ..self }
}

/// Specify an upper limit to the number of times you would like this
/// [`MatrixMock`] to respond to incoming requests that satisfy the
/// conditions imposed by your matchers.
pub fn up_to_n_times(self, num: u64) -> Self {
Self { mock: self.mock.up_to_n_times(num), ..self }
}

/// Mount a [`MatrixMock`] on the attached server.
///
/// The [`MatrixMock`] will remain active until the [`MatrixMockServer`] is
Expand Down Expand Up @@ -518,6 +525,13 @@ impl<'a> MockUpload<'a> {
MatrixMock { server: self.server, mock }
}

/// Returns a send endpoint that emulates a transient failure, i.e responds
/// with error 500.
pub fn error500(self) -> MatrixMock<'a> {
let mock = self.mock.respond_with(ResponseTemplate::new(500));
MatrixMock { server: self.server, mock }
}

/// Specify how to respond to a query (viz., like
/// [`MockBuilder::respond_with`] does), when other predefined responses
/// aren't sufficient.
Expand Down
88 changes: 88 additions & 0 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1932,3 +1932,91 @@ async fn test_media_uploads() {
// That's all, folks!
assert!(watch.is_empty());
}

#[async_test]
async fn test_media_upload_retry() {
let mock = MatrixMockServer::new().await;

// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let client = mock.make_client().await;
let room = mock.sync_joined_room(&client, room_id).await;

let q = room.send_queue();
let (local_echoes, mut watch) = q.subscribe().await.unwrap();
assert!(local_echoes.is_empty());

// Create the media to send (no thumbnails).
let filename = "surprise.jpeg.exe";
let content_type = mime::IMAGE_JPEG;
let data = b"hello world".to_vec();

let config = AttachmentConfig::new().info(AttachmentInfo::Image(BaseImageInfo {
height: Some(uint!(13)),
width: Some(uint!(37)),
size: Some(uint!(42)),
blurhash: None,
}));

// Prepare endpoints.
mock.mock_room_state_encryption().plain().mount().await;

// Fail for the first three attempts.
mock.mock_upload()
.expect_mime_type("image/jpeg")
.error500()
.up_to_n_times(3)
.expect(3)
.mount()
.await;

// Send the media.
assert!(watch.is_empty());
q.send_attachment(filename, content_type, data, config)
.await
.expect("queuing the attachment works");

// Observe the local echo.
let (event_txn, _send_handle, content) = assert_update!(watch => local echo event);
assert_let!(MessageType::Image(img_content) = content.msgtype);
assert_eq!(img_content.body, filename);

// Let the upload stumble and the queue disable itself.
let error = assert_update!(watch => error { recoverable=true, txn=event_txn });
let error = error.as_client_api_error().unwrap();
assert_eq!(error.status_code, 500);
assert!(q.is_enabled().not());

// Mount the mock for the upload and sending the event.
mock.mock_upload()
.expect_mime_type("image/jpeg")
.ok(mxc_uri!("mxc://sdk.rs/media"))
.mock_once()
.mount()
.await;
mock.mock_room_send().ok(event_id!("$1")).mock_once().mount().await;

// Restart the send queue.
q.set_enabled(true);

assert_update!(watch => uploaded {
related_to = event_txn,
mxc = mxc_uri!("mxc://sdk.rs/media")
});

let edit_msg = assert_update!(watch => edit local echo {
txn = event_txn
});
assert_let!(MessageType::Image(new_content) = edit_msg.msgtype);
assert_let!(MediaSource::Plain(new_uri) = &new_content.source);
assert_eq!(new_uri, mxc_uri!("mxc://sdk.rs/media"));

// The event is sent, at some point.
assert_update!(watch => sent {
txn = event_txn,
event_id = event_id!("$1")
});

// That's all, folks!
assert!(watch.is_empty());
}

0 comments on commit e1b77f6

Please sign in to comment.