Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: Migrate from snafu to thiserror #6266

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ humantime = "2.1"
itertools = "0.13.0"
parking_lot = { version = "0.12" }
percent-encoding = "2.1"
snafu = { version = "0.8", default-features = false, features = ["std", "rust_1_61"] }
thiserror = "2.0.2"
tracing = { version = "0.1" }
url = "2.2"
walkdir = "2"
Expand Down
52 changes: 32 additions & 20 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use itertools::Itertools;
use md5::{Digest, Md5};
use reqwest::header::{HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -43,46 +42,46 @@ use url::Url;
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
enum Error {
#[snafu(display("Missing bucket name"))]
#[error("Missing bucket name")]
MissingBucketName,

#[snafu(display("Missing AccessKeyId"))]
#[error("Missing AccessKeyId")]
MissingAccessKeyId,

#[snafu(display("Missing SecretAccessKey"))]
#[error("Missing SecretAccessKey")]
MissingSecretAccessKey,

#[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
#[error("Unable parse source url. Url: {}, Error: {}", url, source)]
UnableToParseUrl {
source: url::ParseError,
url: String,
},

#[snafu(display(
#[error(
"Unknown url scheme cannot be parsed into storage location: {}",
scheme
))]
)]
UnknownUrlScheme { scheme: String },

#[snafu(display("URL did not match any known pattern for scheme: {}", url))]
#[error("URL did not match any known pattern for scheme: {}", url)]
UrlNotRecognised { url: String },

#[snafu(display("Configuration key: '{}' is not known.", key))]
#[error("Configuration key: '{}' is not known.", key)]
UnknownConfigurationKey { key: String },

#[snafu(display("Invalid Zone suffix for bucket '{bucket}'"))]
#[error("Invalid Zone suffix for bucket '{bucket}'")]
ZoneSuffix { bucket: String },

#[snafu(display("Invalid encryption type: {}. Valid values are \"AES256\", \"sse:kms\", \"sse:kms:dsse\" and \"sse-c\".", passed))]
#[error("Invalid encryption type: {}. Valid values are \"AES256\", \"sse:kms\", \"sse:kms:dsse\" and \"sse-c\".", passed)]
InvalidEncryptionType { passed: String },

#[snafu(display(
#[error(
"Invalid encryption header values. Header: {}, source: {}",
header,
source
))]
)]
InvalidEncryptionHeader {
header: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
Expand Down Expand Up @@ -603,8 +602,15 @@ impl AmazonS3Builder {
/// This is a separate member function to allow fallible computation to
/// be deferred until [`Self::build`] which in turn allows deriving [`Clone`]
fn parse_url(&mut self, url: &str) -> Result<()> {
let parsed = Url::parse(url).context(UnableToParseUrlSnafu { url })?;
let host = parsed.host_str().context(UrlNotRecognisedSnafu { url })?;
let parsed = Url::parse(url).map_err(|source| {
let url = url.into();
Error::UnableToParseUrl { url, source }
})?;

let host = parsed
.host_str()
.ok_or_else(|| Error::UrlNotRecognised { url: url.into() })?;

match parsed.scheme() {
"s3" | "s3a" => self.bucket_name = Some(host.to_string()),
"https" => match host.splitn(4, '.').collect_tuple() {
Expand All @@ -630,9 +636,12 @@ impl AmazonS3Builder {
self.bucket_name = Some(bucket.into());
}
}
_ => return Err(UrlNotRecognisedSnafu { url }.build().into()),
_ => return Err(Error::UrlNotRecognised { url: url.into() }.into()),
},
scheme => return Err(UnknownUrlSchemeSnafu { scheme }.build().into()),
scheme => {
let scheme = scheme.into();
return Err(Error::UnknownUrlScheme { scheme }.into());
}
};
Ok(())
}
Expand Down Expand Up @@ -875,7 +884,7 @@ impl AmazonS3Builder {
self.parse_url(&url)?;
}

let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
let bucket = self.bucket_name.ok_or(Error::MissingBucketName)?;
let region = self.region.unwrap_or_else(|| "us-east-1".to_string());
let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?;
let copy_if_not_exists = self.copy_if_not_exists.map(|x| x.get()).transpose()?;
Expand Down Expand Up @@ -957,7 +966,10 @@ impl AmazonS3Builder {

let (session_provider, zonal_endpoint) = match self.s3_express.get()? {
true => {
let zone = parse_bucket_az(&bucket).context(ZoneSuffixSnafu { bucket: &bucket })?;
let zone = parse_bucket_az(&bucket).ok_or_else(|| {
let bucket = bucket.clone();
Error::ZoneSuffix { bucket }
})?;

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-Regions-and-Zones.html
let endpoint = format!("https://{bucket}.s3express-{zone}.{region}.amazonaws.com");
Expand Down
87 changes: 49 additions & 38 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use reqwest::{Client as ReqwestClient, Method, RequestBuilder, Response};
use ring::digest;
use ring::digest::Context;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;

const VERSION_HEADER: &str = "x-amz-version-id";
Expand All @@ -65,56 +64,56 @@ const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
const ALGORITHM: &str = "x-amz-checksum-algorithm";

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[snafu(display("Error performing DeleteObjects request: {}", source))]
#[error("Error performing DeleteObjects request: {}", source)]
DeleteObjectsRequest { source: crate::client::retry::Error },

#[snafu(display(
#[error(
"DeleteObjects request failed for key {}: {} (code: {})",
path,
message,
code
))]
)]
DeleteFailed {
path: String,
code: String,
message: String,
},

