diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 41cfcc490da6..41ee1091a3b2 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -16,6 +16,7 @@ // under the License. //! An in-memory object store implementation +use crate::multipart::{MultiPartStore, PartId}; use crate::util::InvalidGetRange; use crate::{ path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, @@ -28,8 +29,8 @@ use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; -use std::collections::BTreeMap; use std::collections::BTreeSet; +use std::collections::{BTreeMap, HashMap}; use std::io; use std::ops::Range; use std::pin::Pin; @@ -52,6 +53,12 @@ enum Error { #[snafu(display("ETag required for conditional update"))] MissingETag, + + #[snafu(display("MultipartUpload not found: {id}"))] + UploadNotFound { id: String }, + + #[snafu(display("Missing part at index: {part}"))] + MissingPart { part: usize }, } impl From for super::Error { @@ -101,6 +108,12 @@ impl Entry { struct Storage { next_etag: usize, map: BTreeMap, + uploads: HashMap, +} + +#[derive(Debug, Default, Clone)] +struct PartStorage { + parts: Vec>, } type SharedStorage = Arc>; @@ -154,6 +167,24 @@ impl Storage { } } } + + fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> { + let parts = id + .parse() + .ok() + .and_then(|x| self.uploads.get_mut(&x)) + .context(UploadNotFoundSnafu { id })?; + Ok(parts) + } + + fn remove_upload(&mut self, id: &MultipartId) -> Result { + let parts = id + .parse() + .ok() + .and_then(|x| self.uploads.remove(&x)) + .context(UploadNotFoundSnafu { id })?; + Ok(parts) + } } impl std::fmt::Display for InMemory { @@ -359,6 +390,64 @@ impl ObjectStore for InMemory { } } +#[async_trait] +impl MultiPartStore for InMemory { + async fn create_multipart(&self, _path: &Path) -> Result { + let mut storage = self.storage.write(); + let etag = storage.next_etag; + storage.next_etag += 1; + storage.uploads.insert(etag, Default::default()); + Ok(etag.to_string()) + } + + async fn put_part( + &self, + _path: &Path, + id: &MultipartId, + part_idx: usize, + data: Bytes, + ) -> Result { + let mut storage = self.storage.write(); + let upload = storage.upload_mut(id)?; + if part_idx <= upload.parts.len() { + upload.parts.resize(part_idx + 1, None); + } + upload.parts[part_idx] = Some(data); + Ok(PartId { + content_id: Default::default(), + }) + } + + async fn complete_multipart( + &self, + path: &Path, + id: &MultipartId, + _parts: Vec, + ) -> Result { + let mut storage = self.storage.write(); + let upload = storage.remove_upload(id)?; + + let mut cap = 0; + for (part, x) in upload.parts.iter().enumerate() { + cap += x.as_ref().context(MissingPartSnafu { part })?.len(); + } + let mut buf = Vec::with_capacity(cap); + for x in &upload.parts { + buf.extend_from_slice(x.as_ref().unwrap()) + } + let etag = storage.insert(path, buf.into()); + Ok(PutResult { + e_tag: Some(etag.to_string()), + version: None, + }) + } + + async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> { + self.storage.write().remove_upload(id)?; + Ok(()) + } +} + impl InMemory { /// Create new in-memory storage. pub fn new() -> Self { @@ -444,6 +533,7 @@ mod tests { copy_if_not_exists(&integration).await; stream_get(&integration).await; put_opts(&integration, true).await; + multipart(&integration, &integration).await; } #[tokio::test]