diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index c44a0a8f82d3..2f5157c40e67 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -54,7 +54,6 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls- ring = { version = "0.17", default-features = false, features = ["std"], optional = true } rustls-pemfile = { version = "1.0", default-features = false, optional = true } tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] } -http-content-range = "0.1.2" [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.27.1", features = ["fs"] } diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 3c71e69da00c..f9b4950e58db 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -99,6 +99,9 @@ pub(crate) enum Error { source: crate::client::header::Error, }, + #[snafu(display("Operation not supported by this store: {reason}"))] + NotSupported { reason: &'static str }, + #[snafu(display("ETag required for conditional update"))] MissingETag, } @@ -109,6 +112,9 @@ impl From for crate::Error { Error::GetRequest { source, path } | Error::DeleteRequest { source, path } | Error::PutRequest { source, path } => source.error(STORE, path), + Error::NotSupported { .. } => Self::NotSupported { + source: Box::new(err), + }, _ => Self::Generic { store: STORE, source: Box::new(err), @@ -356,6 +362,19 @@ impl GetClient for AzureClient { /// /// async fn get_request(&self, path: &Path, options: GetOptions) -> Result { + // As of 2024-01-02, Azure does not support suffix requests, + // so we should fail fast here rather than sending one + if let Some(r) = options.range.as_ref() { + match r { + crate::util::GetRange::Suffix(_) => { + Err(Error::NotSupported { + reason: "suffix request", + })?; + } + _ => (), + } + } + let credential = self.get_credential().await?; let url = self.config.path_url(path); let method = match options.head { diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index 5692f5b26ddb..fb27d4eccbaa 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -25,7 +25,52 @@ use crate::{GetResultPayload, Result}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use reqwest::Response; -use snafu::Snafu; +use snafu::{ResultExt, Snafu}; + +/// A specialized `Error` for get-related errors +#[derive(Debug, Snafu)] +#[allow(missing_docs)] +pub(crate) enum Error { + #[snafu(display("Could not extract metadata from response headers"))] + Header { + store: &'static str, + source: crate::client::header::Error, + }, + + #[snafu(display("Requested an invalid range"))] + InvalidRangeRequest { + store: &'static str, + source: crate::util::InvalidGetRange, + }, + + #[snafu(display("Got an invalid range response"))] + InvalidRangeResponse { + store: &'static str, + source: crate::util::InvalidRangeResponse, + }, + + #[snafu(display("Requested {expected:?}, got {actual:?}"))] + UnexpectedRange { + store: &'static str, + expected: Range, + actual: Range, + }, +} + +impl From for crate::Error { + fn from(err: Error) -> Self { + let store = match err { + Error::Header { store, .. } => store, + Error::InvalidRangeRequest { store, .. } => store, + Error::InvalidRangeResponse { store, .. } => store, + Error::UnexpectedRange { store, .. } => store, + }; + Self::Generic { + store: store, + source: Box::new(err), + } + } +} /// A client that can perform a get request #[async_trait] @@ -38,13 +83,6 @@ pub trait GetClient: Send + Sync + 'static { async fn get_request(&self, path: &Path, options: GetOptions) -> Result; } -#[derive(Debug, Snafu)] -#[snafu(display("Requested range {expected:?}, got {actual:?}"))] -pub struct UnexpectedRange { - expected: Range, - actual: Range, -} - /// Extension trait for [`GetClient`] that adds common retrieval functionality #[async_trait] pub trait GetClientExt { @@ -57,20 +95,23 @@ impl GetClientExt for T { let range = options.range.clone(); let response = self.get_request(location, options).await?; let meta = header_meta(location, response.headers(), T::HEADER_CONFIG) - .map_err(|e| as_generic_err(T::STORE, e))?; + .context(HeaderSnafu { store: T::STORE })?; // ensure that we receive the range we asked for let out_range = if let Some(r) = range { let actual = r .as_range(meta.size) - .map_err(|source| as_generic_err(T::STORE, source))?; + .context(InvalidRangeRequestSnafu { store: T::STORE })?; + let expected = - response_range(&response).map_err(|source| as_generic_err(T::STORE, source))?; + response_range(&response).context(InvalidRangeResponseSnafu { store: T::STORE })?; + if actual != expected { - return Err(as_generic_err( - T::STORE, - UnexpectedRange { expected, actual }, - )); + Err(Error::UnexpectedRange { + store: T::STORE, + expected, + actual: actual.clone(), + })?; } actual } else { diff --git a/object_store/src/local.rs b/object_store/src/local.rs index f0b7b7b6e23e..4dd4647b687e 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -19,7 +19,7 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, - util::as_generic_err, + util::InvalidGetRange, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, }; @@ -44,8 +44,6 @@ use tokio::io::AsyncWrite; use url::Url; use walkdir::{DirEntry, WalkDir}; -const STORE: &'static str = "LocalFileSystem"; - /// A specialized `Error` for filesystem object store-related errors #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -114,6 +112,11 @@ pub(crate) enum Error { actual: usize, }, + #[snafu(display("Requested range was invalid"))] + InvalidRange { + source: InvalidGetRange, + }, + #[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))] UnableToCopyFile { from: PathBuf, @@ -420,8 +423,7 @@ impl ObjectStore for LocalFileSystem { options.check_preconditions(&meta)?; let range = if let Some(r) = options.range { - r.as_range(meta.size) - .map_err(|e| as_generic_err(STORE, e))? + r.as_range(meta.size).context(InvalidRangeSnafu)? } else { 0..meta.size }; diff --git a/object_store/src/util.rs b/object_store/src/util.rs index 83f533f421b0..a12630ec7ef7 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -19,6 +19,7 @@ use std::{ fmt::Display, ops::{Range, RangeBounds}, + str::from_utf8, }; use crate::Error; @@ -26,8 +27,7 @@ use crate::Error; use super::Result; use bytes::Bytes; use futures::{stream::StreamExt, Stream, TryStreamExt}; -use http_content_range::ContentRange; -use hyper::{header::CONTENT_RANGE, StatusCode}; +use hyper::{header::CONTENT_RANGE, http::HeaderValue, StatusCode}; use reqwest::Response; use snafu::Snafu; @@ -191,7 +191,7 @@ fn merge_ranges(ranges: &[std::ops::Range], coalesce: usize) -> Vec), /// A range defined only by the first byte requested (requests all remaining bytes). Offset(usize), @@ -199,49 +199,6 @@ pub enum GetRange { Suffix(usize), } -impl PartialOrd for GetRange { - fn partial_cmp(&self, other: &Self) -> Option { - use std::cmp::Ordering::*; - use GetRange::*; - // `Suffix`es cannot be compared with `Range`s or `Offset`s. - // `Range`s and `Offset`s are compared by their first byte, - // using the last byte as a tiebreaker (`Offset`s always greater than `Range`s). - match self { - Bounded(r1) => match other { - Bounded(r2) => match r1.start.cmp(&r2.start) { - Equal => Some(r1.end.cmp(&r2.end)), - o => Some(o), - }, - Offset(f2) => match r1.start.cmp(f2) { - Equal => Some(Less), - o => Some(o), - }, - Suffix { .. } => None, - }, - Offset(f1) => match other { - Bounded(r2) => match f1.cmp(&r2.start) { - Equal => Some(Greater), - o => Some(o), - }, - Offset(f2) => Some(f1.cmp(f2)), - Suffix { .. } => None, - }, - Suffix(b1) => match other { - Suffix(b2) => Some(b2.cmp(b1)), - _ => None, - }, - } - } -} - -#[derive(Debug, Snafu)] -pub enum RangeMergeError { - #[snafu(display("Ranges could not be merged because exactly one was a suffix"))] - DifferentTypes, - #[snafu(display("Ranges could not be merged because they were too far apart"))] - Spread, -} - #[derive(Debug, Snafu)] pub enum InvalidGetRange { #[snafu(display("Wanted suffix with {expected}B, resource was {actual}B long"))] @@ -258,90 +215,7 @@ pub enum InvalidGetRange { } impl GetRange { - /// Create a range representing the whole resource. - pub fn new_whole() -> Self { - Self::Offset(0) - } - - /// Whether this is an offset. - pub fn is_offset(&self) -> bool { - match self { - GetRange::Offset(_) => true, - _ => false, - } - } - - /// Whether this is a range. - pub fn is_range(&self) -> bool { - match self { - GetRange::Bounded(_) => true, - _ => false, - } - } - - /// Whether this is an offset. - pub fn is_suffix(&self) -> bool { - match self { - GetRange::Suffix(_) => true, - _ => false, - } - } - - /// Whether the range has no bytes in it (i.e. the last byte is before the first). - /// - /// [None] if the range is an `Offset` or `Suffix`. - /// The response may still be empty. - pub fn is_empty(&self) -> Option { - match self { - GetRange::Bounded(r) => Some(r.is_empty()), - _ => None, - } - } - - /// Whether the range represents the entire resource (i.e. it is an `Offset` of 0). - /// - /// [None] if the range is a `Range` or `Suffix`. - /// The response may still be the full resource. - pub fn is_whole(&self) -> Option { - match self { - GetRange::Offset(first) => Some(first == &0), - _ => None, - } - } - - /// How many bytes the range is requesting. - /// - /// Note that the server may respond with a different number of bytes, - /// depending on the length of the resource and other behaviour. - pub fn nbytes(&self) -> Option { - match self { - GetRange::Bounded(r) => Some(r.end.saturating_sub(r.start)), - GetRange::Offset(_) => None, - GetRange::Suffix(n) => Some(*n), - } - } - - /// The index of the first byte requested ([None] for suffix). - pub fn first_byte(&self) -> Option { - match self { - GetRange::Bounded(r) => Some(r.start), - GetRange::Offset(o) => Some(*o), - GetRange::Suffix(_) => None, - } - } - - /// The index of the last byte requested ([None] for offset or suffix). - pub fn last_byte(&self) -> Option { - match self { - GetRange::Bounded(r) => match r.end { - 0 => None, - n => Some(n), - }, - GetRange::Offset { .. } => None, - GetRange::Suffix { .. } => None, - } - } - + /// Convert to a [std::ops::Range] if valid. pub(crate) fn as_range(&self, len: usize) -> Result, InvalidGetRange> { match self { Self::Bounded(r) => { @@ -384,149 +258,6 @@ impl GetRange { } } } - - /// Merge two ranges which fall within a certain distance `coalesce` of each other. - /// - /// Error if exactly one is a suffix or if the ranges are too far apart. - pub(crate) fn try_merge( - &self, - other: &GetRange, - coalesce: usize, - ) -> Result { - use GetRange::*; - - let (g1, g2) = match self.partial_cmp(other) { - None => { - // One is a suffix, the other isn't. - // This is escapable if one represents the whole resource. - if let Some(whole) = self.is_whole() { - if whole { - return Ok(GetRange::new_whole()); - } - } - if let Some(whole) = other.is_whole() { - if whole { - return Ok(GetRange::new_whole()); - } - } - return Err(RangeMergeError::DifferentTypes); - } - Some(o) => match o { - std::cmp::Ordering::Greater => (other, self), - _ => (self, other), - }, - }; - - match g1 { - Bounded(r1) => match g2 { - Bounded(r2) => { - if r2.start <= r1.end.saturating_add(coalesce) { - Ok(GetRange::Bounded(r1.start..r1.end.max(r2.end))) - } else { - Err(RangeMergeError::Spread) - } - } - Offset(first) => { - if first < &(r1.end.saturating_add(coalesce)) { - Ok(GetRange::Offset(r1.start)) - } else { - Err(RangeMergeError::Spread) - } - } - Suffix { .. } => unreachable!(), - }, - // Either an offset or suffix, both of which would contain the second range. - _ => Ok(g1.clone()), - } - } - - pub fn matches_range(&self, range: Range, len: usize) -> bool { - match self { - Self::Bounded(r) => r == &range, - Self::Offset(o) => o == &range.start && len == range.end, - Self::Suffix(n) => range.end == len && range.start == len - n, - } - } -} - -/// Returns a sorted [Vec] of [HttpRange::Offset] and [HttpRange::Range] that cover `ranges`, -/// and a single [HttpRange::Suffix] if one or more are given. -/// The suffix is also omitted if any of the ranges is the whole resource (`0-`). -/// -/// The suffix is returned separately as it may still overlap with the other ranges, -/// so the caller may want to handle it differently. -pub fn merge_get_ranges + Clone>( - ranges: &[T], - coalesce: usize, -) -> (Vec, Option) { - if ranges.is_empty() { - return (vec![], None); - } - let mut v = Vec::with_capacity(ranges.len()); - let mut o = None::; - - for rng in ranges.iter().cloned().map(|r| r.into()) { - match rng { - GetRange::Suffix(n) => { - if let Some(suff) = o { - o = Some(suff.max(n)); - } else { - o = Some(n); - } - } - _ => v.push(rng), - } - } - - let mut ret = Vec::with_capacity(v.len() + 1); - let suff = o.map(|s| GetRange::Suffix(s)); - - if v.is_empty() { - if let Some(s) = suff.as_ref() { - ret.push(s.clone()); - } - return (ret, suff); - } - - // unwrap is fine because we've already filtered out the suffixes - v.sort_by(|a, b| a.partial_cmp(b).unwrap()); - let mut it = v.into_iter(); - let mut prev = it.next().unwrap(); - - for curr in it { - match prev { - GetRange::Offset { .. } => { - match prev.try_merge(&curr, coalesce) { - Ok(r) => ret.push(r), - Err(_) => { - ret.push(prev); - ret.push(curr); - } - } - - let Some(s) = suff else { - return (ret, None); - }; - - if ret.last().unwrap().is_whole().unwrap() { - return (ret, None); - } else { - return (ret, Some(s)); - } - } - GetRange::Bounded { .. } => match prev.try_merge(&curr, coalesce) { - Ok(r) => ret.push(r), - Err(_) => { - ret.push(prev); - ret.push(curr.clone()); - } - }, - GetRange::Suffix { .. } => unreachable!(), - } - prev = curr; - } - - (ret, suff) } impl Display for GetRange { @@ -555,80 +286,6 @@ impl> From for GetRange { } } -/// Takes a function `fetch` that can fetch a range of bytes and uses this to -/// fetch the provided byte `ranges` -/// -/// To improve performance it will: -/// -/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch` -/// * Make multiple `fetch` requests in parallel (up to maximum of 10) -pub async fn coalesce_get_ranges( - ranges: &[GetRange], - fetch: F, - coalesce: usize, -) -> Result, E> -where - F: Send + FnMut(GetRange) -> Fut, - E: Send, - Fut: std::future::Future> + Send, -{ - let (mut fetch_ranges, suff_opt) = merge_get_ranges(ranges, coalesce); - if let Some(suff) = suff_opt.as_ref() { - fetch_ranges.push(suff.clone()); - } - if fetch_ranges.is_empty() { - return Ok(vec![]); - } - - let mut fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned()) - .map(fetch) - .buffered(OBJECT_STORE_COALESCE_PARALLEL) - .try_collect() - .await?; - - let suff = suff_opt.map(|r| { - let nbytes = match r { - GetRange::Suffix(n) => n, - _ => unreachable!(), - }; - fetch_ranges.pop().unwrap(); - let b = fetched.pop().unwrap(); - if nbytes >= b.len() { - b - } else { - b.slice((b.len() - nbytes)..) - } - }); - - Ok(ranges - .iter() - .map(|range| { - match range { - GetRange::Suffix(n) => { - let b = suff.as_ref().unwrap(); - let start = b.len().saturating_sub(*n); - return b.slice(start..b.len()); - } - _ => (), - } - // unwrapping range.first_byte() is fine because we've early-returned suffixes - let idx = fetch_ranges - .partition_point(|v| v.first_byte().unwrap() <= range.first_byte().unwrap()) - - 1; - let fetch_range = &fetch_ranges[idx]; - let fetch_bytes = &fetched[idx]; - - let start = range.first_byte().unwrap() - fetch_range.first_byte().unwrap(); - let end = range.last_byte().map_or(fetch_bytes.len(), |range_last| { - fetch_bytes - .len() - .max(range_last - fetch_range.first_byte().unwrap() + 1) - }); - fetch_bytes.slice(start..end) - }) - .collect()) -} - #[derive(Debug, Snafu)] pub enum InvalidRangeResponse { #[snafu(display("Response was not PARTIAL_CONTENT; length {length:?}"))] @@ -639,6 +296,25 @@ pub enum InvalidRangeResponse { InvalidContentRange { value: Vec }, } +fn parse_content_range(val: &HeaderValue) -> Result, InvalidRangeResponse> { + let bts = val.as_bytes(); + + let err = || InvalidRangeResponse::InvalidContentRange { + value: bts.to_vec(), + }; + let s = from_utf8(bts).map_err(|_| err())?; + let rem = s.trim().strip_prefix("bytes ").ok_or_else(err)?; + let (range, _) = rem.split_once("/").ok_or_else(err)?; + let (start_s, end_s) = range.split_once("-").ok_or_else(err)?; + + let start = start_s.parse().map_err(|_| err())?; + let end: usize = end_s.parse().map_err(|_| err())?; + + Ok(start..(end + 1)) +} + +/// Ensure that the given [Response] contains Partial Content with a single byte range, +/// and get that range range. pub(crate) fn response_range(r: &Response) -> Result, InvalidRangeResponse> { use InvalidRangeResponse::*; @@ -648,19 +324,9 @@ pub(crate) fn response_range(r: &Response) -> Result, InvalidRangeR }); } - let val_bytes = r - .headers() - .get(CONTENT_RANGE) - .ok_or(NoContentRange)? - .as_bytes(); - - match ContentRange::parse_bytes(val_bytes) { - ContentRange::Bytes(c) => Ok(c.first_byte as usize..(c.last_byte as usize + 1)), - ContentRange::UnboundBytes(c) => Ok(c.first_byte as usize..(c.last_byte as usize + 1)), - _ => Err(InvalidContentRange { - value: val_bytes.to_vec(), - }), - } + let val = r.headers().get(CONTENT_RANGE).ok_or(NoContentRange)?; + + parse_content_range(val) } pub(crate) fn as_generic_err( @@ -777,71 +443,17 @@ mod tests { } #[test] - fn http_range_str() { + fn getrange_str() { assert_eq!(GetRange::Offset(0).to_string(), "0-"); assert_eq!(GetRange::Bounded(10..19).to_string(), "10-20"); assert_eq!(GetRange::Suffix(10).to_string(), "-10"); } #[test] - fn http_range_from() { + fn getrange_from() { assert_eq!(Into::::into(10..15), GetRange::Bounded(10..15),); assert_eq!(Into::::into(10..=15), GetRange::Bounded(10..16),); assert_eq!(Into::::into(10..), GetRange::Offset(10),); assert_eq!(Into::::into(..=15), GetRange::Bounded(0..16)); } - - #[test] - fn merge_suffixes() { - assert_eq!( - GetRange::Suffix(10) - .try_merge(&GetRange::Suffix(15), 0) - .unwrap(), - GetRange::Suffix(15) - ); - } - - #[test] - fn merge_range_suffix() { - assert!(GetRange::Bounded(0..11) - .try_merge(&GetRange::Suffix(15), 0) - .is_err()); - } - - #[test] - fn merge_range_range_spread() { - assert_eq!( - GetRange::Bounded(0..11) - .try_merge(&GetRange::Bounded(11..21), 1) - .unwrap(), - GetRange::Bounded(0..21) - ); - } - - #[test] - fn merge_offset() { - assert_eq!( - GetRange::Offset(10) - .try_merge(&GetRange::Offset(15), 0) - .unwrap(), - GetRange::Offset(10) - ) - } - - #[test] - fn merge_offset_range() { - assert_eq!( - GetRange::Offset(10) - .try_merge(&GetRange::Bounded(5..16), 0) - .unwrap(), - GetRange::Offset(5) - ) - } - - #[test] - fn no_merge_offset_range() { - assert!(GetRange::Offset(20) - .try_merge(&GetRange::Bounded(5..16), 0) - .is_err()) - } }