From 096f730a8ef1e0d78ddf5ff3e457357b89d641be Mon Sep 17 00:00:00 2001 From: Chris Barnes Date: Tue, 12 Dec 2023 12:22:01 +0000 Subject: [PATCH] object_store: implement get_suffix for all stores --- object_store/src/aws/client.rs | 38 +++++++++++++++++++++++++++-- object_store/src/aws/mod.rs | 6 ++++- object_store/src/chunked.rs | 4 +++ object_store/src/gcp/client.rs | 40 ++++++++++++++++++++++++++++-- object_store/src/gcp/mod.rs | 6 ++++- object_store/src/http/client.rs | 43 ++++++++++++++++++++++++++++++--- object_store/src/http/mod.rs | 6 ++++- object_store/src/lib.rs | 4 +++ object_store/src/limit.rs | 5 ++++ object_store/src/local.rs | 30 +++++++++++++++++++++++ object_store/src/memory.rs | 10 ++++++++ object_store/src/prefix.rs | 5 ++++ object_store/src/throttle.rs | 10 ++++++++ 13 files changed, 197 insertions(+), 10 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index ecbe556c6dfe..da7274f751cb 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -20,7 +20,7 @@ use crate::aws::credential::{AwsCredential, CredentialExt}; use crate::aws::{ AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET, }; -use crate::client::get::GetClient; +use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient}; use crate::client::header::HeaderConfig; use crate::client::header::{get_put_result, get_version}; use crate::client::list::ListClient; @@ -29,9 +29,10 @@ use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, ListResponse, }; -use crate::client::GetOptionsExt; +use crate::client::{with_suffix_header, GetOptionsExt}; use crate::multipart::PartId; use crate::path::DELIMITER; +use crate::util::HttpRange; use crate::{ ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig, }; @@ -665,6 +666,39 @@ impl GetClient for S3Client { } } +#[async_trait] +impl GetSuffixClient for S3Client { + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let credential = self.config.get_credential().await?; + let url = self.config.path_url(location); + let method = Method::GET; + + let builder = self.client.request(method, url); + + // if let Some(v) = &options.version { + // builder = builder.query(&[("versionId", v)]) + // } + + let response = with_suffix_header(builder, nbytes) + .with_aws_sigv4( + credential.as_deref(), + &self.config.region, + "s3", + self.config.sign_payload, + None, + ) + .send_retry(&self.config.retry_config) + .await + .context(GetRequestSnafu { + path: location.as_ref(), + })?; + + response_to_get_result::(response, location, Some(HttpRange::new_suffix(nbytes)))? + .bytes() + .await + } +} + #[async_trait] impl ListClient for S3Client { /// Make an S3 List request diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 0985263459b2..cd7b867fc312 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -42,7 +42,7 @@ use tokio::io::AsyncWrite; use url::Url; use crate::aws::client::S3Client; -use crate::client::get::GetClientExt; +use crate::client::get::{GetClientExt, GetSuffixClient}; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}; @@ -216,6 +216,10 @@ impl ObjectStore for AmazonS3 { .await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.client.get_suffix(location, nbytes).await + } + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { self.client.get_opts(location, options).await } diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index d33556f4b12e..e9bc7841fc82 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -136,6 +136,10 @@ impl ObjectStore for ChunkedStore { self.inner.get_range(location, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.inner.get_suffix(location, nbytes).await + } + async fn head(&self, location: &Path) -> Result { self.inner.head(location).await } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index e4b0f9af7d15..887d7bfb172f 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::client::get::GetClient; +use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient}; use crate::client::header::{get_put_result, get_version, HeaderConfig}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; @@ -23,10 +23,11 @@ use crate::client::s3::{ CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult, ListResponse, }; -use crate::client::GetOptionsExt; +use crate::client::{with_suffix_header, GetOptionsExt}; use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE}; use crate::multipart::PartId; use crate::path::{Path, DELIMITER}; +use crate::util::HttpRange; use crate::{ ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutResult, Result, RetryConfig, @@ -462,6 +463,41 @@ impl GetClient for GoogleCloudStorageClient { } } +#[async_trait] +impl GetSuffixClient for GoogleCloudStorageClient { + async fn get_suffix(&self, path: &Path, nbytes: usize) -> Result { + let credential = self.get_credential().await?; + let url = self.object_url(path); + + let method = Method::GET; + + let mut request = self.client.request(method, url); + + // if let Some(version) = &options.version { + // request = request.query(&[("generation", version)]); + // } + + if !credential.bearer.is_empty() { + request = request.bearer_auth(&credential.bearer); + } + + let response = with_suffix_header(request, nbytes) + .send_retry(&self.config.retry_config) + .await + .context(GetRequestSnafu { + path: path.as_ref(), + })?; + + response_to_get_result::( + response, + path, + Some(HttpRange::new_suffix(nbytes)), + )? + .bytes() + .await + } +} + #[async_trait] impl ListClient for GoogleCloudStorageClient { /// Perform a list request diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 11fa68310a2e..5bc2befd95b1 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -44,7 +44,7 @@ use client::GoogleCloudStorageClient; use futures::stream::BoxStream; use tokio::io::AsyncWrite; -use crate::client::get::GetClientExt; +use crate::client::get::{GetClientExt, GetSuffixClient}; use crate::client::list::ListClientExt; use crate::multipart::MultiPartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; @@ -139,6 +139,10 @@ impl ObjectStore for GoogleCloudStorage { self.client.get_opts(location, options).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.client.get_suffix(location, nbytes).await + } + async fn delete(&self, location: &Path) -> Result<()> { self.client.delete_request(location).await } diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 8700775fb243..53726957fdc9 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::client::get::GetClient; +use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient}; use crate::client::header::HeaderConfig; use crate::client::retry::{self, RetryConfig, RetryExt}; -use crate::client::GetOptionsExt; +use crate::client::{with_suffix_header, GetOptionsExt}; use crate::path::{Path, DELIMITER}; -use crate::util::deserialize_rfc1123; +use crate::util::{deserialize_rfc1123, HttpRange}; use crate::{ClientOptions, GetOptions, ObjectMeta, Result}; use async_trait::async_trait; use bytes::{Buf, Bytes}; @@ -322,6 +322,43 @@ impl GetClient for Client { } } +#[async_trait] +impl GetSuffixClient for Client { + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let url = self.path_url(location); + let method = Method::GET; + let builder = self.client.request(method, url); + + let res = with_suffix_header(builder, nbytes) + .send_retry(&self.retry_config) + .await + .map_err(|source| match source.status() { + // Some stores return METHOD_NOT_ALLOWED for get on directories + Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => { + crate::Error::NotFound { + source: Box::new(source), + path: location.to_string(), + } + } + _ => Error::Request { source }.into(), + })?; + + // We expect a 206 Partial Content response if a range was requested + // a 200 OK response would indicate the server did not fulfill the request + if res.status() != StatusCode::PARTIAL_CONTENT { + return Err(crate::Error::NotSupported { + source: Box::new(Error::RangeNotSupported { + href: location.to_string(), + }), + }); + } + + response_to_get_result::(res, location, Some(HttpRange::new_suffix(nbytes)))? + .bytes() + .await + } +} + /// The response returned by a PROPFIND request, i.e. list #[derive(Deserialize, Default)] pub struct MultiStatus { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index f1d11db4762c..ca334568befa 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -40,7 +40,7 @@ use snafu::{OptionExt, ResultExt, Snafu}; use tokio::io::AsyncWrite; use url::Url; -use crate::client::get::GetClientExt; +use crate::client::get::{GetClientExt, GetSuffixClient}; use crate::client::header::get_etag; use crate::http::client::Client; use crate::path::Path; @@ -130,6 +130,10 @@ impl ObjectStore for HttpStore { self.client.get_opts(location, options).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.client.get_suffix(location, nbytes).await + } + async fn delete(&self, location: &Path) -> Result<()> { self.client.delete(location).await } diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 7bb8cdce13c9..d6520d967c9f 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -797,6 +797,10 @@ macro_rules! as_ref_impl { self.as_ref().get_range(location, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + self.as_ref().get_suffix(location, nbytes).await + } + async fn get_ranges( &self, location: &Path, diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index d1363d9a4d46..fe4eb1130d0d 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -111,6 +111,11 @@ impl ObjectStore for LimitStore { self.inner.get_range(location, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.get_suffix(location, nbytes).await + } + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.get_ranges(location, ranges).await diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 71b96f058c79..3e249d3630a3 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -456,6 +456,15 @@ impl ObjectStore for LocalFileSystem { .await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let path = self.config.path_to_filesystem(location)?; + maybe_spawn_blocking(move || { + let (mut file, _) = open_file(&path)?; + read_suffix(&mut file, &path, nbytes) + }) + .await + } + async fn delete(&self, location: &Path) -> Result<()> { let path = self.config.path_to_filesystem(location)?; maybe_spawn_blocking(move || match std::fs::remove_file(&path) { @@ -953,6 +962,27 @@ pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range) - Ok(buf.into()) } +pub(crate) fn read_suffix(file: &mut File, path: &PathBuf, nbytes: usize) -> Result { + file.seek(SeekFrom::End(-(nbytes as i64))) + .context(SeekSnafu { path })?; + + let mut buf = Vec::with_capacity(nbytes); + let read = file + .take(nbytes as u64) + .read_to_end(&mut buf) + .context(UnableToReadBytesSnafu { path })?; + + ensure!( + read == nbytes, + OutOfRangeSnafu { + path, + expected: nbytes, + actual: read + } + ); + Ok(buf.into()) +} + fn open_file(path: &PathBuf) -> Result<(File, Metadata)> { let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) { Err(e) => Err(match e.kind() { diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 382300123846..f7fced0a73d3 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -48,6 +48,9 @@ enum Error { ))] OutOfRange { range: Range, len: usize }, + #[snafu(display("Suffix of length {} unavailable for object of length {}", nbytes, len))] + SuffixOutOfRange { nbytes: usize, len: usize }, + #[snafu(display("Invalid range: {}..{}", range.start, range.end))] BadRange { range: Range }, @@ -253,6 +256,13 @@ impl ObjectStore for InMemory { .collect() } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let data = self.entry(location).await?.data; + let len = data.len(); + ensure!(nbytes <= len, SuffixOutOfRangeSnafu { nbytes, len }); + Ok(data.slice((data.len() - nbytes)..)) + } + async fn head(&self, location: &Path) -> Result { let entry = self.entry(location).await?; diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 38f9b07bbd05..f98c7799ad28 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -113,6 +113,11 @@ impl ObjectStore for PrefixStore { self.inner.get_range(&full_path, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let full_path = self.full_path(location); + self.inner.get_suffix(&full_path, nbytes).await + } + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let full_path = self.full_path(location); self.inner.get_opts(&full_path, options).await diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 252256a4599e..b33024c53bd0 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -200,6 +200,16 @@ impl ObjectStore for ThrottledStore { self.inner.get_range(location, range).await } + async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result { + let config = self.config(); + + let sleep_duration = config.wait_get_per_call + config.wait_get_per_byte * nbytes as u32; + + sleep(sleep_duration).await; + + self.inner.get_suffix(location, nbytes).await + } + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> Result> { let config = self.config();