diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 5f9cac9b424b..9f181dc22a30 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -17,9 +17,11 @@
 
 use crate::client::header::{header_meta, HeaderConfig};
 use crate::path::Path;
+use crate::util::{concrete_range, HttpRange};
 use crate::{Error, GetOptions, GetResult};
 use crate::{GetResultPayload, Result};
 use async_trait::async_trait;
+use bytes::Bytes;
 use futures::{StreamExt, TryStreamExt};
 use reqwest::Response;
 
@@ -40,30 +42,53 @@ pub trait GetClientExt {
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
 }
 
+/// Converts a raw HTTP [`Response`] into a streaming [`GetResult`].
+///
+/// Object metadata is parsed from the response headers using `T::HEADER_CONFIG`, and
+/// `range` (if any) is resolved against the size recorded in that metadata with
+/// [`concrete_range`]. Header-parsing and body-stream errors are reported as
+/// [`Error::Generic`] tagged with `T::STORE`.
+pub(crate) fn response_to_get_result<T: GetClient>(
+    response: Response,
+    location: &Path,
+    range: Option<HttpRange>,
+) -> Result<GetResult> {
+    let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
+        Error::Generic {
+            store: T::STORE,
+            source: Box::new(e),
+        }
+    })?;
+
+    let stream = response
+        .bytes_stream()
+        .map_err(|source| Error::Generic {
+            store: T::STORE,
+            source: Box::new(source),
+        })
+        .boxed();
+
+    Ok(GetResult {
+        range: concrete_range(range, meta.size),
+        payload: GetResultPayload::Stream(stream),
+        meta,
+    })
+}
+
 #[async_trait]
 impl<T: GetClient> GetClientExt for T {
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
-        let range = options.range.clone();
+        let range = options.range.clone().map(HttpRange::from);
         let response = self.get_request(location, options).await?;
-        let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
-            Error::Generic {
-                store: T::STORE,
-                source: Box::new(e),
-            }
-        })?;
-
-        let stream = response
-            .bytes_stream()
-            .map_err(|source| Error::Generic {
-                store: T::STORE,
-                source: Box::new(source),
-            })
-            .boxed();
-
-        Ok(GetResult {
-            range: range.unwrap_or(0..meta.size),
-            payload: GetResultPayload::Stream(stream),
-            meta,
-        })
+        response_to_get_result::<T>(response, location, range)
     }
 }
+
+/// This trait is a bodge to allow suffix requests without breaking the user-facing API.
+///
+/// See <https://github.com/apache/arrow-rs/issues/4611> for discussion.
+#[async_trait]
+pub trait GetSuffixClient {
+    /// Get the last `nbytes` of a resource.
+    async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes>;
+}
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index ae092edac095..366b1deca2f7 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -571,6 +571,13 @@ impl ClientOptions {
     }
 }
 
+/// Adds a `Range: bytes=-<nbytes>` header to `builder`, i.e. a request for the final
+/// `nbytes` bytes of the resource.
+pub(crate) fn with_suffix_header(builder: RequestBuilder, nbytes: usize) -> RequestBuilder {
+    let range = format!("bytes=-{nbytes}");
+    builder.header(hyper::header::RANGE, range)
+}
+
 pub trait GetOptionsExt {
     fn with_get_options(self, options: GetOptions) -> Self;
 }
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index 15bbbc76c3a2..a7592dc90fc2 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -16,6 +16,11 @@
 // under the License.
 
 //! Common logic for interacting with remote object stores
+use std::{
+    fmt::Display,
+    ops::{Range, RangeBounds},
+};
+
 use super::Result;
 use bytes::Bytes;
 use futures::{stream::StreamExt, Stream, TryStreamExt};
@@ -167,6 +172,118 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>], coalesce: usize) -> Vec<std::ops::Range<usize>> {
     ret
 }
 
