Skip to content

Commit

Permalink
object_store: implement get_suffix for all stores
Browse files Browse the repository at this point in the history
  • Loading branch information
clbarnes committed Dec 12, 2023
1 parent 03af42f commit 096f730
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 10 deletions.
38 changes: 36 additions & 2 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{
AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
};
use crate::client::get::GetClient;
use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient};
use crate::client::header::HeaderConfig;
use crate::client::header::{get_put_result, get_version};
use crate::client::list::ListClient;
Expand All @@ -29,9 +29,10 @@ use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
ListResponse,
};
use crate::client::GetOptionsExt;
use crate::client::{with_suffix_header, GetOptionsExt};
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::HttpRange;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig,
};
Expand Down Expand Up @@ -665,6 +666,39 @@ impl GetClient for S3Client {
}
}

#[async_trait]
impl GetSuffixClient for S3Client {
    /// Fetch the trailing `nbytes` bytes of the object at `location`.
    ///
    /// Builds a `GET` request for the object's URL, passes it through
    /// `with_suffix_header` (presumably adding the suffix `Range` header —
    /// that helper is defined elsewhere), signs it with SigV4 for the `"s3"`
    /// service and sends it using the configured retry policy.
    ///
    /// # Errors
    ///
    /// Returns an error if the credential cannot be obtained, if the request
    /// fails (reported with the `GetRequest` context for `location`), or if
    /// the response cannot be converted into a get result / read into bytes.
    async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
        let config = &self.config;
        let credential = config.get_credential().await?;

        let request = self
            .client
            .request(Method::GET, config.path_url(location));

        let signed = with_suffix_header(request, nbytes).with_aws_sigv4(
            credential.as_deref(),
            &config.region,
            "s3",
            config.sign_payload,
            None,
        );

        let response = signed
            .send_retry(&config.retry_config)
            .await
            .context(GetRequestSnafu {
                path: location.as_ref(),
            })?;

        // The requested suffix range is handed to the response conversion
        // alongside the raw response.
        let requested = Some(HttpRange::new_suffix(nbytes));
        let result = response_to_get_result::<S3Client>(response, location, requested)?;
        result.bytes().await
    }
}

#[async_trait]
impl ListClient for S3Client {
/// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tokio::io::AsyncWrite;
use url::Url;

use crate::aws::client::S3Client;
use crate::client::get::GetClientExt;
use crate::client::get::{GetClientExt, GetSuffixClient};
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
Expand Down Expand Up @@ -216,6 +216,10 @@ impl ObjectStore for AmazonS3 {
.await
}

/// Return the last `nbytes` bytes of the object at `location` by
/// delegating to the underlying S3 client.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let pending = self.client.get_suffix(location, nbytes);
    pending.await
}

/// Perform a get request for `location` with the supplied `options`,
/// delegating to the underlying S3 client.
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
    let pending = self.client.get_opts(location, options);
    pending.await
}
Expand Down
4 changes: 4 additions & 0 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl ObjectStore for ChunkedStore {
self.inner.get_range(location, range).await
}

/// Return the last `nbytes` bytes of the object at `location`.
///
/// The call is forwarded unchanged to the wrapped store.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let pending = self.inner.get_suffix(location, nbytes);
    pending.await
}

/// Return the metadata for `location`, forwarded unchanged to the
/// wrapped store.
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
    let pending = self.inner.head(location);
    pending.await
}
Expand Down
40 changes: 38 additions & 2 deletions object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::get::GetClient;
use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient};
use crate::client::header::{get_put_result, get_version, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
ListResponse,
};
use crate::client::GetOptionsExt;
use crate::client::{with_suffix_header, GetOptionsExt};
use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE};
use crate::multipart::PartId;
use crate::path::{Path, DELIMITER};
use crate::util::HttpRange;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutResult, Result,
RetryConfig,
Expand Down Expand Up @@ -462,6 +463,41 @@ impl GetClient for GoogleCloudStorageClient {
}
}

#[async_trait]
impl GetSuffixClient for GoogleCloudStorageClient {
    /// Fetch the trailing `nbytes` bytes of the object at `path`.
    ///
    /// Builds a `GET` request for the object's URL, attaches bearer
    /// authentication when the credential carries a non-empty token, passes
    /// the request through `with_suffix_header` (presumably adding the suffix
    /// `Range` header — that helper is defined elsewhere) and sends it using
    /// the configured retry policy.
    ///
    /// # Errors
    ///
    /// Returns an error if the credential cannot be obtained, if the request
    /// fails (reported with the `GetRequest` context for `path`), or if the
    /// response cannot be converted into a get result / read into bytes.
    async fn get_suffix(&self, path: &Path, nbytes: usize) -> Result<Bytes> {
        let credential = self.get_credential().await?;

        let unauthenticated = self.client.request(Method::GET, self.object_url(path));

        // An empty bearer token means no authorization header is attached.
        let request = if credential.bearer.is_empty() {
            unauthenticated
        } else {
            unauthenticated.bearer_auth(&credential.bearer)
        };

        let response = with_suffix_header(request, nbytes)
            .send_retry(&self.config.retry_config)
            .await
            .context(GetRequestSnafu {
                path: path.as_ref(),
            })?;

        // The requested suffix range is handed to the response conversion
        // alongside the raw response.
        let requested = Some(HttpRange::new_suffix(nbytes));
        let result =
            response_to_get_result::<GoogleCloudStorageClient>(response, path, requested)?;
        result.bytes().await
    }
}

