Commit

Implement MultiPartStore for InMemory
tustvold committed Mar 11, 2024
1 parent 82fc0df commit 5492390
Showing 1 changed file with 91 additions and 1 deletion.
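For orientation, a minimal usage sketch of the capability this commit adds (not part of the commit itself): it assumes the object_store crate with the MultiPartStore trait exactly as implemented below, plus tokio and bytes as dependencies; the path and part contents are made up for illustration.

use bytes::Bytes;
use object_store::memory::InMemory;
use object_store::multipart::MultiPartStore;
use object_store::path::Path;

#[tokio::main]
async fn main() -> object_store::Result<()> {
    // Stage two parts against an in-memory store, then stitch them into one object.
    let store = InMemory::new();
    let location = Path::from("data/file.bin");

    let id = store.create_multipart(&location).await?;
    let p0 = store.put_part(&location, &id, 0, Bytes::from_static(b"hello ")).await?;
    let p1 = store.put_part(&location, &id, 1, Bytes::from_static(b"world")).await?;
    let result = store.complete_multipart(&location, &id, vec![p0, p1]).await?;
    println!("stored with etag {:?}", result.e_tag);
    Ok(())
}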
object_store/src/memory.rs: 92 changes (91 additions, 1 deletion)
@@ -16,6 +16,7 @@
// under the License.

//! An in-memory object store implementation
use crate::multipart::{MultiPartStore, PartId};
use crate::util::InvalidGetRange;
use crate::{
path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore,
@@ -28,8 +29,8 @@ use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::{BTreeMap, HashMap};
use std::io;
use std::ops::Range;
use std::pin::Pin;
@@ -52,6 +53,12 @@ enum Error {

#[snafu(display("ETag required for conditional update"))]
MissingETag,

#[snafu(display("MultipartUpload not found: {id}"))]
UploadNotFound { id: String },

#[snafu(display("Missing part at index: {part}"))]
MissingPart { part: usize },
}

impl From<Error> for super::Error {
@@ -101,6 +108,12 @@ impl Entry {
struct Storage {
next_etag: usize,
map: BTreeMap<Path, Entry>,
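/// In-progress multipart uploads, keyed by the numeric id handed out by create_multipart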
uploads: HashMap<usize, PartStorage>,
}

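/// Parts staged for one multipart upload; a slot stays None until that part index is written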
#[derive(Debug, Default, Clone)]
struct PartStorage {
parts: Vec<Option<Bytes>>,
}

type SharedStorage = Arc<RwLock<Storage>>;
@@ -154,6 +167,24 @@ impl Storage {
}
}
}

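/// Borrows the staged parts for an upload id, or fails with UploadNotFound if the id is unknown or not numeric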
fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> {
let parts = id
.parse()
.ok()
.and_then(|x| self.uploads.get_mut(&x))
.context(UploadNotFoundSnafu { id })?;
Ok(parts)
}

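/// Removes the upload from the map and returns ownership of its staged parts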
fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> {
let parts = id
.parse()
.ok()
.and_then(|x| self.uploads.remove(&x))
.context(UploadNotFoundSnafu { id })?;
Ok(parts)
}
}

impl std::fmt::Display for InMemory {
@@ -359,6 +390,64 @@ impl ObjectStore for InMemory {
}
}

#[async_trait]
impl MultiPartStore for InMemory {
async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
let mut storage = self.storage.write();
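// Reuse the etag counter as a source of unique upload ids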
let etag = storage.next_etag;
storage.next_etag += 1;
storage.uploads.insert(etag, Default::default());
Ok(etag.to_string())
}

async fn put_part(
&self,
_path: &Path,
id: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
let mut storage = self.storage.write();
let upload = storage.upload_mut(id)?;
// Grow the parts vector so out-of-order or retried part indexes never clobber later parts or panic
if part_idx >= upload.parts.len() {
upload.parts.resize(part_idx + 1, None);
}
upload.parts[part_idx] = Some(data);
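// InMemory tracks no per-part ETags, so the returned PartId carries an empty content_id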
Ok(PartId {
content_id: Default::default(),
})
}

async fn complete_multipart(
&self,
path: &Path,
id: &MultipartId,
_parts: Vec<PartId>,
) -> Result<PutResult> {
let mut storage = self.storage.write();
let upload = storage.remove_upload(id)?;

let mut cap = 0;
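// First pass: ensure every part was uploaded and compute the final object size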
for (part, x) in upload.parts.iter().enumerate() {
cap += x.as_ref().context(MissingPartSnafu { part })?.len();
}
let mut buf = Vec::with_capacity(cap);
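// Second pass: concatenate the parts; unwrap is safe because the loop above checked each one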
for x in &upload.parts {
buf.extend_from_slice(x.as_ref().unwrap())
}
let etag = storage.insert(path, buf.into());
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
})
}

async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
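// Discard any staged parts without writing an object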
self.storage.write().remove_upload(id)?;
Ok(())
}
}

impl InMemory {
/// Create new in-memory storage.
pub fn new() -> Self {
@@ -444,6 +533,7 @@ mod tests {
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
}

#[tokio::test]