#[snafu(display("Error getting DeleteObjects response body: {}", source))]
#[error("Error getting DeleteObjects response body: {}", source)]
DeleteObjectsResponse { source: reqwest::Error },

#[snafu(display("Got invalid DeleteObjects response: {}", source))]
#[error("Got invalid DeleteObjects response: {}", source)]
InvalidDeleteObjectsResponse {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

#[snafu(display("Error performing list request: {}", source))]
#[error("Error performing list request: {}", source)]
ListRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting list response body: {}", source))]
#[error("Error getting list response body: {}", source)]
ListResponseBody { source: reqwest::Error },

#[snafu(display("Error getting create multipart response body: {}", source))]
#[error("Error getting create multipart response body: {}", source)]
CreateMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Error performing complete multipart request: {}: {}", path, source))]
#[error("Error performing complete multipart request: {}: {}", path, source)]
CompleteMultipartRequest {
source: crate::client::retry::Error,
path: String,
},

#[snafu(display("Error getting complete multipart response body: {}", source))]
#[error("Error getting complete multipart response body: {}", source)]
CompleteMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid list response: {}", source))]
#[error("Got invalid list response: {}", source)]
InvalidListResponse { source: quick_xml::de::DeError },

#[snafu(display("Got invalid multipart response: {}", source))]
#[error("Got invalid multipart response: {}", source)]
InvalidMultipartResponse { source: quick_xml::de::DeError },

#[snafu(display("Unable to extract metadata from headers: {}", source))]
#[error("Unable to extract metadata from headers: {}", source)]
Metadata {
source: crate::client::header::Error,
},
Expand Down Expand Up @@ -263,10 +262,15 @@ impl SessionCredential<'_> {
}
}

#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
#[snafu(context(false))]
Generic { source: crate::Error },
#[error(transparent)]
Generic {
#[from]
source: crate::Error,
},

#[error("Retry")]
Retry {
source: crate::client::retry::Error,
path: String,
Expand Down Expand Up @@ -426,12 +430,16 @@ impl<'a> Request<'a> {
.payload(self.payload)
.send()
.await
.context(RetrySnafu { path })
.map_err(|source| {
let path = path.into();
RequestError::Retry { source, path }
})
}

pub(crate) async fn do_put(self) -> Result<PutResult> {
let response = self.send().await?;
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
Ok(get_put_result(response.headers(), VERSION_HEADER)
.map_err(|source| Error::Metadata { source })?)
}
}

