Skip to content

Commit

Permalink
fix: object store http header last modified (apache#4834)
Browse files Browse the repository at this point in the history
* fix: object store http header last modified

* refactor: make headermeta configurable on required fields

* Update object_store/src/client/header.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

* Update object_store/src/client/header.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

* Update object_store/src/client/header.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
2 people authored and Ryan Aston committed Nov 6, 2023
1 parent 5d1a4b0 commit 1b2238c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 21 deletions.
12 changes: 7 additions & 5 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options, false).await?;
let meta =
header_meta(location, response.headers()).map_err(|e| Error::Generic {
let meta = header_meta(location, response.headers(), Default::default())
.map_err(|e| Error::Generic {
store: T::STORE,
source: Box::new(e),
})?;
Expand All @@ -73,9 +73,11 @@ impl<T: GetClient> GetClientExt for T {
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::default();
let response = self.get_request(location, options, true).await?;
header_meta(location, response.headers()).map_err(|e| Error::Generic {
store: T::STORE,
source: Box::new(e),
header_meta(location, response.headers(), Default::default()).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})
}
}
62 changes: 48 additions & 14 deletions object_store/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,33 @@
use crate::path::Path;
use crate::ObjectMeta;
use chrono::{DateTime, Utc};
use chrono::{DateTime, TimeZone, Utc};
use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
use hyper::HeaderMap;
use snafu::{OptionExt, ResultExt, Snafu};

/// Settings that decide which response headers are mandatory when an
/// [`ObjectMeta`] is extracted from a header map.
#[derive(Debug)]
pub struct HeaderConfig {
    /// If `true`, extraction fails when the response carries no `ETag` header.
    ///
    /// Defaults to `true`
    pub etag_required: bool,
    /// If `true`, extraction fails when the response carries no
    /// `Last-Modified` header.
    ///
    /// Defaults to `true`
    pub last_modified_required: bool,
}

impl Default for HeaderConfig {
fn default() -> Self {
Self {
etag_required: true,
last_modified_required: true,
}
}
}

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("ETag Header missing from response"))]
Expand Down Expand Up @@ -52,32 +74,44 @@ pub enum Error {
}

/// Extracts [`ObjectMeta`] from the provided [`HeaderMap`]
pub fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta, Error> {
let last_modified = headers
.get(LAST_MODIFIED)
.context(MissingLastModifiedSnafu)?;
pub fn header_meta(
location: &Path,
headers: &HeaderMap,
cfg: HeaderConfig,
) -> Result<ObjectMeta, Error> {
let last_modified = match headers.get(LAST_MODIFIED) {
Some(last_modified) => {
let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
DateTime::parse_from_rfc2822(last_modified)
.context(InvalidLastModifiedSnafu { last_modified })?
.with_timezone(&Utc)
}
None if cfg.last_modified_required => return Err(Error::MissingLastModified),
None => Utc.timestamp_nanos(0),
};

let e_tag = match headers.get(ETAG) {
Some(e_tag) => {
let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
Some(e_tag.to_string())
}
None if cfg.etag_required => return Err(Error::MissingEtag),
None => None,
};

let content_length = headers
.get(CONTENT_LENGTH)
.context(MissingContentLengthSnafu)?;

let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
let last_modified = DateTime::parse_from_rfc2822(last_modified)
.context(InvalidLastModifiedSnafu { last_modified })?
.with_timezone(&Utc);

let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
e_tag,
})
}
9 changes: 7 additions & 2 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::header::header_meta;
use crate::client::header::{header_meta, HeaderConfig};
use crate::http::client::Client;
use crate::path::Path;
use crate::{
Expand Down Expand Up @@ -117,7 +117,12 @@ impl ObjectStore for HttpStore {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.client.get(location, options).await?;
let meta = header_meta(location, response.headers()).context(MetadataSnafu)?;
let cfg = HeaderConfig {
last_modified_required: false,
etag_required: false,
};
let meta =
header_meta(location, response.headers(), cfg).context(MetadataSnafu)?;

let stream = response
.bytes_stream()
Expand Down

0 comments on commit 1b2238c

Please sign in to comment.