From 5374c2fedcc6ee6e3d5760957e0039f375072b81 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Wed, 3 Jan 2024 14:15:24 +0000
Subject: [PATCH] Fix ObjectMeta::size for range requests (#5272)

---
 object_store/src/client/get.rs | 131 +++++++++++++++++++++++++++------
 object_store/src/lib.rs        |  16 +++-
 2 files changed, 123 insertions(+), 24 deletions(-)

diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 5f9cac9b424b..a896cf2745c7 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -15,13 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::ops::Range;
+
 use crate::client::header::{header_meta, HeaderConfig};
 use crate::path::Path;
-use crate::{Error, GetOptions, GetResult};
-use crate::{GetResultPayload, Result};
+use crate::{Error, GetOptions, GetResult, GetResultPayload, Result};
 use async_trait::async_trait;
 use futures::{StreamExt, TryStreamExt};
+use hyper::header::CONTENT_RANGE;
+use hyper::StatusCode;
+use reqwest::header::ToStrError;
 use reqwest::Response;
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
 
 /// A client that can perform a get request
 #[async_trait]
@@ -45,25 +50,109 @@ impl<T: GetClient> GetClientExt for T {
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
         let range = options.range.clone();
         let response = self.get_request(location, options).await?;
-        let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
-            Error::Generic {
-                store: T::STORE,
-                source: Box::new(e),
-            }
-        })?;
-
-        let stream = response
-            .bytes_stream()
-            .map_err(|source| Error::Generic {
-                store: T::STORE,
-                source: Box::new(source),
-            })
-            .boxed();
-
-        Ok(GetResult {
-            range: range.unwrap_or(0..meta.size),
-            payload: GetResultPayload::Stream(stream),
-            meta,
+        get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
+            store: T::STORE,
+            source: Box::new(e),
         })
     }
 }
+
+struct ContentRange {
+    range: Range<usize>,
+    size: usize,
+}
+
+impl ContentRange {
+    fn from_str(s: &str) -> Option<Self> {
+        let rem = s.trim().strip_prefix("bytes ")?;
+        let (range, size) = rem.split_once('/')?;
+        let size = size.parse().ok()?;
+
+        let (start_s, end_s) = range.split_once('-')?;
+
+        let start = start_s.parse().ok()?;
+        let end: usize = end_s.parse().ok()?;
+
+        Some(Self {
+            size,
+            range: start..end + 1,
+        })
+    }
+}
+
+/// A specialized `Error` for get-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum GetResultError {
+    #[snafu(context(false))]
+    Header {
+        source: crate::client::header::Error,
+    },
+
+    #[snafu(display("Received non-partial response when range requested"))]
+    NotPartial,
+
+    #[snafu(display("Content-Range header not present"))]
+    NoContentRange,
+
+    #[snafu(display("Failed to parse value for CONTENT_RANGE header: {value}"))]
+    ParseContentRange { value: String },
+
+    #[snafu(display("Content-Range header contained non UTF-8 characters"))]
+    InvalidContentRange { source: ToStrError },
+
+    #[snafu(display("Requested {expected:?}, got {actual:?}"))]
+    UnexpectedRange {
+        expected: Range<usize>,
+        actual: Range<usize>,
+    },
+}
+
+fn get_result<T: GetClient>(
+    location: &Path,
+    range: Option<Range<usize>>,
+    response: Response,
+) -> Result<GetResult, GetResultError> {
+    let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
+
+    // ensure that we receive the range we asked for
+    let range = if let Some(expected) = range {
+        ensure!(
+            response.status() == StatusCode::PARTIAL_CONTENT,
+            NotPartialSnafu
+        );
+        let val = response
+            .headers()
+            .get(CONTENT_RANGE)
+            .context(NoContentRangeSnafu)?;
+
+        let value = val.to_str().context(InvalidContentRangeSnafu)?;
+        let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
+        let actual = value.range;
+
+        ensure!(
+            actual == expected,
+            UnexpectedRangeSnafu { expected, actual }
+        );
+
+        // Update size to reflect full size of object (#5272)
+        meta.size = value.size;
+        actual
+    } else {
+        0..meta.size
+    };
+
+    let stream = response
+        .bytes_stream()
+        .map_err(|source| Error::Generic {
+            store: T::STORE,
+            source: Box::new(source),
+        })
+        .boxed();
+
+    Ok(GetResult {
+        range,
+        meta,
+        payload: GetResultPayload::Stream(stream),
+    })
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 632e949582da..a060f02c6f28 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1303,12 +1303,22 @@ mod tests {
         let range = 3..7;
         let range_result = storage.get_range(&location, range.clone()).await;
 
+        let bytes = range_result.unwrap();
+        assert_eq!(bytes, expected_data.slice(range.clone()));
+
+        let opts = GetOptions {
+            range: Some(2..5),
+            ..Default::default()
+        };
+        let result = storage.get_opts(&location, opts).await.unwrap();
+        assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
+        assert_eq!(result.range, 2..5);
+        let bytes = result.bytes().await.unwrap();
+        assert_eq!(bytes, b"bit".as_ref());
+
         let out_of_range = 200..300;
         let out_of_range_result = storage.get_range(&location, out_of_range).await;
 
-        let bytes = range_result.unwrap();
-        assert_eq!(bytes, expected_data.slice(range));
-
         // Should be a non-fatal error
         out_of_range_result.unwrap_err();
 