diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index ecbe556c6dfe..e44cc2277b0f 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -20,7 +20,7 @@ use crate::aws::credential::{AwsCredential, CredentialExt}; use crate::aws::{ AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET, }; -use crate::client::get::GetClient; +use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient}; use crate::client::header::HeaderConfig; use crate::client::header::{get_put_result, get_version}; use crate::client::list::ListClient; @@ -29,9 +29,10 @@ use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, ListResponse, }; -use crate::client::GetOptionsExt; +use crate::client::{with_suffix_header, GetOptionsExt}; use crate::multipart::PartId; use crate::path::DELIMITER; +use crate::util::HttpRange; use crate::{ ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig, }; @@ -665,6 +666,39 @@ impl GetClient for S3Client { } } +#[async_trait] +impl GetSuffixClient for S3Client { + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let credential = self.config.get_credential().await?; + let url = self.config.path_url(location); + let method = Method::GET; + + let builder = self.client.request(method, url); + + // if let Some(v) = &options.version { + // builder = builder.query(&[("versionId", v)]) + // } + + let response = with_suffix_header(builder, nbytes) + .with_aws_sigv4( + credential.as_deref(), + &self.config.region, + "s3", + self.config.sign_payload, + None, + ) + .send_retry(&self.config.retry_config) + .await + .context(GetRequestSnafu { + path: location.as_ref(), + })?; + + response_to_get_result::(response, location, Some(HttpRange::new_suffix(nbytes)))? + .bytes() + .await + } +} + #[async_trait] impl ListClient for S3Client { /// Make an S3 List request diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 0985263459b2..cd7b867fc312 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -42,7 +42,7 @@ use tokio::io::AsyncWrite; use url::Url; use crate::aws::client::S3Client; -use crate::client::get::GetClientExt; +use crate::client::get::{GetClientExt, GetSuffixClient}; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}; @@ -216,6 +216,10 @@ impl ObjectStore for AmazonS3 { .await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.client.get_suffix(location, nbytes).await + } + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { self.client.get_opts(location, options).await } diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index d33556f4b12e..e9bc7841fc82 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -136,6 +136,10 @@ impl ObjectStore for ChunkedStore { self.inner.get_range(location, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.inner.get_suffix(location, nbytes).await + } + async fn head(&self, location: &Path) -> Result { self.inner.head(location).await } diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index 5f9cac9b424b..83a0d54f9319 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -17,9 +17,11 @@ use crate::client::header::{header_meta, HeaderConfig}; use crate::path::Path; +use crate::util::{concrete_range, HttpRange}; use crate::{Error, GetOptions, GetResult}; use crate::{GetResultPayload, Result}; use async_trait::async_trait; +use bytes::Bytes; use futures::{StreamExt, TryStreamExt}; use reqwest::Response; @@ -40,30 +42,47 @@ pub trait GetClientExt { async fn get_opts(&self, location: &Path, options: GetOptions) -> Result; } +pub(crate) fn response_to_get_result( + response: Response, + location: &Path, + range: Option, +) -> Result { + let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| { + Error::Generic { + store: T::STORE, + source: Box::new(e), + } + })?; + + let stream = response + .bytes_stream() + .map_err(|source| Error::Generic { + store: T::STORE, + source: Box::new(source), + }) + .boxed(); + + Ok(GetResult { + range: concrete_range(range, meta.size), + payload: GetResultPayload::Stream(stream), + meta, + }) +} + #[async_trait] impl GetClientExt for T { async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - let range = options.range.clone(); + let range = options.range.clone().map(HttpRange::from); let response = self.get_request(location, options).await?; - let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| { - Error::Generic { - store: T::STORE, - source: Box::new(e), - } - })?; - - let stream = response - .bytes_stream() - .map_err(|source| Error::Generic { - store: T::STORE, - source: Box::new(source), - }) - .boxed(); - - Ok(GetResult { - range: range.unwrap_or(0..meta.size), - payload: GetResultPayload::Stream(stream), - meta, - }) + response_to_get_result::(response, location, range) } } + +/// This trait is a bodge to allow suffix requests without breaking the user-facing API. +/// +/// See https://github.com/apache/arrow-rs/issues/4611 for discussion. +#[async_trait] +pub trait GetSuffixClient { + /// Get the last `nbytes` of a resource. + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result; +} diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index ae092edac095..366b1deca2f7 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -571,6 +571,11 @@ impl ClientOptions { } } +pub(crate) fn with_suffix_header(builder: RequestBuilder, nbytes: usize) -> RequestBuilder { + let range = format!("bytes=-{nbytes}"); + builder.header(hyper::header::RANGE, range) +} + pub trait GetOptionsExt { fn with_get_options(self, options: GetOptions) -> Self; } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index e4b0f9af7d15..4d0648972593 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::get::GetClient; +use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient}; use crate::client::header::{get_put_result, get_version, HeaderConfig}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; @@ -23,10 +23,11 @@ use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, ListResponse, }; -use crate::client::GetOptionsExt; +use crate::client::{with_suffix_header, GetOptionsExt}; use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE}; use crate::multipart::PartId; use crate::path::{Path, DELIMITER}; +use crate::util::HttpRange; use crate::{ ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutResult, Result, RetryConfig, @@ -462,6 +463,41 @@ impl GetClient for GoogleCloudStorageClient { } } +#[async_trait] +impl GetSuffixClient for GoogleCloudStorageClient { + async fn get_suffix(&self, path: &Path, nbytes: usize) -> Result { + let credential = self.get_credential().await?; + let url = self.object_url(path); + + let method = Method::GET; + + let mut request = self.client.request(method, url); + + // if let Some(version) = &options.version { + // request = request.query(&[("generation", version)]); + // } + + if !credential.bearer.is_empty() { + request = request.bearer_auth(&credential.bearer); + } + + let response = with_suffix_header(request, nbytes) + .send_retry(&self.config.retry_config) + .await + .context(GetRequestSnafu { + path: path.as_ref(), + })?; + + response_to_get_result::( + response, + path, + Some(HttpRange::new_suffix(nbytes)), + )? + .bytes() + .await + } +} + #[async_trait] impl ListClient for GoogleCloudStorageClient { /// Perform a list request diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 11fa68310a2e..5bc2befd95b1 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -44,7 +44,7 @@ use client::GoogleCloudStorageClient; use futures::stream::BoxStream; use tokio::io::AsyncWrite; -use crate::client::get::GetClientExt; +use crate::client::get::{GetClientExt, GetSuffixClient}; use crate::client::list::ListClientExt; use crate::multipart::MultiPartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; @@ -139,6 +139,10 @@ impl ObjectStore for GoogleCloudStorage { self.client.get_opts(location, options).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.client.get_suffix(location, nbytes).await + } + async fn delete(&self, location: &Path) -> Result<()> { self.client.delete_request(location).await } diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 8700775fb243..deb16741adb9 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::client::get::GetClient; +use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient}; use crate::client::header::HeaderConfig; use crate::client::retry::{self, RetryConfig, RetryExt}; -use crate::client::GetOptionsExt; +use crate::client::{with_suffix_header, GetOptionsExt}; use crate::path::{Path, DELIMITER}; -use crate::util::deserialize_rfc1123; +use crate::util::{deserialize_rfc1123, HttpRange}; use crate::{ClientOptions, GetOptions, ObjectMeta, Result}; use async_trait::async_trait; use bytes::{Buf, Bytes}; @@ -322,6 +322,43 @@ impl GetClient for Client { } } +#[async_trait] +impl GetSuffixClient for Client { + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let url = self.path_url(location); + let method = Method::GET; + let builder = self.client.request(method, url); + + let res = with_suffix_header(builder, nbytes) + .send_retry(&self.retry_config) + .await + .map_err(|source| match source.status() { + // Some stores return METHOD_NOT_ALLOWED for get on directories + Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => { + crate::Error::NotFound { + source: Box::new(source), + path: location.to_string(), + } + } + _ => Error::Request { source }.into(), + })?; + + // We expect a 206 Partial Content response if a range was requested + // a 200 OK response would indicate the server did not fulfill the request + if res.status() != StatusCode::PARTIAL_CONTENT { + return Err(crate::Error::NotSupported { + source: Box::new(Error::RangeNotSupported { + href: location.to_string(), + }), + }); + } + + response_to_get_result::(res, location, Some(HttpRange::new_suffix(nbytes)))? + .bytes() + .await + } +} + /// The response returned by a PROPFIND request, i.e. list #[derive(Deserialize, Default)] pub struct MultiStatus { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index f1d11db4762c..ca334568befa 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -40,7 +40,7 @@ use snafu::{OptionExt, ResultExt, Snafu}; use tokio::io::AsyncWrite; use url::Url; -use crate::client::get::GetClientExt; +use crate::client::get::{GetClientExt, GetSuffixClient}; use crate::client::header::get_etag; use crate::http::client::Client; use crate::path::Path; @@ -130,6 +130,10 @@ impl ObjectStore for HttpStore { self.client.get_opts(location, options).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.client.get_suffix(location, nbytes).await + } + async fn delete(&self, location: &Path) -> Result<()> { self.client.delete(location).await } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 3a841667ff97..d6520d967c9f 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -589,6 +589,24 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { self.get_opts(location, options).await?.bytes().await } + /// Return the last `nbytes` of the resource at the specified location. + /// + /// The default implementation uses 2 requests: one to find the full length, + /// and one to get the requested bytes. + /// Many stores implement suffix requests directly; these should override the default. + /// + /// If you already know the size of the resource, you should use a regular range request, like + /// `my_object_store.get_range(my_location, size.saturating_sub(nbytes)..size)`. + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let options = GetOptions { + head: true, + ..Default::default() + }; + let size = self.get_opts(location, options).await?.meta.size; + self.get_range(location, size.saturating_sub(nbytes)..size) + .await + } + /// Return the bytes that are stored at the specified location /// in the given byte ranges async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { @@ -779,6 +797,10 @@ macro_rules! as_ref_impl { self.as_ref().get_range(location, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.as_ref().get_suffix(location, nbytes).await + } + async fn get_ranges( &self, location: &Path, diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index d1363d9a4d46..fe4eb1130d0d 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -111,6 +111,11 @@ impl ObjectStore for LimitStore { self.inner.get_range(location, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.get_suffix(location, nbytes).await + } + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.get_ranges(location, ranges).await diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 71b96f058c79..3e249d3630a3 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -456,6 +456,15 @@ impl ObjectStore for LocalFileSystem { .await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let path = self.config.path_to_filesystem(location)?; + maybe_spawn_blocking(move || { + let (mut file, _) = open_file(&path)?; + read_suffix(&mut file, &path, nbytes) + }) + .await + } + async fn delete(&self, location: &Path) -> Result<()> { let path = self.config.path_to_filesystem(location)?; maybe_spawn_blocking(move || match std::fs::remove_file(&path) { @@ -953,6 +962,27 @@ pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range) - Ok(buf.into()) } +pub(crate) fn read_suffix(file: &mut File, path: &PathBuf, nbytes: usize) -> Result { + file.seek(SeekFrom::End(-(nbytes as i64))) + .context(SeekSnafu { path })?; + + let mut buf = Vec::with_capacity(nbytes); + let read = file + .take(nbytes as u64) + .read_to_end(&mut buf) + .context(UnableToReadBytesSnafu { path })?; + + ensure!( + read == nbytes, + OutOfRangeSnafu { + path, + expected: nbytes, + actual: read + } + ); + Ok(buf.into()) +} + fn open_file(path: &PathBuf) -> Result<(File, Metadata)> { let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) { Err(e) => Err(match e.kind() { diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 382300123846..f7fced0a73d3 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -48,6 +48,9 @@ enum Error { ))] OutOfRange { range: Range, len: usize }, + #[snafu(display("Suffix of length {} unavailable for object of length {}", nbytes, len))] + SuffixOutOfRange { nbytes: usize, len: usize }, + #[snafu(display("Invalid range: {}..{}", range.start, range.end))] BadRange { range: Range }, @@ -253,6 +256,13 @@ impl ObjectStore for InMemory { .collect() } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let data = self.entry(location).await?.data; + let len = data.len(); + ensure!(nbytes <= len, SuffixOutOfRangeSnafu { nbytes, len }); + Ok(data.slice((data.len() - nbytes)..)) + } + async fn head(&self, location: &Path) -> Result { let entry = self.entry(location).await?; diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 38f9b07bbd05..f98c7799ad28 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -113,6 +113,11 @@ impl ObjectStore for PrefixStore { self.inner.get_range(&full_path, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let full_path = self.full_path(location); + self.inner.get_suffix(&full_path, nbytes).await + } + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let full_path = self.full_path(location); self.inner.get_opts(&full_path, options).await diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 252256a4599e..b33024c53bd0 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -200,6 +200,16 @@ impl ObjectStore for ThrottledStore { self.inner.get_range(location, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let config = self.config(); + + let sleep_duration = config.wait_get_per_call + config.wait_get_per_byte * nbytes as u32; + + sleep(sleep_duration).await; + + self.inner.get_suffix(location, nbytes).await + } + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let config = self.config(); diff --git a/object_store/src/util.rs b/object_store/src/util.rs index fd86ba7366b0..c80f95d4e7df 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -16,6 +16,11 @@ // under the License. //! Common logic for interacting with remote object stores +use std::{ + fmt::Display, + ops::{Range, RangeBounds}, +}; + use super::Result; use bytes::Bytes; use futures::{stream::StreamExt, Stream, TryStreamExt}; @@ -124,7 +129,7 @@ where let start = range.start - fetch_range.start; let end = range.end - fetch_range.start; - fetch_bytes.slice(start..end) + fetch_bytes.slice(start..end.min(fetch_bytes.len())) }) .collect()) } @@ -167,6 +172,116 @@ fn merge_ranges(ranges: &[std::ops::Range], coalesce: usize) -> Vec first`, the response should be empty. + Range { + /// Offset of the first requested byte (0-based). + first: usize, + /// Offset of the last requested byte; e.g. the range `0-0` will request 1 byte. + /// If [None], read to end of resource. + last: usize, + }, + /// A range defined only by the first byte requested (requests all remaining bytes). + Offset { + /// Offset of the first byte requested. + first: usize, + }, + /// A range defined as the number of bytes at the end of the resource. + Suffix { + /// Number of bytes requested. + nbytes: usize, + }, +} + +impl HttpRange { + /// Create a new range which only has an offset. + pub fn new_offset(first: usize) -> Self { + Self::Offset { first } + } + + /// Create a new range with a start and end point. + pub fn new_range(first: usize, last: usize) -> Self { + Self::Range { first, last } + } + + /// Create a new suffix, requesting the last `nbytes` bytes of the resource. + pub fn new_suffix(nbytes: usize) -> Self { + Self::Suffix { nbytes } + } + + /// The index of the first byte requested ([None] for suffix). + pub fn first_byte(&self) -> Option { + match self { + Self::Range { first, .. } => Some(*first), + Self::Offset { first } => Some(*first), + Self::Suffix { .. } => None, + } + } + + /// The index of the last byte requested ([None] for offset or suffix). + pub fn last_byte(&self) -> Option { + match self { + Self::Range { first: _, last } => Some(*last), + Self::Offset { .. } => None, + Self::Suffix { .. } => None, + } + } +} + +impl> From for HttpRange { + fn from(value: T) -> Self { + use std::ops::Bound::*; + let first = match value.start_bound() { + Included(i) => *i, + Excluded(i) => i.saturating_add(1), + Unbounded => 0, + }; + match value.end_bound() { + Included(i) => Self::new_range(first, *i), + Excluded(i) => Self::new_range(first, i.saturating_sub(1)), + Unbounded => Self::new_offset(first), + } + } +} + +impl Display for HttpRange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Range { first, last } => f.write_fmt(format_args!("{first}-{last}")), + Self::Offset { first } => f.write_fmt(format_args!("{first}-")), + Self::Suffix { nbytes } => f.write_fmt(format_args!("-{nbytes}")), + } + } +} + +pub(crate) fn concrete_range(range: Option, size: usize) -> Range { + if let Some(r) = range { + let start = r.first_byte().unwrap_or(0); + let end = r + .last_byte() + .map_or(size, |b| b.saturating_add(1).min(size)); + start..end + } else { + 0..size + } +} + #[cfg(test)] mod tests { use crate::Error; @@ -269,4 +384,25 @@ mod tests { } } } + #[test] + + fn http_range_str() { + assert_eq!(HttpRange::new_offset(0).to_string(), "0-"); + assert_eq!(HttpRange::new_range(10, 20).to_string(), "10-20"); + assert_eq!(HttpRange::new_suffix(10).to_string(), "-10"); + } + + #[test] + fn http_range_from() { + assert_eq!( + Into::::into(10..15), + HttpRange::new_range(10, 14), + ); + assert_eq!( + Into::::into(10..=15), + HttpRange::new_range(10, 15), + ); + assert_eq!(Into::::into(10..), HttpRange::new_offset(10),); + assert_eq!(Into::::into(..=15), HttpRange::new_range(0, 15)); + } }