Skip to content

Commit

Permalink
Fix ObjectMeta::size for range requests (#5272)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 3, 2024
1 parent 2460c88 commit 5374c2f
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 24 deletions.
131 changes: 110 additions & 21 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use crate::{Error, GetOptions, GetResult, GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use hyper::header::CONTENT_RANGE;
use hyper::StatusCode;
use reqwest::header::ToStrError;
use reqwest::Response;
use snafu::{ensure, OptionExt, ResultExt, Snafu};

/// A client that can perform a get request
#[async_trait]
Expand All @@ -45,25 +50,109 @@ impl<T: GetClient> GetClientExt for T {
/// Issues the GET request for `location` via `get_request` and turns the HTTP response
/// into a `GetResult`.
///
/// The requested range is cloned before the call because `options` is moved into
/// `get_request`.
// NOTE(review): this span is rendered from a commit diff without +/- markers, so it
// appears to contain both the pre-change body (the inline `header_meta` / `bytes_stream`
// / `Ok(GetResult { .. })` construction) and the post-change body (the single
// `get_result::<T>` call whose error is wrapped in `Error::Generic`) — TODO confirm
// against the actual file at this commit.
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
// Pre-change: metadata was parsed from the response headers here, with failures
// wrapped in `Error::Generic` tagged with the store name.
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
// Pre-change: falls back to `0..meta.size` when no range was requested.
range: range.unwrap_or(0..meta.size),
payload: GetResultPayload::Stream(stream),
meta,
// Post-change: all response handling is delegated to `get_result`, and any
// `GetResultError` is boxed into `Error::Generic`.
get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
store: T::STORE,
source: Box::new(e),
})
}
}

/// Parsed value of an HTTP `Content-Range` response header of the form
/// `bytes <first-pos>-<last-pos>/<complete-length>`.
#[derive(Debug)]
struct ContentRange {
    /// Bytes covered by the response, as a half-open range: the header's inclusive
    /// `<last-pos>` is converted to an exclusive end by adding one.
    range: Range<usize>,
    /// Complete length of the object, i.e. the number following the `/`.
    size: usize,
}

impl ContentRange {
    /// Parses a `Content-Range` header value such as `bytes 2-4/14`.
    ///
    /// Surrounding whitespace is ignored. Returns `None` when:
    /// - the `bytes ` unit prefix, the `/` or the `-` separator is missing,
    /// - any of the three numbers does not parse as `usize` (this includes the
    ///   `*` placeholders for an unknown range or unknown length),
    /// - `<last-pos>` is smaller than `<first-pos>`, which RFC 9110 declares invalid,
    /// - `<last-pos> + 1` is not representable as `usize`.
    fn from_str(s: &str) -> Option<Self> {
        let rem = s.trim().strip_prefix("bytes ")?;
        let (range, size) = rem.split_once('/')?;
        let size = size.parse().ok()?;

        let (start_s, end_s) = range.split_once('-')?;

        let start: usize = start_s.parse().ok()?;
        let end: usize = end_s.parse().ok()?;

        // An inverted range cannot describe any bytes; treat it as unparseable
        // rather than producing a `Range` whose start exceeds its end.
        if end < start {
            return None;
        }

        // The header's end is inclusive. `checked_add` avoids an overflow panic
        // (debug) or a silent wrap to 0 (release) when the server sends `usize::MAX`.
        let end_exclusive = end.checked_add(1)?;

        Some(Self {
            size,
            range: start..end_exclusive,
        })
    }
}

/// A specialized `Error` for get-related errors
///
/// Each variant is one way in which `get_result` can fail while turning an HTTP
/// response into a `GetResult`.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum GetResultError {
// Error produced while extracting object metadata from the response headers.
// `get_result` applies `?` directly to the `header_meta` call to produce it.
// NOTE(review): kept as a plain comment on purpose — this variant has no `display`
// attribute and snafu may derive the Display text from a doc comment; confirm
// before converting to `///`.
#[snafu(context(false))]
Header {
source: crate::client::header::Error,
},

/// A range was requested but the response status was not `206 Partial Content`.
#[snafu(display("Received non-partial response when range requested"))]
NotPartial,

/// A range was requested but the response carried no `Content-Range` header.
#[snafu(display("Content-Range header not present"))]
NoContentRange,

/// The `Content-Range` header could not be parsed by `ContentRange::from_str`;
/// `value` holds the header text that was rejected.
#[snafu(display("Failed to parse value for CONTENT_RANGE header: {value}"))]
ParseContentRange { value: String },

/// Converting the `Content-Range` header value to a string slice failed.
#[snafu(display("Content-Range header contained non UTF-8 characters"))]
InvalidContentRange { source: ToStrError },

/// The range reported by the server differs from the range that was requested.
#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
expected: Range<usize>,
actual: Range<usize>,
},
}

fn get_result<T: GetClient>(
location: &Path,
range: Option<Range<usize>>,
response: Response,
) -> Result<GetResult, GetResultError> {
let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;

// ensure that we receive the range we asked for
let range = if let Some(expected) = range {
ensure!(
response.status() == StatusCode::PARTIAL_CONTENT,
NotPartialSnafu
);
let val = response
.headers()
.get(CONTENT_RANGE)
.context(NoContentRangeSnafu)?;

let value = val.to_str().context(InvalidContentRangeSnafu)?;
let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
let actual = value.range;

ensure!(
actual == expected,
UnexpectedRangeSnafu { expected, actual }
);

// Update size to reflect full size of object (#5272)
meta.size = value.size;
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range,
meta,
payload: GetResultPayload::Stream(stream),
})
}
16 changes: 13 additions & 3 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,12 +1303,22 @@ mod tests {
let range = 3..7;
let range_result = storage.get_range(&location, range.clone()).await;

let bytes = range_result.unwrap();
assert_eq!(bytes, expected_data.slice(range.clone()));

let opts = GetOptions {
range: Some(2..5),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
assert_eq!(result.range, 2..5);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bit".as_ref());

let out_of_range = 200..300;
let out_of_range_result = storage.get_range(&location, out_of_range).await;

let bytes = range_result.unwrap();
assert_eq!(bytes, expected_data.slice(range));

// Should be a non-fatal error
out_of_range_result.unwrap_err();

Expand Down

0 comments on commit 5374c2f

Please sign in to comment.