Skip to content

Commit

Permalink
Remove ObjectStore::append (#5016)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Nov 1, 2023
1 parent 65f7be8 commit 94fe6bb
Show file tree
Hide file tree
Showing 7 changed files with 1 addition and 277 deletions.
5 changes: 0 additions & 5 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"], optional = true }
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util", "fs"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }

[target.'cfg(target_family="unix")'.dev-dependencies]
Expand Down
31 changes: 1 addition & 30 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@
//!
//! This provides some compelling advantages:
//!
//! * Except where explicitly stated otherwise, operations are atomic, and readers
//! cannot observe partial and/or failed writes
//! * All operations are atomic, and readers cannot observe partial and/or failed writes
//! * Methods map directly to object store APIs, providing both efficiency and predictability
//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
//! * Allows for functionality not native to filesystems, such as operation preconditions
Expand Down Expand Up @@ -559,30 +558,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// vary by object store.
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>;

/// Returns an [`AsyncWrite`] that can be used to append to the object at `location`
///
/// A new object will be created if it doesn't already exist, otherwise it will be
/// opened, with subsequent writes appended to the end.
///
/// This operation cannot be supported by all stores, most use-cases should prefer
/// [`ObjectStore::put`] and [`ObjectStore::put_multipart`] for better portability
/// and stronger guarantees
///
/// This API is not guaranteed to be atomic, in particular
///
/// * On error, `location` may contain partial data
/// * Concurrent calls to [`ObjectStore::list`] may return partially written objects
/// * Concurrent calls to [`ObjectStore::get`] may return partially written data
/// * Concurrent calls to [`ObjectStore::put`] may result in data loss / corruption
/// * Concurrent calls to [`ObjectStore::append`] may result in data loss / corruption
///
/// Additionally some stores, such as Azure, may only support appending to objects created
/// with [`ObjectStore::append`], and not with [`ObjectStore::put`], [`ObjectStore::copy`], or
/// [`ObjectStore::put_multipart`]
async fn append(&self, _location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
Err(Error::NotImplemented)
}

/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult> {
self.get_opts(location, GetOptions::default()).await
Expand Down Expand Up @@ -779,10 +754,6 @@ macro_rules! as_ref_impl {
self.as_ref().abort_multipart(location, multipart_id).await
}

// Forwards `append` to the store obtained through `as_ref()`, returning its result unchanged.
async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
self.as_ref().append(location).await
}

// Forwards `get` to the store obtained through `as_ref()`, returning its result unchanged.
async fn get(&self, location: &Path) -> Result<GetResult> {
self.as_ref().get(location).await
}
Expand Down
7 changes: 0 additions & 7 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,6 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.abort_multipart(location, multipart_id).await
}

/// Acquires a semaphore permit, then opens an append writer on the inner store.
///
/// The owned permit is handed to [`PermitWrapper`] together with the writer
/// (presumably so it stays held for as long as the writer lives; confirm against
/// `PermitWrapper`). Errors from the inner store are propagated unchanged.
async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
    // An owned permit is required because it must outlive this call
    let semaphore = Arc::clone(&self.semaphore);
    let permit = semaphore.acquire_owned().await.unwrap();

    let writer = self.inner.append(location).await?;
    let limited = PermitWrapper::new(writer, permit);
    Ok(Box::new(limited))
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
let r = self.inner.get(location).await?;
Expand Down
126 changes: 0 additions & 126 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,45 +350,6 @@ impl ObjectStore for LocalFileSystem {
.await
}

