From c62a6780f0693408fe2aaf5c7f996796ed628178 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 24 Oct 2023 11:52:43 +0100 Subject: [PATCH] Conditional Put (#4879) --- object_store/src/aws/mod.rs | 11 ++- object_store/src/azure/client.rs | 38 +++++++-- object_store/src/azure/mod.rs | 8 +- object_store/src/chunked.rs | 7 +- object_store/src/gcp/client.rs | 67 +++++++++++---- object_store/src/gcp/mod.rs | 8 +- object_store/src/http/client.rs | 27 +++++- object_store/src/http/mod.rs | 7 +- object_store/src/lib.rs | 119 ++++++++++++++++++++++++++- object_store/src/limit.rs | 6 +- object_store/src/local.rs | 47 ++++++++--- object_store/src/memory.rs | 66 +++++++++++++-- object_store/src/prefix.rs | 8 +- object_store/src/throttle.rs | 9 +- object_store/tests/get_range_file.rs | 32 +++---- 15 files changed, 375 insertions(+), 85 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 2254fb54cedb..1a64e6aa0876 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -47,8 +47,8 @@ use crate::client::CredentialProvider; use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}; use crate::signer::Signer; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutResult, - Result, + Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode, + PutOptions, PutResult, Result, }; mod builder; @@ -158,8 +158,11 @@ impl Signer for AmazonS3 { #[async_trait] impl ObjectStore for AmazonS3 { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - self.client.put_request(location, bytes, &()).await + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + match opts.mode { + PutMode::Overwrite => self.client.put_request(location, bytes, &()).await, + PutMode::Create | PutMode::Update(_) => Err(Error::NotImplemented), + } } async fn put_multipart( diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index e99cfb0ce6bd..94d81cca2f93 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -27,7 +27,8 @@ use crate::multipart::PartId; use crate::path::DELIMITER; use crate::util::deserialize_rfc1123; use crate::{ - ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutResult, Result, RetryConfig, + ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult, + Result, RetryConfig, }; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; @@ -37,11 +38,11 @@ use chrono::{DateTime, Utc}; use itertools::Itertools; use reqwest::header::CONTENT_TYPE; use reqwest::{ - header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH}, + header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH}, Client as ReqwestClient, Method, Response, StatusCode, }; use serde::{Deserialize, Serialize}; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::collections::HashMap; use std::sync::Arc; use url::Url; @@ -94,6 +95,9 @@ pub(crate) enum Error { Metadata { source: crate::client::header::Error, }, + + #[snafu(display("ETag required for conditional update"))] + MissingETag, } impl From for crate::Error { @@ -163,6 +167,7 @@ impl AzureClient { path: &Path, bytes: Option, is_block_op: bool, + opts: PutOptions, query: &T, ) -> Result { let credential = self.get_credential().await?; @@ -171,9 +176,7 @@ impl AzureClient { let mut builder = self.client.request(Method::PUT, url); if !is_block_op { - builder = builder.header(&BLOB_TYPE, "BlockBlob").query(query); - } else { - builder = builder.query(query); + builder = builder.header(&BLOB_TYPE, "BlockBlob"); } if let Some(value) = self.config().client_options.get_content_type(path) { @@ -188,7 +191,14 @@ impl AzureClient { builder = builder.header(CONTENT_LENGTH, HeaderValue::from_static("0")); } + builder = match opts.mode { + PutMode::Overwrite => builder, + PutMode::Create => builder.header(IF_NONE_MATCH, "*"), + PutMode::Update(v) => builder.header(IF_MATCH, v.e_tag.context(MissingETagSnafu)?), + }; + let response = builder + .query(query) .with_azure_authorization(&credential, &self.config.account) .send_retry(&self.config.retry_config) .await @@ -200,8 +210,11 @@ impl AzureClient { } /// Make an Azure PUT request - pub async fn put_blob(&self, path: &Path, bytes: Bytes) -> Result { - let response = self.put_request(path, Some(bytes), false, &()).await?; + pub async fn put_blob(&self, path: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let response = self + .put_request(path, Some(bytes), false, opts, &()) + .await?; + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) } @@ -214,6 +227,7 @@ impl AzureClient { path, Some(data), true, + PutOptions::default(), &[ ("comp", "block"), ("blockid", &BASE64_STANDARD.encode(block_id)), @@ -235,7 +249,13 @@ impl AzureClient { let block_xml = block_list.to_xml(); let response = self - .put_request(path, Some(block_xml.into()), true, &[("comp", "blocklist")]) + .put_request( + path, + Some(block_xml.into()), + true, + PutOptions::default(), + &[("comp", "blocklist")], + ) .await?; Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 4b6e4fa44253..762a51dd9d60 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -29,7 +29,8 @@ use crate::{ multipart::{PartId, PutPart, WriteMultiPart}, path::Path, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -81,8 +82,8 @@ impl std::fmt::Display for MicrosoftAzure { #[async_trait] impl ObjectStore for MicrosoftAzure { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - Ok(self.client.put_blob(location, bytes).await?) + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + self.client.put_blob(location, bytes, opts).await } async fn put_multipart( @@ -199,6 +200,7 @@ mod tests { rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; stream_get(&integration).await; + put_opts(&integration, true).await; multipart(&integration, &integration).await; } diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index 021f9f50156b..d33556f4b12e 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -29,7 +29,8 @@ use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutResult, + GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, + PutResult, }; use crate::{MultipartId, Result}; @@ -62,8 +63,8 @@ impl Display for ChunkedStore { #[async_trait] impl ObjectStore for ChunkedStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - self.inner.put(location, bytes).await + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + self.inner.put_opts(location, bytes, opts).await } async fn put_multipart( diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 5c3936c73fe4..98cb1704cb45 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -24,16 +24,22 @@ use crate::client::GetOptionsExt; use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE}; use crate::multipart::PartId; use crate::path::{Path, DELIMITER}; -use crate::{ClientOptions, GetOptions, ListResult, MultipartId, PutResult, Result, RetryConfig}; +use crate::{ + ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutResult, Result, + RetryConfig, +}; use async_trait::async_trait; use bytes::{Buf, Bytes}; use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; use reqwest::{header, Client, Method, Response, StatusCode}; use serde::Serialize; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::sync::Arc; const VERSION_HEADER: &str = "x-goog-generation"; + +const VERSION_MATCH: &str = "x-goog-if-generation-match"; + #[derive(Debug, Snafu)] enum Error { #[snafu(display("Error performing list request: {}", source))] @@ -79,6 +85,9 @@ enum Error { Metadata { source: crate::client::header::Error, }, + + #[snafu(display("Version required for conditional update"))] + MissingVersion, } impl From for crate::Error { @@ -157,6 +166,7 @@ impl GoogleCloudStorageClient { &self, path: &Path, payload: Bytes, + opts: PutOptions, query: &T, ) -> Result { let credential = self.get_credential().await?; @@ -168,21 +178,44 @@ impl GoogleCloudStorageClient { .get_content_type(path) .unwrap_or("application/octet-stream"); - let response = self - .client - .request(Method::PUT, url) - .query(query) + let mut builder = self.client.request(Method::PUT, url).query(query); + + builder = match &opts.mode { + PutMode::Overwrite => builder, + PutMode::Create => builder.header(VERSION_MATCH, 0), + PutMode::Update(v) => builder.header( + VERSION_MATCH, + v.version.as_ref().context(MissingVersionSnafu)?, + ), + }; + + let result = builder .bearer_auth(&credential.bearer) .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, payload.len()) .body(payload) .send_retry(&self.config.retry_config) - .await - .context(PutRequestSnafu { - path: path.as_ref(), - })?; - - Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) + .await; + + match result { + Ok(response) => { + Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?) + } + Err(source) => { + return Err(match (opts.mode, source.status()) { + (PutMode::Create, Some(StatusCode::PRECONDITION_FAILED)) => { + crate::Error::AlreadyExists { + source: Box::new(source), + path: path.to_string(), + } + } + _ => crate::Error::from(Error::PutRequest { + source, + path: path.to_string(), + }), + }); + } + } } /// Perform a put part request @@ -199,6 +232,7 @@ impl GoogleCloudStorageClient { .put_request( path, data, + PutOptions::default(), &[ ("partNumber", &format!("{}", part_idx + 1)), ("uploadId", upload_id), @@ -336,7 +370,7 @@ impl GoogleCloudStorageClient { .header("x-goog-copy-source", source); if if_not_exists { - builder = builder.header("x-goog-if-generation-match", 0); + builder = builder.header(VERSION_MATCH, 0); } builder @@ -377,13 +411,18 @@ impl GetClient for GoogleCloudStorageClient { false => Method::GET, }; - let mut request = self.client.request(method, url).with_get_options(options); + let mut request = self.client.request(method, url); + + if let Some(version) = &options.version { + request = request.query(&[("generation", version)]); + } if !credential.bearer.is_empty() { request = request.bearer_auth(&credential.bearer); } let response = request + .with_get_options(options) .send_retry(&self.config.retry_config) .await .context(GetRequestSnafu { diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index d3a84f838c03..43d1162adb43 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -35,7 +35,8 @@ use crate::client::CredentialProvider; use crate::{ multipart::{PartId, PutPart, WriteMultiPart}, path::Path, - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -107,8 +108,8 @@ impl PutPart for GCSMultipartUpload { #[async_trait] impl ObjectStore for GoogleCloudStorage { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - self.client.put_request(location, bytes, &()).await + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + self.client.put_request(location, bytes, opts, &()).await } async fn put_multipart( @@ -220,6 +221,7 @@ mod test { multipart(&integration, &integration).await; // Fake GCS server doesn't currently honor preconditions get_opts(&integration).await; + put_opts(&integration, true).await; } } diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index a7dbdfcbe844..dd90b0da5552 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -21,12 +21,12 @@ use crate::client::retry::{self, RetryConfig, RetryExt}; use crate::client::GetOptionsExt; use crate::path::{Path, DELIMITER}; use crate::util::deserialize_rfc1123; -use crate::{ClientOptions, GetOptions, ObjectMeta, Result}; +use crate::{ClientOptions, GetOptions, ObjectMeta, PutMode, PutOptions, Result}; use async_trait::async_trait; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use percent_encoding::percent_decode_str; -use reqwest::header::CONTENT_TYPE; +use reqwest::header::{CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}; use reqwest::{Method, Response, StatusCode}; use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; @@ -69,6 +69,9 @@ enum Error { path: String, source: crate::path::Error, }, + + #[snafu(display("ETag required for conditional update"))] + MissingETag, } impl From for crate::Error { @@ -156,7 +159,7 @@ impl Client { Ok(()) } - pub async fn put(&self, location: &Path, bytes: Bytes) -> Result { + pub async fn put(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { let mut retry = false; loop { let url = self.path_url(location); @@ -165,6 +168,14 @@ impl Client { builder = builder.header(CONTENT_TYPE, value); } + let builder = match &opts.mode { + PutMode::Overwrite => builder, + PutMode::Create => builder.header(IF_NONE_MATCH, "*"), + PutMode::Update(v) => { + builder.header(IF_MATCH, v.e_tag.as_ref().context(MissingETagSnafu)?) + } + }; + match builder.send_retry(&self.retry_config).await { Ok(response) => return Ok(response), Err(source) => match source.status() { @@ -173,6 +184,12 @@ impl Client { retry = true; self.create_parent_directories(location).await? } + Some(StatusCode::NOT_MODIFIED) if matches!(opts.mode, PutMode::Create) => { + return Err(crate::Error::AlreadyExists { + path: location.to_string(), + source: Box::new(source), + }) + } _ => return Err(Error::Request { source }.into()), }, } @@ -243,6 +260,10 @@ impl Client { .header("Destination", self.path_url(to).as_str()); if !overwrite { + // While the Overwrite header appears to duplicate + // the functionality of the If-Match: * header of HTTP/1.1, If-Match + // applies only to the Request-URI, and not to the Destination of a COPY + // or MOVE. builder = builder.header("Overwrite", "F"); } diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index 3929af9fd1f9..abe03e061746 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -46,7 +46,7 @@ use crate::http::client::Client; use crate::path::Path; use crate::{ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, - ObjectStore, PutResult, Result, RetryConfig, + ObjectStore, PutOptions, PutResult, Result, RetryConfig, }; mod client; @@ -96,8 +96,8 @@ impl std::fmt::Display for HttpStore { #[async_trait] impl ObjectStore for HttpStore { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - let response = self.client.put(location, bytes).await?; + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let response = self.client.put(location, bytes, opts).await?; let e_tag = match get_etag(response.headers()) { Ok(e_tag) => Some(e_tag), Err(crate::client::header::Error::MissingEtag) => None, @@ -264,5 +264,6 @@ mod tests { list_with_delimiter(&integration).await; rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; + put_opts(&integration, true).await; } } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 6d53c3e1afd0..a7d402de8f51 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -299,7 +299,12 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// The operation is guaranteed to be atomic, it will either successfully /// write the entirety of `bytes` to `location`, or fail. No clients /// should be able to observe a partially written object - async fn put(&self, location: &Path, bytes: Bytes) -> Result; + async fn put(&self, location: &Path, bytes: Bytes) -> Result { + self.put_opts(location, bytes, PutOptions::default()).await + } + + /// Save the provided bytes to the specified location with the given options + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result; /// Get a multi-part upload that allows writing data in chunks. /// @@ -531,6 +536,15 @@ macro_rules! as_ref_impl { self.as_ref().put(location, bytes).await } + async fn put_opts( + &self, + location: &Path, + bytes: Bytes, + opts: PutOptions, + ) -> Result { + self.as_ref().put_opts(location, bytes, opts).await + } + async fn put_multipart( &self, location: &Path, @@ -837,6 +851,56 @@ impl GetResult { } } +/// Configure preconditions for the put operation +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub enum PutMode { + /// Perform an atomic write operation, overwriting any object present at the provided path + #[default] + Overwrite, + /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an + /// object already exists at the provided path + Create, + /// Perform an atomic write operation if the current version of the object matches the + /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise + Update(UpdateVersion), +} + +/// Uniquely identifies a version of an object to update +/// +/// Stores will use differing combinations of `e_tag` and `version` to provide conditional +/// updates, and it is therefore recommended applications preserve both +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UpdateVersion { + /// The unique identifier for the newly created object + /// + /// + pub e_tag: Option, + /// A version indicator for the newly created object + pub version: Option, +} + +impl From for UpdateVersion { + fn from(value: PutResult) -> Self { + Self { + e_tag: value.e_tag, + version: value.version, + } + } +} + +/// Options for a put request +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct PutOptions { + /// Configure the [`PutMode`] for this operation + pub mode: PutMode, +} + +impl From for PutOptions { + fn from(mode: PutMode) -> Self { + Self { mode } + } +} + /// Result for a put request #[derive(Debug, Clone, PartialEq, Eq)] pub struct PutResult { @@ -1408,7 +1472,7 @@ mod tests { // Can retrieve previous version let get_opts = storage.get_opts(&path, options).await.unwrap(); let old = get_opts.bytes().await.unwrap(); - assert_eq!(old, b"foo".as_slice()); + assert_eq!(old, b"test".as_slice()); // Current version contains the updated data let current = storage.get(&path).await.unwrap().bytes().await.unwrap(); @@ -1416,6 +1480,57 @@ mod tests { } } + pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { + let path = Path::from("put_opts"); + let v1 = storage + .put_opts(&path, "a".into(), PutMode::Create.into()) + .await + .unwrap(); + + let err = storage + .put_opts(&path, "b".into(), PutMode::Create.into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::AlreadyExists { .. }), "{err}"); + + let b = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(b.as_ref(), b"a"); + + if !supports_update { + return; + } + + let v2 = storage + .put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into()) + .await + .unwrap(); + + let b = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(b.as_ref(), b"c"); + + let err = storage + .put_opts(&path, "d".into(), PutMode::Update(v1.into()).into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::Precondition { .. }), "{err}"); + + storage + .put_opts(&path, "e".into(), PutMode::Update(v2.clone().into()).into()) + .await + .unwrap(); + + let b = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(b.as_ref(), b"e"); + + // Update not exists + let path = Path::from("I don't exist"); + let err = storage + .put_opts(&path, "e".into(), PutMode::Update(v2.into()).into()) + .await + .unwrap_err(); + assert!(matches!(err, Error::Precondition { .. }), "{err}"); + } + /// Returns a chunk of length `chunk_length` fn get_chunk(chunk_length: usize) -> Bytes { let mut data = vec![0_u8; chunk_length]; diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index cd01a964dc3e..39cc605c4768 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -19,7 +19,7 @@ use crate::{ BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, - ObjectStore, Path, PutResult, Result, StreamExt, + ObjectStore, Path, PutOptions, PutResult, Result, StreamExt, }; use async_trait::async_trait; use bytes::Bytes; @@ -77,6 +77,10 @@ impl ObjectStore for LimitStore { self.inner.put(location, bytes).await } + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.put_opts(location, bytes, opts).await + } async fn put_multipart( &self, location: &Path, diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 3df486d506ae..919baf71b0a8 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -20,7 +20,7 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, - PutResult, Result, + PutMode, PutOptions, PutResult, Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -271,20 +271,44 @@ impl Config { #[async_trait] impl ObjectStore for LocalFileSystem { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + if matches!(opts.mode, PutMode::Update(_)) { + return Err(crate::Error::NotImplemented); + } + let path = self.config.path_to_filesystem(location)?; maybe_spawn_blocking(move || { let (mut file, suffix) = new_staged_upload(&path)?; let staging_path = staged_upload_path(&path, &suffix); - file.write_all(&bytes) - .context(UnableToCopyDataToFileSnafu) - .and_then(|_| { - std::fs::rename(&staging_path, &path).context(UnableToRenameFileSnafu) - }) - .map_err(|e| { - let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup - e - })?; + + let err = match file.write_all(&bytes) { + Ok(_) => match opts.mode { + PutMode::Overwrite => match std::fs::rename(&staging_path, &path) { + Ok(_) => None, + Err(source) => Some(Error::UnableToRenameFile { source }), + }, + PutMode::Create => match std::fs::hard_link(&staging_path, &path) { + Ok(_) => { + let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup + None + } + Err(source) => match source.kind() { + ErrorKind::AlreadyExists => Some(Error::AlreadyExists { + path: path.to_str().unwrap().to_string(), + source, + }), + _ => Some(Error::UnableToRenameFile { source }), + }, + }, + PutMode::Update(_) => unreachable!(), + }, + Err(source) => Some(Error::UnableToCopyDataToFile { source }), + }; + + if let Some(err) = err { + let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup + return Err(err.into()); + } let metadata = file.metadata().map_err(|e| Error::Metadata { source: e.into(), @@ -1055,6 +1079,7 @@ mod tests { rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; stream_get(&integration).await; + put_opts(&integration, false).await; } #[test] diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 3401ceb7c463..9d79a798ad1f 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -17,7 +17,8 @@ //! An in-memory object store implementation use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutResult, Result, + path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutMode, + PutOptions, PutResult, Result, UpdateVersion, }; use crate::{GetOptions, MultipartId}; use async_trait::async_trait; @@ -52,6 +53,9 @@ enum Error { #[snafu(display("Object already exists at that location: {path}"))] AlreadyExists { path: String }, + + #[snafu(display("ETag required for conditional update"))] + MissingETag, } impl From for super::Error { @@ -110,9 +114,50 @@ impl Storage { let etag = self.next_etag; self.next_etag += 1; let entry = Entry::new(bytes, Utc::now(), etag); - self.map.insert(location.clone(), entry); + self.overwrite(location, entry); etag } + + fn overwrite(&mut self, location: &Path, entry: Entry) { + self.map.insert(location.clone(), entry); + } + + fn create(&mut self, location: &Path, entry: Entry) -> Result<()> { + use std::collections::btree_map; + match self.map.entry(location.clone()) { + btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists { + path: location.to_string(), + } + .into()), + btree_map::Entry::Vacant(v) => { + v.insert(entry); + Ok(()) + } + } + } + + fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> Result<()> { + match self.map.get_mut(location) { + // Return Precondition instead of NotFound for consistency with stores + None => Err(crate::Error::Precondition { + path: location.to_string(), + source: format!("Object at location {location} not found").into(), + }), + Some(e) => { + let existing = e.e_tag.to_string(); + let expected = v.e_tag.context(MissingETagSnafu)?; + if existing == expected { + *e = entry; + Ok(()) + } else { + Err(crate::Error::Precondition { + path: location.to_string(), + source: format!("{existing} does not match {expected}").into(), + }) + } + } + } + } } impl std::fmt::Display for InMemory { @@ -123,8 +168,18 @@ impl std::fmt::Display for InMemory { #[async_trait] impl ObjectStore for InMemory { - async fn put(&self, location: &Path, bytes: Bytes) -> Result { - let etag = self.storage.write().insert(location, bytes); + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let mut storage = self.storage.write(); + let etag = storage.next_etag; + let entry = Entry::new(bytes, Utc::now(), etag); + + match opts.mode { + PutMode::Overwrite => storage.overwrite(location, entry), + PutMode::Create => storage.create(location, entry)?, + PutMode::Update(v) => storage.update(location, v, entry)?, + } + storage.next_etag += 1; + Ok(PutResult { e_tag: Some(etag.to_string()), version: None, @@ -426,7 +481,7 @@ impl AsyncWrite for InMemoryAppend { fn poll_shutdown( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { self.poll_flush(cx) } } @@ -450,6 +505,7 @@ mod tests { rename_and_copy(&integration).await; copy_if_not_exists(&integration).await; stream_get(&integration).await; + put_opts(&integration, true).await; } #[tokio::test] diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index b5bff8b12dd7..68101307fbdf 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -23,7 +23,8 @@ use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result, }; #[doc(hidden)] @@ -85,6 +86,11 @@ impl ObjectStore for PrefixStore { self.inner.put(&full_path, bytes).await } + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + let full_path = self.full_path(location); + self.inner.put_opts(&full_path, bytes, opts).await + } + async fn put_multipart( &self, location: &Path, diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index c5521256b8a6..dcd2c04bcf05 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -21,7 +21,8 @@ use std::ops::Range; use std::{convert::TryInto, sync::Arc}; use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutResult, Result, + path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions, + PutResult, Result, }; use crate::{GetOptions, MultipartId}; use async_trait::async_trait; @@ -149,10 +150,14 @@ impl std::fmt::Display for ThrottledStore { impl ObjectStore for ThrottledStore { async fn put(&self, location: &Path, bytes: Bytes) -> Result { sleep(self.config().wait_put_per_call).await; - self.inner.put(location, bytes).await } + async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result { + sleep(self.config().wait_put_per_call).await; + self.inner.put_opts(location, bytes, opts).await + } + async fn put_multipart( &self, _location: &Path, diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index 3fa1cc7104b3..85231a5a5b9b 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -22,9 +22,7 @@ use bytes::Bytes; use futures::stream::BoxStream; use object_store::local::LocalFileSystem; use object_store::path::Path; -use object_store::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, -}; +use object_store::*; use std::fmt::Formatter; use tempfile::tempdir; use tokio::io::AsyncWrite; @@ -40,50 +38,42 @@ impl std::fmt::Display for MyStore { #[async_trait] impl ObjectStore for MyStore { - async fn put(&self, path: &Path, data: Bytes) -> object_store::Result { - self.0.put(path, data).await + async fn put_opts(&self, path: &Path, data: Bytes, opts: PutOptions) -> Result { + self.0.put_opts(path, data, opts).await } async fn put_multipart( &self, _: &Path, - ) -> object_store::Result<(MultipartId, Box)> { + ) -> Result<(MultipartId, Box)> { todo!() } - async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> object_store::Result<()> { + async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> { todo!() } - async fn get_opts( - &self, - location: &Path, - options: GetOptions, - ) -> object_store::Result { + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { self.0.get_opts(location, options).await } - async fn head(&self, _: &Path) -> object_store::Result { - todo!() - } - - async fn delete(&self, _: &Path) -> object_store::Result<()> { + async fn delete(&self, _: &Path) -> Result<()> { todo!() } - fn list(&self, _: Option<&Path>) -> BoxStream<'_, object_store::Result> { + fn list(&self, _: Option<&Path>) -> BoxStream<'_, Result> { todo!() } - async fn list_with_delimiter(&self, _: Option<&Path>) -> object_store::Result { + async fn list_with_delimiter(&self, _: Option<&Path>) -> Result { todo!() } - async fn copy(&self, _: &Path, _: &Path) -> object_store::Result<()> { + async fn copy(&self, _: &Path, _: &Path) -> Result<()> { todo!() } - async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> object_store::Result<()> { + async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> Result<()> { todo!() } }