#[async_trait]
impl ListClient for GoogleCloudStorageClient {
/// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use client::GoogleCloudStorageClient;
use futures::stream::BoxStream;
use tokio::io::AsyncWrite;

use crate::client::get::GetClientExt;
use crate::client::get::{GetClientExt, GetSuffixClient};
use crate::client::list::ListClientExt;
use crate::multipart::MultiPartStore;
pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
Expand Down Expand Up @@ -139,6 +139,10 @@ impl ObjectStore for GoogleCloudStorage {
self.client.get_opts(location, options).await
}

/// Return the last `nbytes` bytes of the object at `location` by
/// delegating to the underlying GCS client.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let pending = self.client.get_suffix(location, nbytes);
    pending.await
}

/// Delete the object at `location` by issuing a delete request through
/// the underlying GCS client.
async fn delete(&self, location: &Path) -> Result<()> {
    let pending = self.client.delete_request(location);
    pending.await
}
Expand Down
43 changes: 40 additions & 3 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::get::GetClient;
use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient};
use crate::client::header::HeaderConfig;
use crate::client::retry::{self, RetryConfig, RetryExt};
use crate::client::GetOptionsExt;
use crate::client::{with_suffix_header, GetOptionsExt};
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
use crate::util::{deserialize_rfc1123, HttpRange};
use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
Expand Down Expand Up @@ -322,6 +322,43 @@ impl GetClient for Client {
}
}

#[async_trait]
impl GetSuffixClient for Client {
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
let url = self.path_url(location);
let method = Method::GET;
let builder = self.client.request(method, url);

let res = with_suffix_header(builder, nbytes)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
// Some stores return METHOD_NOT_ALLOWED for get on directories
Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
crate::Error::NotFound {
source: Box::new(source),
path: location.to_string(),
}
}
_ => Error::Request { source }.into(),
})?;

// We expect a 206 Partial Content response if a range was requested
// a 200 OK response would indicate the server did not fulfill the request
if res.status() != StatusCode::PARTIAL_CONTENT {
return Err(crate::Error::NotSupported {
source: Box::new(Error::RangeNotSupported {
href: location.to_string(),
}),
});
}

response_to_get_result::<Client>(res, location, Some(HttpRange::new_suffix(nbytes)))?
.bytes()
.await
}
}

/// The response returned by a PROPFIND request, i.e. list
#[derive(Deserialize, Default)]
pub struct MultiStatus {
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::get::GetClientExt;
use crate::client::get::{GetClientExt, GetSuffixClient};
use crate::client::header::get_etag;
use crate::http::client::Client;
use crate::path::Path;
Expand Down Expand Up @@ -130,6 +130,10 @@ impl ObjectStore for HttpStore {
self.client.get_opts(location, options).await
}

/// Return the last `nbytes` bytes of the resource at `location` by
/// delegating to the underlying HTTP client.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let pending = self.client.get_suffix(location, nbytes);
    pending.await
}

/// Delete the resource at `location` by delegating to the underlying
/// HTTP client.
async fn delete(&self, location: &Path) -> Result<()> {
    let pending = self.client.delete(location);
    pending.await
}
Expand Down
4 changes: 4 additions & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,10 @@ macro_rules! as_ref_impl {
self.as_ref().get_range(location, range).await
}

/// Return the last `nbytes` bytes of the object at `location`,
/// forwarding to the store obtained through `as_ref()`.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let pending = self.as_ref().get_suffix(location, nbytes);
    pending.await
}

async fn get_ranges(
&self,
location: &Path,
Expand Down
5 changes: 5 additions & 0 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.get_range(location, range).await
}

/// Return the last `nbytes` bytes of the object at `location`.
///
/// Waits for a semaphore permit before forwarding to the wrapped store.
///
/// # Panics
///
/// Panics if acquiring the semaphore permit returns an error.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    // Bound to a named local so the permit stays alive for the whole
    // duration of the inner request.
    let _slot = self.semaphore.acquire().await.unwrap();
    self.inner.get_suffix(location, nbytes).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_ranges(location, ranges).await
Expand Down
30 changes: 30 additions & 0 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,15 @@ impl ObjectStore for LocalFileSystem {
.await
}