async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
// Get the path to the file from the configuration.
let path = self.config.path_to_filesystem(location)?;
loop {
// Create new `OpenOptions`.
let mut options = tokio::fs::OpenOptions::new();

// Attempt to open the file with the given options.
match options
.truncate(false)
.append(true)
.create(true)
.open(&path)
.await
{
// If the file was successfully opened, return it wrapped in a boxed `AsyncWrite` trait object.
Ok(file) => return Ok(Box::new(file)),
// If the error is that the file was not found, attempt to create the file and any necessary parent directories.
Err(source) if source.kind() == ErrorKind::NotFound => {
// Get the path to the parent directory of the file.
let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;

// Create the parent directory and any necessary ancestors.
tokio::fs::create_dir_all(parent)
.await
// If creating the directory fails, return a `UnableToCreateDirSnafu` error.
.context(UnableToCreateDirSnafu { path: parent })?;
// Try again to open the file.
continue;
}
// If any other error occurs, return a `UnableToOpenFile` error.
Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()),
}
}
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let location = location.clone();
let path = self.config.path_to_filesystem(&location)?;
Expand Down Expand Up @@ -1449,97 +1410,10 @@ mod tests {
mod not_wasm_tests {
use crate::local::LocalFileSystem;
use crate::{ObjectStore, Path};
use bytes::Bytes;
use std::time::Duration;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;

/// Appending to a location inside directories that do not exist yet must succeed,
/// and the written bytes must be readable afterwards.
#[tokio::test]
async fn creates_dir_if_not_present_append() {
    let tmp = TempDir::new().unwrap();
    let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
    let target = Path::from("nested/file/test_file");
    let payload = Bytes::from("arbitrary data");

    // Write the payload through the append writer and flush it
    let mut appender = store.append(&target).await.unwrap();
    appender.write_all(payload.as_ref()).await.unwrap();
    appender.flush().await.unwrap();

    // Read everything back and compare against what was written
    let fetched = store.get(&target).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    assert_eq!(&*stored, payload);
}

/// Data appended to a fresh top-level file is returned unchanged by `get`.
#[tokio::test]
async fn unknown_length_append() {
    let tmp = TempDir::new().unwrap();
    let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
    let target = Path::from("some_file");
    let payload = Bytes::from("arbitrary data");

    // Write the payload through the append writer and flush it
    let mut appender = store.append(&target).await.unwrap();
    appender.write_all(payload.as_ref()).await.unwrap();
    appender.flush().await.unwrap();

    // Read everything back and compare against what was written
    let fetched = store.get(&target).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    assert_eq!(&*stored, payload);
}

/// Two separate append writers on the same location accumulate their data in order.
#[tokio::test]
async fn multiple_append() {
    let tmp = TempDir::new().unwrap();
    let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
    let target = Path::from("some_file");

    let chunks = [
        Bytes::from("arbitrary"),
        Bytes::from("data"),
        Bytes::from("gnz"),
    ];

    // First pass: writes all chunks to a newly created file
    let mut first_writer = store.append(&target).await.unwrap();
    for chunk in &chunks {
        first_writer.write_all(chunk).await.unwrap();
    }
    first_writer.flush().await.unwrap();

    // Second pass: a new writer appends the same chunks after the existing content
    let mut second_writer = store.append(&target).await.unwrap();
    for chunk in &chunks {
        second_writer.write_all(chunk).await.unwrap();
    }
    second_writer.flush().await.unwrap();

    let fetched = store.get(&target).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    let expected = Bytes::from("arbitrarydatagnzarbitrarydatagnz");
    assert_eq!(&*stored, expected);
}

#[tokio::test]
async fn test_cleanup_intermediate_files() {
let root = TempDir::new().unwrap();
Expand Down
99 changes: 0 additions & 99 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,6 @@ impl ObjectStore for InMemory {
Ok(())
}

/// Returns a writer that buffers appended bytes for `location`.
///
/// The writer starts with an empty buffer and a handle to this store's shared
/// storage; nothing is written to the store by this call itself.
async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
    let appender = InMemoryAppend {
        location: location.clone(),
        data: Vec::new(),
        storage: SharedStorage::clone(&self.storage),
    };
    Ok(Box::new(appender))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let entry = self.entry(location).await?;
let e_tag = entry.e_tag.to_string();
Expand Down Expand Up @@ -443,53 +435,8 @@ impl AsyncWrite for InMemoryUpload {
}
}

/// Append writer for the in-memory store.
///
/// Bytes are collected in `data` by `poll_write` and merged into the entry at
/// `location` when the writer is flushed.
struct InMemoryAppend {
/// Location of the object that flushed data is appended to
location: Path,
/// Bytes written since the last flush
data: Vec<u8>,
/// Shared handle to the store's storage, locked for writing during a flush
storage: Arc<RwLock<Storage>>,
}

impl AsyncWrite for InMemoryAppend {
    /// Buffers `buf` in memory; always completes immediately with the full length.
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let accepted = buf.len();
        self.data.extend_from_slice(buf);
        Poll::Ready(Ok(accepted))
    }

    /// Moves the buffered bytes into storage, appending to any existing entry.
    ///
    /// The existing entry (if any) is removed and re-inserted with the buffered
    /// bytes concatenated after its data; otherwise a new entry holding only the
    /// buffered bytes is inserted. The buffer is left empty afterwards.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        // Lock through a cloned handle (presumably so the guard does not borrow `self`)
        let storage = Arc::clone(&self.storage);
        let mut guard = storage.write();

        // Drain the pending bytes, leaving an empty buffer behind
        let pending = std::mem::take(&mut self.data);

        let merged = match guard.map.remove(&self.location) {
            Some(existing) => Bytes::from_iter(existing.data.into_iter().chain(pending)),
            None => Bytes::from(pending),
        };
        guard.insert(&self.location, merged);

        Poll::Ready(Ok(()))
    }

    /// Shutting down performs a final flush and nothing else.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        self.poll_flush(cx)
    }
}

