Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetOptions::head #4931

Merged
merged 1 commit into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,15 +554,10 @@ impl GetClient for S3Client {
const STORE: &'static str = STORE;

/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
async fn get_request(
&self,
path: &Path,
options: GetOptions,
head: bool,
) -> Result<Response> {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match head {
let method = match options.head {
true => Method::HEAD,
false => Method::GET,
};
Expand Down
4 changes: 0 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,6 @@ impl ObjectStore for AmazonS3 {
self.client.get_opts(location, options).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.client.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location, &()).await
}
Expand Down
9 changes: 2 additions & 7 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,10 @@ impl GetClient for AzureClient {
/// Make an Azure GET request
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
async fn get_request(
&self,
path: &Path,
options: GetOptions,
head: bool,
) -> Result<Response> {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match head {
let method = match options.head {
true => Method::HEAD,
false => Method::GET,
};
Expand Down
4 changes: 0 additions & 4 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,6 @@ impl ObjectStore for MicrosoftAzure {
self.client.get_opts(location, options).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.client.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location, &()).await
}
Expand Down
24 changes: 3 additions & 21 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult, ObjectMeta};
use crate::{Error, GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
Expand All @@ -34,27 +34,20 @@ pub trait GetClient: Send + Sync + 'static {
last_modified_required: true,
};

async fn get_request(
&self,
path: &Path,
options: GetOptions,
head: bool,
) -> Result<Response>;
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
}

/// Extension trait for [`GetClient`] that adds common retrieval functionality
#[async_trait]
pub trait GetClientExt {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;

async fn head(&self, location: &Path) -> Result<ObjectMeta>;
}

#[async_trait]
impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options, false).await?;
let response = self.get_request(location, options).await?;
let meta =
header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
Expand All @@ -77,15 +70,4 @@ impl<T: GetClient> GetClientExt for T {
meta,
})
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::default();
let response = self.get_request(location, options, true).await?;
header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})
}
}
13 changes: 2 additions & 11 deletions object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,16 +389,11 @@ impl GetClient for GoogleCloudStorageClient {
const STORE: &'static str = STORE;

/// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
async fn get_request(
&self,
path: &Path,
options: GetOptions,
head: bool,
) -> Result<Response> {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.object_url(path);

let method = match head {
let method = match options.head {
true => Method::HEAD,
false => Method::GET,
};
Expand Down Expand Up @@ -604,10 +599,6 @@ impl ObjectStore for GoogleCloudStorage {
self.client.get_opts(location, options).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.client.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location).await
}
Expand Down
15 changes: 5 additions & 10 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,9 @@ impl GetClient for Client {
last_modified_required: false,
};

async fn get_request(
&self,
location: &Path,
options: GetOptions,
head: bool,
) -> Result<Response> {
let url = self.path_url(location);
let method = match head {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
let url = self.path_url(path);
let method = match options.head {
true => Method::HEAD,
false => Method::GET,
};
Expand All @@ -311,7 +306,7 @@ impl GetClient for Client {
Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
crate::Error::NotFound {
source: Box::new(source),
path: location.to_string(),
path: path.to_string(),
}
}
_ => Error::Request { source }.into(),
Expand All @@ -322,7 +317,7 @@ impl GetClient for Client {
if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
return Err(crate::Error::NotSupported {
source: Box::new(Error::RangeNotSupported {
href: location.to_string(),
href: path.to_string(),
}),
});
}
Expand Down
4 changes: 0 additions & 4 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ impl ObjectStore for HttpStore {
self.client.get_opts(location, options).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.client.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete(location).await
}
Expand Down
12 changes: 11 additions & 1 deletion object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,13 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
}

/// Return the metadata for the specified location
async fn head(&self, location: &Path) -> Result<ObjectMeta>;
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions {
head: true,
..Default::default()
};
Ok(self.get_opts(location, options).await?.meta)
}

/// Delete the object at the specified location.
async fn delete(&self, location: &Path) -> Result<()>;
Expand Down Expand Up @@ -716,6 +722,10 @@ pub struct GetOptions {
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
/// Request transfer of no content
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-head>
pub head: bool,
}

impl GetOptions {
Expand Down
37 changes: 4 additions & 33 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,35 +419,6 @@ impl ObjectStore for LocalFileSystem {
.await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let path = self.config.path_to_filesystem(location)?;
let location = location.clone();

maybe_spawn_blocking(move || {
let metadata = match metadata(&path) {
Err(e) => Err(match e.kind() {
ErrorKind::NotFound => Error::NotFound {
path: path.clone(),
source: e,
},
_ => Error::Metadata {
source: e.into(),
path: location.to_string(),
},
}),
Ok(m) => match !m.is_dir() {
true => Ok(m),
false => Err(Error::NotFound {
path,
source: io::Error::new(ErrorKind::NotFound, "is directory"),
}),
},
}?;
convert_metadata(metadata, location)
})
.await
}

async fn delete(&self, location: &Path) -> Result<()> {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
Expand Down Expand Up @@ -1604,15 +1575,15 @@ mod unix_test {
let path = root.path().join(filename);
unistd::mkfifo(&path, stat::Mode::S_IRWXU).unwrap();

let location = Path::from(filename);
integration.head(&location).await.unwrap();

// Need to open read and write side in parallel
let spawned = tokio::task::spawn_blocking(|| {
OpenOptions::new().write(true).open(path).unwrap();
Copy link
Contributor Author

@tustvold tustvold Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this semicolon fixes a subtle bug: the writer was dropped immediately, which then caused subsequent opens to block. I'm actually somewhat surprised it has worked semi-reliably so far. By returning the File, we defer its destruction until after we're done with the file.

OpenOptions::new().write(true).open(path).unwrap()
});

let location = Path::from(filename);
integration.head(&location).await.unwrap();
Copy link
Contributor Author

@tustvold tustvold Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has to be moved after the spawn_blocking because it will now open the file, which blocks until the writer opens it. I don't see this being an issue, as the behaviour of a head request on a file that gets mutated behind the scenes is not exactly well defined anyway 😅

integration.get(&location).await.unwrap();

spawned.await.unwrap();
}
}
Loading