Skip to content

Commit

Permalink
Add additional WriteMultipart tests (#5743) (#5746)
Browse files Browse the repository at this point in the history
* Add additional WriteMultipart tests (#5743)

* Clippy
  • Loading branch information
tustvold authored May 11, 2024
1 parent 3d3ddb2 commit 7efe6c2
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
12 changes: 12 additions & 0 deletions object_store/src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,18 @@ pub async fn stream_get(storage: &DynObjectStore) {
let meta = storage.head(&location).await.unwrap();
assert_eq!(meta.size, 6);

let location = Path::from("test_dir/test_put_part_mixed.txt");
let upload = storage.put_multipart(&location).await.unwrap();
let mut write = WriteMultipart::new(upload);
write.put(vec![0; 2].into());
write.write(&[1, 2, 3]);
write.put(vec![4, 5, 6, 7].into());
write.finish().await.unwrap();

let r = storage.get(&location).await.unwrap();
let r = r.bytes().await.unwrap();
assert_eq!(r.as_ref(), &[0, 0, 1, 2, 3, 4, 5, 6, 7]);

// We can abort an empty write
let location = Path::from("test_dir/test_abort_upload.txt");
let mut upload = storage.put_multipart(&location).await.unwrap();
Expand Down
69 changes: 69 additions & 0 deletions object_store/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,13 @@ impl WriteMultipart {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;

use futures::FutureExt;
use parking_lot::Mutex;
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};

use crate::memory::InMemory;
use crate::path::Path;
Expand All @@ -246,4 +250,69 @@ mod tests {
assert!(write.wait_for_capacity(10).now_or_never().is_none());
write.wait_for_capacity(10).await.unwrap()
}

#[derive(Debug, Default)]
struct InstrumentedUpload {
chunks: Arc<Mutex<Vec<PutPayload>>>,
}

#[async_trait]
impl MultipartUpload for InstrumentedUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
self.chunks.lock().push(data);
futures::future::ready(Ok(())).boxed()
}

async fn complete(&mut self) -> Result<PutResult> {
Ok(PutResult {
e_tag: None,
version: None,
})
}

async fn abort(&mut self) -> Result<()> {
unimplemented!()
}
}

#[tokio::test]
async fn test_write_multipart() {
let mut rng = StdRng::seed_from_u64(42);

for method in [0.0, 0.5, 1.0] {
for _ in 0..10 {
for chunk_size in [1, 17, 23] {
let upload = Box::<InstrumentedUpload>::default();
let chunks = Arc::clone(&upload.chunks);
let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);

let mut expected = Vec::with_capacity(1024);

for _ in 0..50 {
let chunk_size = rng.gen_range(0..30);
let data: Vec<_> = (0..chunk_size).map(|_| rng.gen()).collect();
expected.extend_from_slice(&data);

match rng.gen_bool(method) {
true => write.put(data.into()),
false => write.write(&data),
}
}
write.finish().await.unwrap();

let chunks = chunks.lock();

let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
assert_eq!(expected, actual);

for chunk in chunks.iter().take(chunks.len() - 1) {
assert_eq!(chunk.content_length(), chunk_size)
}

let last_chunk = chunks.last().unwrap().content_length();
assert!(last_chunk <= chunk_size, "{chunk_size}");
}
}
}
}
}

0 comments on commit 7efe6c2

Please sign in to comment.