From 5a67f1f1e8fda9329df7edda42f82219a975ddc7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 3 Jan 2024 19:34:16 +0000 Subject: [PATCH] Fix ObjectMeta::size for range requests (#5272) (#5276) * Fix ObjectMeta::size for range requests (#5272) * Docs * Update object_store/src/lib.rs Co-authored-by: Andrew Lamb * Add tests --------- Co-authored-by: Andrew Lamb --- object_store/src/client/get.rs | 243 ++++++++++++++++++++++++++++++--- object_store/src/lib.rs | 17 ++- 2 files changed, 236 insertions(+), 24 deletions(-) diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index 5f9cac9b424b..b7e7f24b29c2 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -15,13 +15,18 @@ // specific language governing permissions and limitations // under the License. +use std::ops::Range; + use crate::client::header::{header_meta, HeaderConfig}; use crate::path::Path; -use crate::{Error, GetOptions, GetResult}; -use crate::{GetResultPayload, Result}; +use crate::{Error, GetOptions, GetResult, GetResultPayload, Result}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; +use hyper::header::CONTENT_RANGE; +use hyper::StatusCode; +use reqwest::header::ToStrError; use reqwest::Response; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; /// A client that can perform a get request #[async_trait] @@ -45,25 +50,221 @@ impl GetClientExt for T { async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let range = options.range.clone(); let response = self.get_request(location, options).await?; - let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| { - Error::Generic { - store: T::STORE, - source: Box::new(e), - } - })?; - - let stream = response - .bytes_stream() - .map_err(|source| Error::Generic { - store: T::STORE, - source: Box::new(source), - }) - .boxed(); - - Ok(GetResult { - range: range.unwrap_or(0..meta.size), - payload: GetResultPayload::Stream(stream), - meta, + get_result::(location, range, response).map_err(|e| crate::Error::Generic { + store: T::STORE, + source: Box::new(e), + }) + } +} + +struct ContentRange { + /// The range of the object returned + range: Range, + /// The total size of the object being requested + size: usize, +} + +impl ContentRange { + /// Parse a content range of the form `bytes -/` + /// + /// + fn from_str(s: &str) -> Option { + let rem = s.trim().strip_prefix("bytes ")?; + let (range, size) = rem.split_once('/')?; + let size = size.parse().ok()?; + + let (start_s, end_s) = range.split_once('-')?; + + let start = start_s.parse().ok()?; + let end: usize = end_s.parse().ok()?; + + Some(Self { + size, + range: start..end + 1, }) } } + +/// A specialized `Error` for get-related errors +#[derive(Debug, Snafu)] +#[allow(missing_docs)] +enum GetResultError { + #[snafu(context(false))] + Header { + source: crate::client::header::Error, + }, + + #[snafu(display("Received non-partial response when range requested"))] + NotPartial, + + #[snafu(display("Content-Range header not present in partial response"))] + NoContentRange, + + #[snafu(display("Failed to parse value for CONTENT_RANGE header: \"{value}\""))] + ParseContentRange { value: String }, + + #[snafu(display("Content-Range header contained non UTF-8 characters"))] + InvalidContentRange { source: ToStrError }, + + #[snafu(display("Requested {expected:?}, got {actual:?}"))] + UnexpectedRange { + expected: Range, + actual: Range, + }, +} + +fn get_result( + location: &Path, + range: Option>, + response: Response, +) -> Result { + let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?; + + // ensure that we receive the range we asked for + let range = if let Some(expected) = range { + ensure!( + response.status() == StatusCode::PARTIAL_CONTENT, + NotPartialSnafu + ); + let val = response + .headers() + .get(CONTENT_RANGE) + .context(NoContentRangeSnafu)?; + + let value = val.to_str().context(InvalidContentRangeSnafu)?; + let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?; + let actual = value.range; + + ensure!( + actual == expected, + UnexpectedRangeSnafu { expected, actual } + ); + + // Update size to reflect full size of object (#5272) + meta.size = value.size; + actual + } else { + 0..meta.size + }; + + let stream = response + .bytes_stream() + .map_err(|source| Error::Generic { + store: T::STORE, + source: Box::new(source), + }) + .boxed(); + + Ok(GetResult { + range, + meta, + payload: GetResultPayload::Stream(stream), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use hyper::http; + use hyper::http::header::*; + + struct TestClient {} + + #[async_trait] + impl GetClient for TestClient { + const STORE: &'static str = "TEST"; + + const HEADER_CONFIG: HeaderConfig = HeaderConfig { + etag_required: false, + last_modified_required: false, + version_header: None, + }; + + async fn get_request(&self, _: &Path, _: GetOptions) -> Result { + unimplemented!() + } + } + + fn make_response( + object_size: usize, + range: Option>, + status: StatusCode, + content_range: Option<&str>, + ) -> Response { + let mut builder = http::Response::builder(); + if let Some(range) = content_range { + builder = builder.header(CONTENT_RANGE, range); + } + + let body = match range { + Some(range) => vec![0_u8; range.end - range.start], + None => vec![0_u8; object_size], + }; + + builder + .status(status) + .header(CONTENT_LENGTH, object_size) + .body(body) + .unwrap() + .into() + } + + #[tokio::test] + async fn test_get_result() { + let path = Path::from("test"); + + let resp = make_response(12, None, StatusCode::OK, None); + let res = get_result::(&path, None, resp).unwrap(); + assert_eq!(res.meta.size, 12); + assert_eq!(res.range, 0..12); + let bytes = res.bytes().await.unwrap(); + assert_eq!(bytes.len(), 12); + + let resp = make_response( + 12, + Some(2..3), + StatusCode::PARTIAL_CONTENT, + Some("bytes 2-2/12"), + ); + let res = get_result::(&path, Some(2..3), resp).unwrap(); + assert_eq!(res.meta.size, 12); + assert_eq!(res.range, 2..3); + let bytes = res.bytes().await.unwrap(); + assert_eq!(bytes.len(), 1); + + let resp = make_response(12, Some(2..3), StatusCode::OK, None); + let err = get_result::(&path, Some(2..3), resp).unwrap_err(); + assert_eq!( + err.to_string(), + "Received non-partial response when range requested" + ); + + let resp = make_response( + 12, + Some(2..3), + StatusCode::PARTIAL_CONTENT, + Some("bytes 2-3/12"), + ); + let err = get_result::(&path, Some(2..3), resp).unwrap_err(); + assert_eq!(err.to_string(), "Requested 2..3, got 2..4"); + + let resp = make_response( + 12, + Some(2..3), + StatusCode::PARTIAL_CONTENT, + Some("bytes 2-2/*"), + ); + let err = get_result::(&path, Some(2..3), resp).unwrap_err(); + assert_eq!( + err.to_string(), + "Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\"" + ); + + let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None); + let err = get_result::(&path, Some(2..3), resp).unwrap_err(); + assert_eq!( + err.to_string(), + "Content-Range header not present in partial response" + ); + } +} diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 632e949582da..b438254bdd01 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1303,12 +1303,23 @@ mod tests { let range = 3..7; let range_result = storage.get_range(&location, range.clone()).await; + let bytes = range_result.unwrap(); + assert_eq!(bytes, expected_data.slice(range.clone())); + + let opts = GetOptions { + range: Some(2..5), + ..Default::default() + }; + let result = storage.get_opts(&location, opts).await.unwrap(); + // Data is `"arbitrary data"`, length 14 bytes + assert_eq!(result.meta.size, 14); // Should return full object size (#5272) + assert_eq!(result.range, 2..5); + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes, b"bit".as_ref()); + let out_of_range = 200..300; let out_of_range_result = storage.get_range(&location, out_of_range).await; - let bytes = range_result.unwrap(); - assert_eq!(bytes, expected_data.slice(range)); - // Should be a non-fatal error out_of_range_result.unwrap_err();