/// Return the last `nbytes` bytes of the file that `location` maps to.
///
/// The location is first translated into a filesystem path; opening the
/// file and reading its tail are done inside `maybe_spawn_blocking`.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let fs_path = self.config.path_to_filesystem(location)?;

    maybe_spawn_blocking(move || {
        // Only the file handle is needed; the metadata returned alongside
        // it is discarded.
        let (mut handle, _) = open_file(&fs_path)?;
        read_suffix(&mut handle, &fs_path, nbytes)
    })
    .await
}

async fn delete(&self, location: &Path) -> Result<()> {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
Expand Down Expand Up @@ -953,6 +962,27 @@ pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -
Ok(buf.into())
}

/// Read the final `nbytes` bytes of `file`.
///
/// `path` is only used to annotate errors.
///
/// The file length is measured first so that a request for more bytes than
/// the file contains is reported as `OutOfRange` rather than as an opaque
/// seek failure (seeking to a negative absolute offset is an error), and so
/// that no signed cast of `nbytes` is needed.
///
/// # Errors
///
/// * `Seek` if the file cannot be repositioned.
/// * `OutOfRange` if the file holds fewer than `nbytes` bytes, or if fewer
///   than `nbytes` bytes could be read back.
/// * `UnableToReadBytes` if reading fails.
pub(crate) fn read_suffix(file: &mut File, path: &PathBuf, nbytes: usize) -> Result<Bytes> {
    // Seeking to the end yields the current length of the file.
    let len = file
        .seek(SeekFrom::End(0))
        .context(SeekSnafu { path })?;

    // usize -> u64 is a widening conversion on supported targets.
    let wanted = nbytes as u64;

    ensure!(
        wanted <= len,
        OutOfRangeSnafu {
            path,
            expected: nbytes,
            // Only evaluated when `len < wanted`, so `len` fits in a usize.
            actual: len as usize
        }
    );

    file.seek(SeekFrom::Start(len - wanted))
        .context(SeekSnafu { path })?;

    let mut buf = Vec::with_capacity(nbytes);
    let read = file
        .take(wanted)
        .read_to_end(&mut buf)
        .context(UnableToReadBytesSnafu { path })?;

    // Guards against the file shrinking between the length check and the read.
    ensure!(
        read == nbytes,
        OutOfRangeSnafu {
            path,
            expected: nbytes,
            actual: read
        }
    );
    Ok(buf.into())
}

fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
Err(e) => Err(match e.kind() {
Expand Down
10 changes: 10 additions & 0 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ enum Error {
))]
OutOfRange { range: Range<usize>, len: usize },

#[snafu(display("Suffix of length {} unavailable for object of length {}", nbytes, len))]
SuffixOutOfRange { nbytes: usize, len: usize },

#[snafu(display("Invalid range: {}..{}", range.start, range.end))]
BadRange { range: Range<usize> },

Expand Down Expand Up @@ -253,6 +256,13 @@ impl ObjectStore for InMemory {
.collect()
}

/// Return the last `nbytes` bytes of the object stored at `location`.
///
/// # Errors
///
/// Fails if the entry cannot be looked up, or with `SuffixOutOfRange`
/// when `nbytes` exceeds the length of the stored data.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let entry = self.entry(location).await?;
    let data = entry.data;
    let len = data.len();

    ensure!(nbytes <= len, SuffixOutOfRangeSnafu { nbytes, len });

    // `nbytes <= len` was just checked, so the subtraction cannot underflow.
    let start = len - nbytes;
    Ok(data.slice(start..))
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let entry = self.entry(location).await?;

Expand Down
5 changes: 5 additions & 0 deletions object_store/src/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.get_range(&full_path, range).await
}

/// Return the last `nbytes` bytes of the object at `location`.
///
/// `location` is first expanded with `full_path` before the call is
/// forwarded to the wrapped store.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let prefixed = self.full_path(location);
    let pending = self.inner.get_suffix(&prefixed, nbytes);
    pending.await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get_opts(&full_path, options).await
Expand Down
10 changes: 10 additions & 0 deletions object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
self.inner.get_range(location, range).await
}

/// Return the last `nbytes` bytes of the object at `location`, after
/// sleeping for the configured get delay.
///
/// The delay is `wait_get_per_call` plus `wait_get_per_byte` for each of
/// the `nbytes` requested bytes. The byte count is clamped to `u32::MAX`
/// and the arithmetic saturates, so a very large `nbytes` neither wraps
/// around to a short delay nor panics on overflow.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
    let config = self.config();

    // A plain `as u32` cast would silently truncate (e.g. 2^32 -> 0).
    let billed_bytes = u32::try_from(nbytes).unwrap_or(u32::MAX);

    let sleep_duration = config
        .wait_get_per_call
        .saturating_add(config.wait_get_per_byte.saturating_mul(billed_bytes));

    sleep(sleep_duration).await;

    self.inner.get_suffix(location, nbytes).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
let config = self.config();

Expand Down

0 comments on commit 096f730

Please sign in to comment.