+/// A single byte range of an HTTP `Range` request.
+///
+/// Can be built from any `RangeBounds<usize>` (e.g. `10..20`, `10..=20`, `10..`)
+/// via [`From`], or with [`HttpRange::new_suffix`] for the tail of a resource.
+// NOTE(review): the derive list and visibility below are reconstructed from usage
+// (the tests compare values with `assert_eq!`); confirm against upstream.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum HttpRange {
+    /// A range with a given first and last byte, both inclusive.
+    /// If `first > last`, the response should be empty.
+    Range {
+        /// Offset of the first requested byte (0-based).
+        first: usize,
+        /// Offset of the last requested byte; e.g. the range `0-0` will request 1 byte.
+        last: usize,
+    },
+    /// A range defined only by the first byte requested (requests all remaining bytes).
+    Offset {
+        /// Offset of the first byte requested.
+        first: usize,
+    },
+    /// A range defined as the number of bytes at the end of the resource.
+    Suffix {
+        /// Number of bytes requested.
+        nbytes: usize,
+    },
+}
+
+impl HttpRange {
+    /// Create a new range which only has an offset.
+    pub fn new_offset(first: usize) -> Self {
+        Self::Offset { first }
+    }
+
+    /// Create a new range with a start and end point (both inclusive).
+    pub fn new_range(first: usize, last: usize) -> Self {
+        Self::Range { first, last }
+    }
+
+    /// Create a new suffix, requesting the last `nbytes` bytes of the resource.
+    pub fn new_suffix(nbytes: usize) -> Self {
+        Self::Suffix { nbytes }
+    }
+
+    /// The index of the first byte requested ([None] for suffix).
+    pub fn first_byte(&self) -> Option<usize> {
+        match self {
+            HttpRange::Range { first, .. } | HttpRange::Offset { first } => Some(*first),
+            HttpRange::Suffix { .. } => None,
+        }
+    }
+
+    /// The index of the last byte requested ([None] for offset or suffix).
+    pub fn last_byte(&self) -> Option<usize> {
+        match self {
+            HttpRange::Range { last, .. } => Some(*last),
+            HttpRange::Offset { .. } | HttpRange::Suffix { .. } => None,
+        }
+    }
+}
+
+/// Converts any `usize` range expression into the equivalent [`HttpRange`].
+///
+/// An unbounded start maps to offset 0 and an unbounded end yields
+/// [`HttpRange::Offset`]. Suffix ranges cannot be expressed this way.
+impl<T: RangeBounds<usize>> From<T> for HttpRange {
+    fn from(value: T) -> Self {
+        use std::ops::Bound::*;
+        let first = match value.start_bound() {
+            Included(i) => *i,
+            Excluded(i) => i.saturating_add(1),
+            Unbounded => 0,
+        };
+        match value.end_bound() {
+            Included(i) => HttpRange::new_range(first, *i),
+            // An exclusive end of 0 saturates to an inclusive last byte of 0, so
+            // `0..0` becomes `0-0` (one byte) rather than an empty range.
+            Excluded(i) => HttpRange::new_range(first, i.saturating_sub(1)),
+            Unbounded => HttpRange::new_offset(first),
+        }
+    }
+}
+
+/// Formats the range as an HTTP byte-range-spec without the `bytes=` unit prefix.
+impl Display for HttpRange {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            HttpRange::Range { first, last } => write!(f, "{first}-{last}"),
+            HttpRange::Offset { first } => write!(f, "{first}-"),
+            HttpRange::Suffix { nbytes } => write!(f, "-{nbytes}"),
+        }
+    }
+}
+
+/// Resolves an optional [`HttpRange`] to concrete offsets within a resource of `size` bytes.
+///
+/// `None` selects the whole resource. A suffix selects the final `nbytes` bytes
+/// (the whole resource if it is shorter). Both ends are clamped to `size`, so the
+/// result always satisfies `start <= end <= size`.
+pub(crate) fn concrete_range(range: Option<HttpRange>, size: usize) -> Range<usize> {
+    match range {
+        None => 0..size,
+        Some(HttpRange::Suffix { nbytes }) => size.saturating_sub(nbytes)..size,
+        Some(HttpRange::Offset { first }) => first.min(size)..size,
+        Some(HttpRange::Range { first, last }) => {
+            // `last` is inclusive, so the exclusive end is one past it.
+            let end = last.saturating_add(1).min(size);
+            first.min(end)..end
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::Error;
@@ -269,4 +386,30 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn http_range_str() {
+        assert_eq!(HttpRange::new_offset(0).to_string(), "0-");
+        assert_eq!(HttpRange::new_range(10, 20).to_string(), "10-20");
+        assert_eq!(HttpRange::new_suffix(10).to_string(), "-10");
+    }
+
+    #[test]
+    fn http_range_from() {
+        assert_eq!(HttpRange::from(10..15), HttpRange::new_range(10, 14));
+        assert_eq!(HttpRange::from(10..=15), HttpRange::new_range(10, 15));
+        assert_eq!(HttpRange::from(10..), HttpRange::new_offset(10));
+        assert_eq!(HttpRange::from(..=15), HttpRange::new_range(0, 15));
+    }
+
+    #[test]
+    fn concrete_range_resolves() {
+        assert_eq!(concrete_range(None, 100), 0..100);
+        assert_eq!(concrete_range(Some(HttpRange::new_range(10, 19)), 100), 10..20);
+        assert_eq!(concrete_range(Some(HttpRange::new_range(10, 500)), 100), 10..100);
+        assert_eq!(concrete_range(Some(HttpRange::new_offset(90)), 100), 90..100);
+        assert_eq!(concrete_range(Some(HttpRange::new_offset(500)), 100), 100..100);
+        assert_eq!(concrete_range(Some(HttpRange::new_suffix(10)), 100), 90..100);
+        assert_eq!(concrete_range(Some(HttpRange::new_suffix(500)), 100), 0..100);
+    }
 }