Skip to content

Commit

Permalink
Conditional Put (apache#4879)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 25, 2023
1 parent ae2222d commit c62a678
Show file tree
Hide file tree
Showing 15 changed files with 375 additions and 85 deletions.
11 changes: 7 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use crate::client::CredentialProvider;
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
use crate::signer::Signer;
use crate::{
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutResult,
Result,
Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode,
PutOptions, PutResult, Result,
};

mod builder;
Expand Down Expand Up @@ -158,8 +158,11 @@ impl Signer for AmazonS3 {

#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
self.client.put_request(location, bytes, &()).await
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
match opts.mode {
PutMode::Overwrite => self.client.put_request(location, bytes, &()).await,
PutMode::Create | PutMode::Update(_) => Err(Error::NotImplemented),
}
}

async fn put_multipart(
Expand Down
38 changes: 29 additions & 9 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutResult, Result, RetryConfig,
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult,
Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
Expand All @@ -37,11 +38,11 @@ use chrono::{DateTime, Utc};
use itertools::Itertools;
use reqwest::header::CONTENT_TYPE;
use reqwest::{
header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH},
header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH},
Client as ReqwestClient, Method, Response, StatusCode,
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;
Expand Down Expand Up @@ -94,6 +95,9 @@ pub(crate) enum Error {
Metadata {
source: crate::client::header::Error,
},

#[snafu(display("ETag required for conditional update"))]
MissingETag,
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -163,6 +167,7 @@ impl AzureClient {
path: &Path,
bytes: Option<Bytes>,
is_block_op: bool,
opts: PutOptions,
query: &T,
) -> Result<Response> {
let credential = self.get_credential().await?;
Expand All @@ -171,9 +176,7 @@ impl AzureClient {
let mut builder = self.client.request(Method::PUT, url);

if !is_block_op {
builder = builder.header(&BLOB_TYPE, "BlockBlob").query(query);
} else {
builder = builder.query(query);
builder = builder.header(&BLOB_TYPE, "BlockBlob");
}

if let Some(value) = self.config().client_options.get_content_type(path) {
Expand All @@ -188,7 +191,14 @@ impl AzureClient {
builder = builder.header(CONTENT_LENGTH, HeaderValue::from_static("0"));
}

builder = match opts.mode {
PutMode::Overwrite => builder,
PutMode::Create => builder.header(IF_NONE_MATCH, "*"),
PutMode::Update(v) => builder.header(IF_MATCH, v.e_tag.context(MissingETagSnafu)?),
};

let response = builder
.query(query)
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
Expand All @@ -200,8 +210,11 @@ impl AzureClient {
}

/// Make an Azure PUT request <https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob>
pub async fn put_blob(&self, path: &Path, bytes: Bytes) -> Result<PutResult> {
let response = self.put_request(path, Some(bytes), false, &()).await?;
pub async fn put_blob(&self, path: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
let response = self
.put_request(path, Some(bytes), false, opts, &())
.await?;

Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
}

Expand All @@ -214,6 +227,7 @@ impl AzureClient {
path,
Some(data),
true,
PutOptions::default(),
&[
("comp", "block"),
("blockid", &BASE64_STANDARD.encode(block_id)),
Expand All @@ -235,7 +249,13 @@ impl AzureClient {
let block_xml = block_list.to_xml();

let response = self
.put_request(path, Some(block_xml.into()), true, &[("comp", "blocklist")])
.put_request(
path,
Some(block_xml.into()),
true,
PutOptions::default(),
&[("comp", "blocklist")],
)
.await?;

Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
Expand Down
8 changes: 5 additions & 3 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
Result,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -81,8 +82,8 @@ impl std::fmt::Display for MicrosoftAzure {

#[async_trait]
impl ObjectStore for MicrosoftAzure {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
Ok(self.client.put_blob(location, bytes).await?)
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
self.client.put_blob(location, bytes, opts).await
}

async fn put_multipart(
Expand Down Expand Up @@ -199,6 +200,7 @@ mod tests {
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
}

Expand Down
7 changes: 4 additions & 3 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use tokio::io::AsyncWrite;

use crate::path::Path;
use crate::{
GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutResult,
GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions,
PutResult,
};
use crate::{MultipartId, Result};

Expand Down Expand Up @@ -62,8 +63,8 @@ impl Display for ChunkedStore {

#[async_trait]
impl ObjectStore for ChunkedStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
self.inner.put(location, bytes).await
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
self.inner.put_opts(location, bytes, opts).await
}

async fn put_multipart(
Expand Down
67 changes: 53 additions & 14 deletions object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,22 @@ use crate::client::GetOptionsExt;
use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE};
use crate::multipart::PartId;
use crate::path::{Path, DELIMITER};
use crate::{ClientOptions, GetOptions, ListResult, MultipartId, PutResult, Result, RetryConfig};
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutResult, Result,
RetryConfig,
};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::{header, Client, Method, Response, StatusCode};
use serde::Serialize;
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;

const VERSION_HEADER: &str = "x-goog-generation";

const VERSION_MATCH: &str = "x-goog-if-generation-match";

#[derive(Debug, Snafu)]
enum Error {
#[snafu(display("Error performing list request: {}", source))]
Expand Down Expand Up @@ -79,6 +85,9 @@ enum Error {
Metadata {
source: crate::client::header::Error,
},

#[snafu(display("Version required for conditional update"))]
MissingVersion,
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -157,6 +166,7 @@ impl GoogleCloudStorageClient {
&self,
path: &Path,
payload: Bytes,
opts: PutOptions,
query: &T,
) -> Result<PutResult> {
let credential = self.get_credential().await?;
Expand All @@ -168,21 +178,44 @@ impl GoogleCloudStorageClient {
.get_content_type(path)
.unwrap_or("application/octet-stream");

let response = self
.client
.request(Method::PUT, url)
.query(query)
let mut builder = self.client.request(Method::PUT, url).query(query);

builder = match &opts.mode {
PutMode::Overwrite => builder,
PutMode::Create => builder.header(VERSION_MATCH, 0),
PutMode::Update(v) => builder.header(
VERSION_MATCH,
v.version.as_ref().context(MissingVersionSnafu)?,
),
};

let result = builder
.bearer_auth(&credential.bearer)
.header(header::CONTENT_TYPE, content_type)
.header(header::CONTENT_LENGTH, payload.len())
.body(payload)
.send_retry(&self.config.retry_config)
.await
.context(PutRequestSnafu {
path: path.as_ref(),
})?;

Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
.await;

match result {
Ok(response) => {
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
}
Err(source) => {
return Err(match (opts.mode, source.status()) {
(PutMode::Create, Some(StatusCode::PRECONDITION_FAILED)) => {
crate::Error::AlreadyExists {
source: Box::new(source),
path: path.to_string(),
}
}
_ => crate::Error::from(Error::PutRequest {
source,
path: path.to_string(),
}),
});
}
}
}

/// Perform a put part request <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
Expand All @@ -199,6 +232,7 @@ impl GoogleCloudStorageClient {
.put_request(
path,
data,
PutOptions::default(),
&[
("partNumber", &format!("{}", part_idx + 1)),
("uploadId", upload_id),
Expand Down Expand Up @@ -336,7 +370,7 @@ impl GoogleCloudStorageClient {
.header("x-goog-copy-source", source);

if if_not_exists {
builder = builder.header("x-goog-if-generation-match", 0);
builder = builder.header(VERSION_MATCH, 0);
}

builder
Expand Down Expand Up @@ -377,13 +411,18 @@ impl GetClient for GoogleCloudStorageClient {
false => Method::GET,
};

let mut request = self.client.request(method, url).with_get_options(options);
let mut request = self.client.request(method, url);

if let Some(version) = &options.version {
request = request.query(&[("generation", version)]);
}

if !credential.bearer.is_empty() {
request = request.bearer_auth(&credential.bearer);
}

let response = request
.with_get_options(options)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
Expand Down
8 changes: 5 additions & 3 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use crate::client::CredentialProvider;
use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
Result,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -107,8 +108,8 @@ impl PutPart for GCSMultipartUpload {

#[async_trait]
impl ObjectStore for GoogleCloudStorage {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
self.client.put_request(location, bytes, &()).await
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
self.client.put_request(location, bytes, opts, &()).await
}

async fn put_multipart(
Expand Down Expand Up @@ -220,6 +221,7 @@ mod test {
multipart(&integration, &integration).await;
// Fake GCS server doesn't currently honor preconditions
get_opts(&integration).await;
put_opts(&integration, true).await;
}
}

Expand Down
Loading

0 comments on commit c62a678

Please sign in to comment.