Skip to content

Commit

Permalink
object_store: full HTTP range support (#5222)
Browse files Browse the repository at this point in the history
* object_store: full HTTP range support

- Support suffix and offset ranges in GetOptions and get_opts
- Ensure that, if a range is requested, the response contains exactly
  that range

* object_store: review comments

- Use idiomatic snafu error handling
- fast-fail on azure suffix requests
- remove unused GetRange utilities

* Cleanup

* Further cleanup / fixes

* object_store: Display for GetRange includes bytes=

* Update object_store/src/util.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

* Use size from ContentRange

* Update test

* Fix as_range

* Update test

* Tighten range validation logic

- Raise an error before the request is made if the range has <= 0
  bytes in it
- `GetRange::as_range` now handles more out-of-bounds cases, although in
  most cases these should result in a 416 from the server anyway.

* allow return of partial range

* Tweak docs and loosen suffix restrictions

* Fix Azure and Memory

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
3 people authored Jan 5, 2024
1 parent cf61bb8 commit 2f5dcdf
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 44 deletions.
10 changes: 9 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult,
Result, RetryConfig,
Expand Down Expand Up @@ -441,6 +441,14 @@ impl GetClient for AzureClient {
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
// As of 2024-01-02, Azure does not support suffix requests,
// so we should fail fast here rather than sending one
if let Some(GetRange::Suffix(_)) = options.range.as_ref() {
return Err(crate::Error::NotSupported {
source: "Azure does not support suffix range requests".into(),
});
}

let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match options.head {
Expand Down
69 changes: 59 additions & 10 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult, GetResultPayload, Result};
use crate::{GetOptions, GetRange, GetResult, GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use hyper::header::CONTENT_RANGE;
Expand Down Expand Up @@ -49,6 +49,12 @@ pub trait GetClientExt {
impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
if let Some(r) = range.as_ref() {
r.is_valid().map_err(|e| crate::Error::Generic {
store: T::STORE,
source: Box::new(e),
})?;
}
let response = self.get_request(location, options).await?;
get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
store: T::STORE,
Expand Down Expand Up @@ -94,6 +100,11 @@ enum GetResultError {
source: crate::client::header::Error,
},

#[snafu(context(false))]
InvalidRangeRequest {
source: crate::util::InvalidGetRange,
},

#[snafu(display("Received non-partial response when range requested"))]
NotPartial,

Expand All @@ -115,7 +126,7 @@ enum GetResultError {

fn get_result<T: GetClient>(
location: &Path,
range: Option<Range<usize>>,
range: Option<GetRange>,
response: Response,
) -> Result<GetResult, GetResultError> {
let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
Expand All @@ -135,21 +146,24 @@ fn get_result<T: GetClient>(
let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
let actual = value.range;

// Update size to reflect full size of object (#5272)
meta.size = value.size;

let expected = expected.as_range(meta.size)?;

ensure!(
actual == expected,
UnexpectedRangeSnafu { expected, actual }
);

// Update size to reflect full size of object (#5272)
meta.size = value.size;
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
.map_err(|source| crate::Error::Generic {
store: T::STORE,
source: Box::new(source),
})
Expand Down Expand Up @@ -220,20 +234,22 @@ mod tests {
let bytes = res.bytes().await.unwrap();
assert_eq!(bytes.len(), 12);

let get_range = GetRange::from(2..3);

let resp = make_response(
12,
Some(2..3),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-2/12"),
);
let res = get_result::<TestClient>(&path, Some(2..3), resp).unwrap();
let res = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap();
assert_eq!(res.meta.size, 12);
assert_eq!(res.range, 2..3);
let bytes = res.bytes().await.unwrap();
assert_eq!(bytes.len(), 1);

let resp = make_response(12, Some(2..3), StatusCode::OK, None);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Received non-partial response when range requested"
Expand All @@ -245,7 +261,7 @@ mod tests {
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-3/12"),
);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(err.to_string(), "Requested 2..3, got 2..4");

let resp = make_response(
Expand All @@ -254,17 +270,50 @@ mod tests {
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-2/*"),
);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
);

let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Content-Range header not present in partial response"
);

let resp = make_response(
2,
Some(2..3),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-3/2"),
);
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(
err.to_string(),
"InvalidRangeRequest: Wanted range starting at 2, but object was only 2 bytes long"
);

let resp = make_response(
6,
Some(2..6),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-5/6"),
);
let res = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), resp).unwrap();
assert_eq!(res.meta.size, 6);
assert_eq!(res.range, 2..6);
let bytes = res.bytes().await.unwrap();
assert_eq!(bytes.len(), 4);

let resp = make_response(
6,
Some(2..6),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-3/6"),
);
let err = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), resp).unwrap_err();
assert_eq!(err.to_string(), "Requested 2..6, got 2..4");
}
}
3 changes: 1 addition & 2 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,7 @@ impl GetOptionsExt for RequestBuilder {
use hyper::header::*;

if let Some(range) = options.range {
let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
self = self.header(RANGE, range);
self = self.header(RANGE, range.to_string());
}

if let Some(tag) = options.if_match {
Expand Down
67 changes: 63 additions & 4 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ mod parse;
mod util;

pub use parse::{parse_url, parse_url_opts};
pub use util::GetRange;

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -580,10 +581,12 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;

/// Return the bytes that are stored at the specified location
/// in the given byte range
/// in the given byte range.
///
/// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.clone()),
range: Some(range.into()),
..Default::default()
};
self.get_opts(location, options).await?.bytes().await
Expand Down Expand Up @@ -913,7 +916,7 @@ pub struct GetOptions {
/// otherwise returning [`Error::NotModified`]
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
pub range: Option<GetRange>,
/// Request a particular object version
pub version: Option<String>,
/// Request transfer of no content
Expand Down Expand Up @@ -1308,7 +1311,7 @@ mod tests {
assert_eq!(bytes, expected_data.slice(range.clone()));

let opts = GetOptions {
range: Some(2..5),
range: Some(GetRange::Bounded(2..5)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
Expand All @@ -1324,6 +1327,62 @@ mod tests {
// Should be a non-fatal error
out_of_range_result.unwrap_err();

let opts = GetOptions {
range: Some(GetRange::Bounded(2..100)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.range, 2..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bitrary data".as_ref());

let opts = GetOptions {
range: Some(GetRange::Suffix(2)),
..Default::default()
};
match storage.get_opts(&location, opts).await {
Ok(result) => {
assert_eq!(result.range, 12..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"ta".as_ref());
}
Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}

let opts = GetOptions {
range: Some(GetRange::Suffix(100)),
..Default::default()
};
match storage.get_opts(&location, opts).await {
Ok(result) => {
assert_eq!(result.range, 0..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"arbitrary data".as_ref());
}
Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}

let opts = GetOptions {
range: Some(GetRange::Offset(3)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.range, 3..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"itrary data".as_ref());

let opts = GetOptions {
range: Some(GetRange::Offset(100)),
..Default::default()
};
storage.get_opts(&location, opts).await.unwrap_err();

let ranges = vec![0..1, 2..3, 0..5];
let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
for (range, bytes) in ranges.iter().zip(bytes) {
Expand Down
13 changes: 12 additions & 1 deletion object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::InvalidGetRange,
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutResult, Result,
};
Expand Down Expand Up @@ -111,6 +112,11 @@ pub(crate) enum Error {
actual: usize,
},

#[snafu(display("Requested range was invalid"))]
InvalidRange {
source: InvalidGetRange,
},

#[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
UnableToCopyFile {
from: PathBuf,
Expand Down Expand Up @@ -424,9 +430,14 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;

let range = match options.range {
Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?,
None => 0..meta.size,
};

Ok(GetResult {
payload: GetResultPayload::File(file, path),
range: options.range.unwrap_or(0..meta.size),
range,
meta,
})
})
Expand Down
Loading

0 comments on commit 2f5dcdf

Please sign in to comment.