#[cfg(test)]
mod tests {
use tokio::io::AsyncWriteExt;

use super::*;

use crate::tests::*;
Expand Down Expand Up @@ -577,50 +524,4 @@ mod tests {
panic!("unexpected error type: {err:?}");
}
}

/// Appending to a location with no existing entry stores exactly the written bytes.
#[tokio::test]
async fn test_append_new() {
    let store = InMemory::new();
    let target = Path::from("some_file");
    let payload = Bytes::from("arbitrary data");

    // Write and flush so the buffered bytes reach the store
    let mut appender = store.append(&target).await.unwrap();
    appender.write_all(&payload).await.unwrap();
    appender.flush().await.unwrap();

    let fetched = store.get(&target).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    assert_eq!(&*stored, payload);
}

/// A second write + flush on the same writer extends the entry created by the first.
#[tokio::test]
async fn test_append_existing() {
    let store = InMemory::new();
    let target = Path::from("some_file");
    let head = Bytes::from("arbitrary");
    let tail = Bytes::from(" data");
    let expected = Bytes::from("arbitrary data");

    let mut appender = store.append(&target).await.unwrap();

    // First flush creates the entry
    appender.write_all(&head).await.unwrap();
    appender.flush().await.unwrap();

    // Second flush appends to the entry that now exists
    appender.write_all(&tail).await.unwrap();
    appender.flush().await.unwrap();

    let fetched = store.get(&target).await.unwrap();
    let stored = fetched.bytes().await.unwrap();
    assert_eq!(&*stored, expected);
}
}
6 changes: 0 additions & 6 deletions object_store/src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,6 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
let full_path = self.full_path(location);
self.inner.abort_multipart(&full_path, multipart_id).await
}

/// Appends via the inner store, using the path that `full_path` derives from `location`.
async fn append(&self, location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
    self.inner.append(&self.full_path(location)).await
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get(&full_path).await
Expand Down
4 changes: 0 additions & 4 deletions object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
Err(super::Error::NotImplemented)
}

async fn append(&self, _location: &Path) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
Err(super::Error::NotImplemented)
}

async fn get(&self, location: &Path) -> Result<GetResult> {
sleep(self.config().wait_get_per_call).await;

Expand Down

0 comments on commit 94fe6bb

Please sign in to comment.