Skip to content

Commit

Permalink
Add ObjectMeta::version and GetOptions::version (#4925)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 13, 2023
1 parent c6387c1 commit db513d5
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 16 deletions.
13 changes: 12 additions & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::aws::{
AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
};
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
Expand Down Expand Up @@ -553,6 +554,12 @@ impl S3Client {
impl GetClient for S3Client {
const STORE: &'static str = STORE;

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: Some("x-amz-version-id"),
};

/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
async fn get_request(
&self,
Expand All @@ -567,7 +574,11 @@ impl GetClient for S3Client {
false => Method::GET,
};

let builder = self.client.request(method, url);
let mut builder = self.client.request(method, url);

if let Some(v) = &options.version {
builder = builder.query(&[("versionId", v)])
}

let response = builder
.with_get_options(options)
Expand Down
14 changes: 13 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::credential::AzureCredential;
use crate::azure::credential::*;
use crate::azure::{AzureCredentialProvider, STORE};
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
Expand Down Expand Up @@ -261,6 +262,12 @@ impl AzureClient {
impl GetClient for AzureClient {
const STORE: &'static str = STORE;

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
version_header: Some("x-ms-version-id"),
};

/// Make an Azure GET request
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
Expand All @@ -277,12 +284,16 @@ impl GetClient for AzureClient {
false => Method::GET,
};

let builder = self
let mut builder = self
.client
.request(method, url)
.header(CONTENT_LENGTH, HeaderValue::from_static("0"))
.body(Bytes::new());

if let Some(v) = &options.version {
builder = builder.query(&[("versionid", v)])
}

let response = builder
.with_get_options(options)
.with_azure_authorization(&credential, &self.config.account)
Expand Down Expand Up @@ -442,6 +453,7 @@ impl TryFrom<Blob> for ObjectMeta {
last_modified: value.properties.last_modified,
size: value.properties.content_length as usize,
e_tag: value.properties.e_tag,
version: None, // For consistency with S3 and GCP which don't include this
})
}
}
Expand Down
5 changes: 1 addition & 4 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ pub trait GetClient: Send + Sync + 'static {
const STORE: &'static str;

/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
};
const HEADER_CONFIG: HeaderConfig;

async fn get_request(
&self,
Expand Down
13 changes: 11 additions & 2 deletions object_store/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub struct HeaderConfig {
///
/// Defaults to `true`
pub last_modified_required: bool,

/// The version header name if any
pub version_header: Option<&'static str>,
}

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -95,14 +98,20 @@ pub fn header_meta(
.context(MissingContentLengthSnafu)?;

let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
let size = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let version = match cfg.version_header.and_then(|h| headers.get(h)) {
Some(v) => Some(v.to_str().context(BadHeaderSnafu)?.to_string()),
None => None,
};

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
version,
size,
e_tag,
})
}
1 change: 1 addition & 0 deletions object_store/src/client/list_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl TryFrom<ListContents> for ObjectMeta {
last_modified: value.last_modified,
size: value.size,
e_tag: value.e_tag,
version: None,
})
}
}
27 changes: 19 additions & 8 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const STORE: &str = "GCS";

/// [`CredentialProvider`] for [`GoogleCloudStorage`]
pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
use crate::client::header::HeaderConfig;
pub use credential::GcpCredential;

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -388,6 +389,12 @@ impl GoogleCloudStorageClient {
impl GetClient for GoogleCloudStorageClient {
const STORE: &'static str = STORE;

const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: true,
last_modified_required: true,
version_header: Some("x-goog-generation"),
};

/// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
async fn get_request(
&self,
Expand All @@ -403,19 +410,23 @@ impl GetClient for GoogleCloudStorageClient {
false => Method::GET,
};

let mut request = self.client.request(method, url).with_get_options(options);
let mut request = self.client.request(method, url);

if let Some(v) = &options.version {
request = request.query(&[("generation", v)]);
}

if !credential.bearer.is_empty() {
request = request.bearer_auth(&credential.bearer);
}

let response =
request
.send_retry(&self.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
let response = request
.with_get_options(options)
.send_retry(&self.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;

Ok(response)
}
Expand Down
2 changes: 2 additions & 0 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ impl GetClient for Client {
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: None,
};

async fn get_request(
Expand Down Expand Up @@ -389,6 +390,7 @@ impl MultiStatusResponse {
last_modified,
size: self.size()?,
e_tag: self.prop_stat.prop.e_tag.clone(),
version: None,
})
}

Expand Down
22 changes: 22 additions & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ pub struct ObjectMeta {
pub size: usize,
/// The unique identifier for the object
pub e_tag: Option<String>,
/// A version indicator for this object
pub version: Option<String>,
}

/// Options for a get request, such as range
Expand Down Expand Up @@ -714,6 +716,8 @@ pub struct GetOptions {
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
/// Request a particular object version
pub version: Option<String>,
}

impl GetOptions {
Expand Down Expand Up @@ -1374,6 +1378,24 @@ mod tests {
};
storage.get_opts(&path, options).await.unwrap();
}

if let Some(version) = meta.version {
storage.put(&path, "bar".into()).await.unwrap();

let options = GetOptions {
version: Some(version),
..GetOptions::default()
};

// Can retrieve previous version
let get_opts = storage.get_opts(&path, options).await.unwrap();
let old = get_opts.bytes().await.unwrap();
assert_eq!(old, b"foo".as_slice());

// Current version contains the updated data
let current = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(&current, b"bar".as_slice());
}
}

/// Returns a chunk of length `chunk_length`
Expand Down
1 change: 1 addition & 0 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
last_modified,
size,
e_tag: None,
version: None,
})
}

Expand Down
4 changes: 4 additions & 0 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ impl ObjectStore for InMemory {
last_modified,
size: data.len(),
e_tag: None,
version: None,
};

let (range, data) = match options.range {
Expand Down Expand Up @@ -191,6 +192,7 @@ impl ObjectStore for InMemory {
last_modified: entry.1,
size: entry.0.len(),
e_tag: None,
version: None,
})
}

Expand Down Expand Up @@ -222,6 +224,7 @@ impl ObjectStore for InMemory {
last_modified: value.1,
size: value.0.len(),
e_tag: None,
version: None,
})
})
.collect();
Expand Down Expand Up @@ -266,6 +269,7 @@ impl ObjectStore for InMemory {
last_modified: v.1,
size: v.0.len(),
e_tag: None,
version: None,
};
objects.push(object);
}
Expand Down
1 change: 1 addition & 0 deletions object_store/src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl<T: ObjectStore> PrefixStore<T> {
size: meta.size,
location: self.strip_prefix(meta.location),
e_tag: meta.e_tag,
version: None,
}
}
}
Expand Down

0 comments on commit db513d5

Please sign in to comment.