Expand Down Expand Up @@ -535,10 +543,10 @@ impl S3Client {
.with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
.send_retry(&self.config.retry_config)
.await
.context(DeleteObjectsRequestSnafu {})?
.map_err(|source| Error::DeleteObjectsRequest { source })?
.bytes()
.await
.context(DeleteObjectsResponseSnafu {})?;
.map_err(|source| Error::DeleteObjectsResponse { source })?;

let response: BatchDeleteResponse =
quick_xml::de::from_reader(response.reader()).map_err(|err| {
Expand Down Expand Up @@ -635,10 +643,10 @@ impl S3Client {
.await?
.bytes()
.await
.context(CreateMultipartResponseBodySnafu)?;
.map_err(|source| Error::CreateMultipartResponseBody { source })?;

let response: InitiateMultipartUploadResult =
quick_xml::de::from_reader(response.reader()).context(InvalidMultipartResponseSnafu)?;
let response: InitiateMultipartUploadResult = quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;

Ok(response.upload_id)
}
Expand Down Expand Up @@ -683,14 +691,14 @@ impl S3Client {
.map(|v| v.to_string());

let e_tag = match is_copy {
false => get_etag(response.headers()).context(MetadataSnafu)?,
false => get_etag(response.headers()).map_err(|source| Error::Metadata { source })?,
true => {
let response = response
.bytes()
.await
.context(CreateMultipartResponseBodySnafu)?;
.map_err(|source| Error::CreateMultipartResponseBody { source })?;
let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
.context(InvalidMultipartResponseSnafu)?;
.map_err(|source| Error::InvalidMultipartResponse { source })?;
response.e_tag
}
};
Expand Down Expand Up @@ -764,19 +772,21 @@ impl S3Client {
.retry_error_body(true)
.send()
.await
.context(CompleteMultipartRequestSnafu {
path: location.as_ref(),
.map_err(|source| Error::CompleteMultipartRequest {
source,
path: location.as_ref().to_string(),
})?;

let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;
let version = get_version(response.headers(), VERSION_HEADER)
.map_err(|source| Error::Metadata { source })?;

let data = response
.bytes()
.await
.context(CompleteMultipartResponseBodySnafu)?;
.map_err(|source| Error::CompleteMultipartResponseBody { source })?;

let response: CompleteMultipartUploadResult =
quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
let response: CompleteMultipartUploadResult = quick_xml::de::from_reader(data.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;

Ok(PutResult {
e_tag: Some(response.e_tag),
Expand Down Expand Up @@ -884,13 +894,14 @@ impl ListClient for S3Client {
.with_aws_sigv4(credential.authorizer(), None)
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
.map_err(|source| Error::ListRequest { source })?
.bytes()
.await
.context(ListResponseBodySnafu)?;
.map_err(|source| Error::ListResponseBody { source })?;

let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidListResponse { source })?;

let mut response: ListResponse =
quick_xml::de::from_reader(response.reader()).context(InvalidListResponseSnafu)?;
let token = response.next_continuation_token.take();

Ok((response.try_into()?, token))
Expand Down
17 changes: 8 additions & 9 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,22 @@ use percent_encoding::utf8_percent_encode;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use reqwest::{Client, Method, Request, RequestBuilder, StatusCode};
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::warn;
use url::Url;

#[derive(Debug, Snafu)]
#[derive(Debug, thiserror::Error)]
#[allow(clippy::enum_variant_names)]
enum Error {
#[snafu(display("Error performing CreateSession request: {source}"))]
#[error("Error performing CreateSession request: {source}")]
CreateSessionRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting CreateSession response: {source}"))]
#[error("Error getting CreateSession response: {source}")]
CreateSessionResponse { source: reqwest::Error },

#[snafu(display("Invalid CreateSessionOutput response: {source}"))]
#[error("Invalid CreateSessionOutput response: {source}")]
CreateSessionOutput { source: quick_xml::DeError },
}

Expand Down Expand Up @@ -726,13 +725,13 @@ impl TokenProvider for SessionProvider {
.with_aws_sigv4(Some(authorizer), None)
.send_retry(retry)
.await
.context(CreateSessionRequestSnafu)?
.map_err(|source| Error::CreateSessionRequest { source })?
.bytes()
.await
.context(CreateSessionResponseSnafu)?;
.map_err(|source| Error::CreateSessionResponse { source })?;

let resp: CreateSessionOutput =
quick_xml::de::from_reader(bytes.reader()).context(CreateSessionOutputSnafu)?;
let resp: CreateSessionOutput = quick_xml::de::from_reader(bytes.reader())
.map_err(|source| Error::CreateSessionOutput { source })?;

let creds = resp.credentials;
Ok(TemporaryToken {
Expand Down
Loading
Loading