From 2684c3d188289235b42d85a9fbfce28d1865d741 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 23 Dec 2023 15:40:11 +0100 Subject: [PATCH 1/8] refactor: move current signing to new AzureAuthrizer --- object_store/src/azure/credential.rs | 78 ++++++++++++++++++---------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 2b8788d333b2..72e33fcf9866 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -25,15 +25,12 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use chrono::{DateTime, Utc}; -use reqwest::header::ACCEPT; -use reqwest::{ - header::{ - HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LANGUAGE, - CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE, IF_NONE_MATCH, - IF_UNMODIFIED_SINCE, RANGE, - }, - Client, Method, RequestBuilder, +use reqwest::header::{ + HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LANGUAGE, + CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE, IF_NONE_MATCH, + IF_UNMODIFIED_SINCE, RANGE, }; +use reqwest::{Client, Method, Request, RequestBuilder}; use serde::Deserialize; use snafu::{ResultExt, Snafu}; use std::borrow::Cow; @@ -137,33 +134,37 @@ pub mod authority_hosts { pub const AZURE_PUBLIC_CLOUD: &str = "https://login.microsoftonline.com"; } -pub(crate) trait CredentialExt { - /// Apply authorization to requests against azure storage accounts - /// - fn with_azure_authorization(self, credential: &AzureCredential, account: &str) -> Self; +pub struct AzureAuthorizer<'a> { + credential: &'a AzureCredential, + account: &'a str, } -impl CredentialExt for RequestBuilder { - fn with_azure_authorization(mut self, credential: &AzureCredential, account: &str) -> Self { +impl<'a> AzureAuthorizer<'a> { + pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self { + AzureAuthorizer { + credential, + account, + } + } + + pub fn authorize(&self, request: &mut Request) { // rfc2822 string should never contain illegal characters let date = Utc::now(); let date_str = date.format(RFC1123_FMT).to_string(); // we formatted the data string ourselves, so unwrapping should be fine let date_val = HeaderValue::from_str(&date_str).unwrap(); - self = self - .header(DATE, &date_val) - .header(&VERSION, &AZURE_VERSION); + request.headers_mut().insert(DATE, date_val); + request + .headers_mut() + .insert(&VERSION, AZURE_VERSION.clone()); - match credential { + match self.credential { AzureCredential::AccessKey(key) => { - let (client, request) = self.build_split(); - let mut request = request.expect("request valid"); - let signature = generate_authorization( request.headers(), request.url(), request.method(), - account, + self.account, key, ); @@ -173,15 +174,40 @@ impl CredentialExt for RequestBuilder { AUTHORIZATION, HeaderValue::from_str(signature.as_str()).unwrap(), ); - - Self::from_parts(client, request) } - AzureCredential::BearerToken(token) => self.bearer_auth(token), - AzureCredential::SASToken(query_pairs) => self.query(&query_pairs), + AzureCredential::BearerToken(token) => { + request.headers_mut().append( + AUTHORIZATION, + HeaderValue::from_str(format!("Bearer {}", token).as_str()).unwrap(), + ); + } + AzureCredential::SASToken(query_pairs) => { + request + .url_mut() + .query_pairs_mut() + .extend_pairs(query_pairs); + } } } } +pub(crate) trait CredentialExt { + /// Apply authorization to requests against azure storage accounts + /// + fn with_azure_authorization(self, credential: &AzureCredential, account: &str) -> Self; +} + +impl CredentialExt for RequestBuilder { + fn with_azure_authorization(self, credential: &AzureCredential, account: &str) -> Self { + let (client, request) = self.build_split(); + let mut request = request.expect("request valid"); + + AzureAuthorizer::new(credential, account).authorize(&mut request); + + Self::from_parts(client, request) + } +} + /// Generate signed key for authorization via access keys /// fn generate_authorization( From 1c0f33b47feb5e19bf529e50e48b8f8296f8e8e9 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 25 Dec 2023 22:39:18 +0100 Subject: [PATCH 2/8] feat: generate signed urls with master key --- object_store/src/azure/credential.rs | 142 +++++++++++++++++++++++++-- object_store/src/azure/mod.rs | 57 ++++++++++- 2 files changed, 190 insertions(+), 9 deletions(-) diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 72e33fcf9866..5270c60aef48 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -24,7 +24,7 @@ use crate::RetryConfig; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, SecondsFormat, Utc}; use reqwest::header::{ HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE, IF_NONE_MATCH, @@ -34,13 +34,15 @@ use reqwest::{Client, Method, Request, RequestBuilder}; use serde::Deserialize; use snafu::{ResultExt, Snafu}; use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt::Debug; use std::process::Command; use std::str; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use url::Url; -static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2021-08-06"); +static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03"); static VERSION: HeaderName = HeaderName::from_static("x-ms-version"); pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type"); pub(crate) static DELETE_SNAPSHOTS: HeaderName = HeaderName::from_static("x-ms-delete-snapshots"); @@ -80,6 +82,9 @@ pub enum Error { #[snafu(display("Failed to parse azure cli response: {source}"))] AzureCliResponse { source: serde_json::Error }, + + #[snafu(display("Generating SAS keys with SAS tokens auth is not supported"))] + SASforSASNotSupported, } pub type Result = std::result::Result; @@ -134,12 +139,15 @@ pub mod authority_hosts { pub const AZURE_PUBLIC_CLOUD: &str = "https://login.microsoftonline.com"; } +/// Authorize a [`Request`] with an [`AzureAuthorizer`] +#[derive(Debug)] pub struct AzureAuthorizer<'a> { credential: &'a AzureCredential, account: &'a str, } impl<'a> AzureAuthorizer<'a> { + /// Create a new [`AzureAuthorizer`] pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self { AzureAuthorizer { credential, @@ -147,6 +155,7 @@ impl<'a> AzureAuthorizer<'a> { } } + /// Authorize `request` pub fn authorize(&self, request: &mut Request) { // rfc2822 string should never contain illegal characters let date = Utc::now(); @@ -189,6 +198,24 @@ impl<'a> AzureAuthorizer<'a> { } } } + + pub(crate) fn sign(&self, method: Method, url: &mut Url, expires_in: Duration) -> Result<()> { + match self.credential { + AzureCredential::AccessKey(key) => { + let (str_to_sign, query_pairs) = + string_to_sign_service_sas(url, &method, self.account, expires_in); + let auth = hmac_sha256(&key.0, str_to_sign); + url.query_pairs_mut().extend_pairs(query_pairs); + url.query_pairs_mut() + .append_pair("sig", BASE64_STANDARD.encode(auth).as_str()); + } + AzureCredential::BearerToken(token) => { + todo!() + } + AzureCredential::SASToken(_) => return Err(Error::SASforSASNotSupported), + }; + Ok(()) + } } pub(crate) trait CredentialExt { @@ -231,6 +258,80 @@ fn add_if_exists<'a>(h: &'a HeaderMap, key: &HeaderName) -> &'a str { .unwrap_or_default() } +fn string_to_sign_service_sas( + u: &Url, + method: &Method, + account: &str, + expires_in: Duration, +) -> (String, HashMap<&'static str, String>) { + let signed_resource = if u + .query() + .map(|q| q.contains("comp=list")) + .unwrap_or_default() + { + "c" + } else { + "b" + } + .to_string(); + + // https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#permissions-for-a-directory-container-or-blob + let signed_permissions = match *method { + // read and list permissions + Method::GET => match signed_resource.as_str() { + "c" => "rl", + "b" => "r", + _ => unreachable!(), + }, + // write permissions (also allows crating a new blob in a sub-key) + Method::PUT => "w", + // delete permissions + Method::DELETE => "d", + // other methods are not used in any of the current operations + _ => "", + } + .to_string(); + let signed_start = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true); + let signed_expiry = (Utc::now() + expires_in).to_rfc3339_opts(SecondsFormat::Secs, true); + let canonicalized_resource = if u.host_str().unwrap_or_default().contains(account) { + format!("/blob/{}{}", account, u.path()) + } else { + // NOTE: in case of the emulator, the account name is not part of the host + // but the path starts with the account name + format!("/blob{}", u.path()) + }; + + // https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later + let string_to_sign = format!( + "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", + signed_permissions, + signed_start, + signed_expiry, + canonicalized_resource, + "", // signed identifier + "", // signed ip + "", // signed protocol + &AZURE_VERSION.to_str().unwrap(), // signed version + signed_resource, // signed resource + "", // signed snapshot time + "", // signed encryption scope + "", // rscc - response header: Cache-Control + "", // rscd - response header: Content-Disposition + "", // rsce - response header: Content-Encoding + "", // rscl - response header: Content-Language + "", // rsct - response header: Content-Type + ); + + let mut pairs = HashMap::new(); + pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string()); + pairs.insert("sp", signed_permissions); + pairs.insert("st", signed_start); + pairs.insert("se", signed_expiry); + pairs.insert("sr", signed_resource); + + (string_to_sign, pairs) +} + /// fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> String { // content length must only be specified if != 0 @@ -258,7 +359,7 @@ fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> Str add_if_exists(h, &IF_UNMODIFIED_SINCE), add_if_exists(h, &RANGE), canonicalize_header(h), - canonicalized_resource(account, u) + canonicalize_resource(account, u) ) } @@ -283,7 +384,7 @@ fn canonicalize_header(headers: &HeaderMap) -> String { } /// -fn canonicalized_resource(account: &str, uri: &Url) -> String { +fn canonicalize_resource(account: &str, uri: &Url) -> String { let mut can_res: String = String::new(); can_res.push('/'); can_res.push_str(account); @@ -707,14 +808,20 @@ impl CredentialProvider for AzureCliCredential { #[cfg(test)] mod tests { - use super::*; - use crate::client::mock_server::MockServer; + use bytes::Bytes; use futures::executor::block_on; use hyper::body::to_bytes; use hyper::{Body, Response}; use reqwest::{Client, Method}; use tempfile::NamedTempFile; + use super::*; + use crate::azure::MicrosoftAzureBuilder; + use crate::client::mock_server::MockServer; + use crate::path::Path; + use crate::signer::Signer; + use crate::ObjectStore; + #[tokio::test] async fn test_managed_identity() { let server = MockServer::new(); @@ -822,4 +929,27 @@ mod tests { &AzureCredential::BearerToken("TOKEN".into()) ); } + + #[tokio::test] + async fn test_service_sas() { + crate::test_util::maybe_skip_integration!(); + let integration = MicrosoftAzureBuilder::from_env() + .with_container_name("test-bucket") + .build() + .unwrap(); + + let data = Bytes::from("hello world"); + let path = Path::from("file.txt"); + integration.put(&path, data.clone()).await.unwrap(); + + let signed = integration + .signed_url(Method::GET, &path, Duration::from_secs(60)) + .await + .unwrap(); + + let resp = reqwest::get(signed).await.unwrap(); + let loaded = resp.bytes().await.unwrap(); + + assert_eq!(data, loaded); + } } diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index af0a4cefa13b..c6096c7c9d53 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -27,22 +27,26 @@ //! a way to drop old blocks. Instead unused blocks are automatically cleaned up //! after 7 days. use crate::{ - multipart::{PartId, PutPart, WriteMultiPart}, + multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}, path::Path, + signer::Signer, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, Result, }; use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; +use reqwest::Method; use std::fmt::Debug; use std::sync::Arc; +use std::time::Duration; use tokio::io::AsyncWrite; +use url::Url; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -pub use credential::authority_hosts; +pub use credential::{authority_hosts, AzureAuthorizer}; mod builder; mod client; @@ -50,7 +54,6 @@ mod credential; /// [`CredentialProvider`] for [`MicrosoftAzure`] pub type AzureCredentialProvider = Arc>; -use crate::multipart::MultiPartStore; pub use builder::{AzureConfigKey, MicrosoftAzureBuilder}; pub use credential::AzureCredential; @@ -67,6 +70,11 @@ impl MicrosoftAzure { pub fn credentials(&self) -> &AzureCredentialProvider { &self.client.config().credentials } + + /// Create a full URL to the resource specified by `path` with this instance's configuration. + fn path_url(&self, path: &Path) -> url::Url { + self.client.config().path_url(path) + } } impl std::fmt::Display for MicrosoftAzure { @@ -128,6 +136,49 @@ impl ObjectStore for MicrosoftAzure { } } +#[async_trait] +impl Signer for MicrosoftAzure { + /// Create a URL containing the relevant [Service SAS] query parameters that authorize a request + /// via `method` to the resource at `path` valid for the duration specified in `expires_in`. + /// + /// [Service SAS]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas + /// + /// # Example + /// + /// This example returns a URL that will enable a user to upload a file to + /// "some-folder/some-file.txt" in the next hour. + /// + /// ``` + /// # async fn example() -> Result<(), Box> { + /// # use object_store::{azure::MicrosoftAzureBuilder, path::Path, signer::Signer}; + /// # use reqwest::Method; + /// # use std::time::Duration; + /// # + /// let azure = MicrosoftAzureBuilder::new() + /// .with_account("my-account") + /// .with_access_key("my-access-key") + /// .with_container_name("my-container") + /// .build()?; + /// + /// let url = azure.signed_url( + /// Method::PUT, + /// &Path::from("some-folder/some-file.txt"), + /// Duration::from_secs(60 * 60) + /// ).await?; + /// # Ok(()) + /// # } + /// ``` + async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { + let credential = self.credentials().get_credential().await?; + let authorizer = AzureAuthorizer::new(&credential, &self.client.config().account); + + let mut url = self.path_url(path); + authorizer.sign(method, &mut url, expires_in)?; + + Ok(url) + } +} + /// Relevant docs: /// In Azure Blob Store, parts are "blocks" /// put_multipart_part -> PUT block From 5d68d759cf4d0b67af2857ee8345bef62b9ca693 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 29 Dec 2023 00:35:15 +0100 Subject: [PATCH 3/8] feat: sign with user delegated keys --- .gitignore | 1 + object_store/src/azure/client.rs | 80 ++++++++++++++++- object_store/src/azure/credential.rs | 130 ++++++++++++++++++++++++--- object_store/src/azure/mod.rs | 64 ++++++++++++- 4 files changed, 257 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 52ad19cb077d..c897cf24022b 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ parquet/data.parquet justfile .prettierignore .env +.editorconfig # local azurite file __azurite* __blobstorage__ diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 3c71e69da00c..1c23367d410c 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -101,6 +101,15 @@ pub(crate) enum Error { #[snafu(display("ETag required for conditional update"))] MissingETag, + + #[snafu(display("Error requesting user delegation key: {}", source))] + DelegationKeyRequest { source: crate::client::retry::Error }, + + #[snafu(display("Error getting user delegation key response body: {}", source))] + DelegationKeyResponseBody { source: reqwest::Error }, + + #[snafu(display("Got invalid user delegation key response: {}", source))] + DelegationKeyResponse { source: quick_xml::de::DeError }, } impl From for crate::Error { @@ -324,6 +333,45 @@ impl AzureClient { Ok(()) } + /// Make a Get User Delegation Key request + /// + pub async fn get_user_delegation_key( + &self, + start: &DateTime, + end: &DateTime, + ) -> Result { + let credential = self.get_credential().await?; + let url = self.config.service.clone(); + + let start = start.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + let expiry = end.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + + let mut body = String::new(); + body.push_str("\n\n"); + body.push_str(&format!( + "\t{start}\n\t{expiry}\n" + )); + body.push_str(""); + + let response = self + .client + .request(Method::POST, url) + .body(body) + .query(&[("restype", "service"), ("comp", "userdelegationkey")]) + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(DelegationKeyRequestSnafu)? + .bytes() + .await + .context(DelegationKeyResponseBodySnafu)?; + + let response: UserDelegationKey = + quick_xml::de::from_reader(response.reader()).context(DelegationKeyResponseSnafu)?; + + Ok(response) + } + #[cfg(test)] pub async fn get_blob_tagging(&self, path: &Path) -> Result { let credential = self.get_credential().await?; @@ -600,6 +648,18 @@ impl BlockList { } } +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct UserDelegationKey { + pub signed_oid: String, + pub signed_tid: String, + pub signed_start: String, + pub signed_expiry: String, + pub signed_service: String, + pub signed_version: String, + pub value: String, +} + #[cfg(test)] mod tests { use bytes::Bytes; @@ -757,8 +817,7 @@ mod tests { "; - let mut _list_blobs_response_internal: ListResultInternal = - quick_xml::de::from_str(S).unwrap(); + let _list_blobs_response_internal: ListResultInternal = quick_xml::de::from_str(S).unwrap(); } #[test] @@ -778,4 +837,21 @@ mod tests { assert_eq!(res, S) } + + #[test] + fn test_delegated_key_response() { + const S: &str = r#" + + String containing a GUID value + String containing a GUID value + String formatted as ISO date + String formatted as ISO date + b + String specifying REST api version to use to create the user delegation key + String containing the user delegation key +"#; + + let _delegated_key_response_internal: UserDelegationKey = + quick_xml::de::from_str(S).unwrap(); + } } diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 5270c60aef48..7b9966463744 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -42,6 +42,8 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use url::Url; +use super::client::UserDelegationKey; + static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03"); static VERSION: HeaderName = HeaderName::from_static("x-ms-version"); pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type"); @@ -143,14 +145,20 @@ pub mod authority_hosts { #[derive(Debug)] pub struct AzureAuthorizer<'a> { credential: &'a AzureCredential, + delegation_key: Option<&'a UserDelegationKey>, account: &'a str, } impl<'a> AzureAuthorizer<'a> { /// Create a new [`AzureAuthorizer`] - pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self { + pub fn new( + credential: &'a AzureCredential, + delegation_key: Option<&'a UserDelegationKey>, + account: &'a str, + ) -> Self { AzureAuthorizer { credential, + delegation_key, account, } } @@ -199,20 +207,40 @@ impl<'a> AzureAuthorizer<'a> { } } - pub(crate) fn sign(&self, method: Method, url: &mut Url, expires_in: Duration) -> Result<()> { + /// Sign a url with a shared access signature (SAS). + pub(crate) fn sign( + &self, + method: Method, + url: &mut Url, + start: &DateTime, + end: &DateTime, + ) -> Result<()> { + if let Some(delegation_key) = self.delegation_key { + let (str_to_sign, query_pairs) = string_to_sign_user_delegation_sas( + url, + &method, + self.account, + start, + end, + delegation_key, + ); + let signing_key = AzureAccessKey::try_new(&delegation_key.value)?; + let auth = hmac_sha256(signing_key.0, str_to_sign); + url.query_pairs_mut().extend_pairs(query_pairs); + url.query_pairs_mut() + .append_pair("sig", BASE64_STANDARD.encode(auth).as_str()); + return Ok(()); + } match self.credential { AzureCredential::AccessKey(key) => { let (str_to_sign, query_pairs) = - string_to_sign_service_sas(url, &method, self.account, expires_in); + string_to_sign_service_sas(url, &method, self.account, start, end); let auth = hmac_sha256(&key.0, str_to_sign); url.query_pairs_mut().extend_pairs(query_pairs); url.query_pairs_mut() .append_pair("sig", BASE64_STANDARD.encode(auth).as_str()); } - AzureCredential::BearerToken(token) => { - todo!() - } - AzureCredential::SASToken(_) => return Err(Error::SASforSASNotSupported), + _ => return Err(Error::SASforSASNotSupported), }; Ok(()) } @@ -229,7 +257,7 @@ impl CredentialExt for RequestBuilder { let (client, request) = self.build_split(); let mut request = request.expect("request valid"); - AzureAuthorizer::new(credential, account).authorize(&mut request); + AzureAuthorizer::new(credential, None, account).authorize(&mut request); Self::from_parts(client, request) } @@ -258,12 +286,13 @@ fn add_if_exists<'a>(h: &'a HeaderMap, key: &HeaderName) -> &'a str { .unwrap_or_default() } -fn string_to_sign_service_sas( +fn string_to_sign_sas( u: &Url, method: &Method, account: &str, - expires_in: Duration, -) -> (String, HashMap<&'static str, String>) { + start: &DateTime, + end: &DateTime, +) -> (String, String, String, String, String) { let signed_resource = if u .query() .map(|q| q.contains("comp=list")) @@ -291,8 +320,8 @@ fn string_to_sign_service_sas( _ => "", } .to_string(); - let signed_start = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true); - let signed_expiry = (Utc::now() + expires_in).to_rfc3339_opts(SecondsFormat::Secs, true); + let signed_start = start.to_rfc3339_opts(SecondsFormat::Secs, true); + let signed_expiry = end.to_rfc3339_opts(SecondsFormat::Secs, true); let canonicalized_resource = if u.host_str().unwrap_or_default().contains(account) { format!("/blob/{}{}", account, u.path()) } else { @@ -301,6 +330,25 @@ fn string_to_sign_service_sas( format!("/blob{}", u.path()) }; + ( + signed_resource, + signed_permissions, + signed_start, + signed_expiry, + canonicalized_resource, + ) +} + +fn string_to_sign_service_sas( + u: &Url, + method: &Method, + account: &str, + start: &DateTime, + end: &DateTime, +) -> (String, HashMap<&'static str, String>) { + let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) = + string_to_sign_sas(u, method, account, start, end); + // https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later let string_to_sign = format!( "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", @@ -332,6 +380,62 @@ fn string_to_sign_service_sas( (string_to_sign, pairs) } +fn string_to_sign_user_delegation_sas( + u: &Url, + method: &Method, + account: &str, + start: &DateTime, + end: &DateTime, + delegation_key: &UserDelegationKey, +) -> (String, HashMap<&'static str, String>) { + let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) = + string_to_sign_sas(u, method, account, start, end); + + // https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later + let string_to_sign = format!( + "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", + signed_permissions, + signed_start, + signed_expiry, + canonicalized_resource, + delegation_key.signed_oid, // signed key object id + delegation_key.signed_tid, // signed key tenant id + delegation_key.signed_start, // signed key start + delegation_key.signed_expiry, // signed key expiry + delegation_key.signed_service, // signed key service + delegation_key.signed_version, // signed key version + "", // signed authorized user object id + "", // signed unauthorized user object id + "", // signed correlation id + "", // signed ip + "", // signed protocol + &AZURE_VERSION.to_str().unwrap(), // signed version + signed_resource, // signed resource + "", // signed snapshot time + "", // signed encryption scope + "", // rscc - response header: Cache-Control + "", // rscd - response header: Content-Disposition + "", // rsce - response header: Content-Encoding + "", // rscl - response header: Content-Language + "", // rsct - response header: Content-Type + ); + + let mut pairs = HashMap::new(); + pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string()); + pairs.insert("sp", signed_permissions); + pairs.insert("st", signed_start); + pairs.insert("se", signed_expiry); + pairs.insert("sr", signed_resource); + pairs.insert("skoid", delegation_key.signed_oid.clone()); + pairs.insert("sktid", delegation_key.signed_tid.clone()); + pairs.insert("skt", delegation_key.signed_start.clone()); + pairs.insert("ske", delegation_key.signed_expiry.clone()); + pairs.insert("sks", delegation_key.signed_service.clone()); + pairs.insert("skv", delegation_key.signed_version.clone()); + + (string_to_sign, pairs) +} + /// fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> String { // content length must only be specified if != 0 diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index c6096c7c9d53..2a2f2b61188a 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -46,7 +46,7 @@ use url::Url; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -pub use credential::{authority_hosts, AzureAuthorizer}; +pub use credential::{authority_hosts, AzureAccessKey, AzureAuthorizer}; mod builder; mod client; @@ -170,10 +170,24 @@ impl Signer for MicrosoftAzure { /// ``` async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { let credential = self.credentials().get_credential().await?; - let authorizer = AzureAuthorizer::new(&credential, &self.client.config().account); + let signed_start = chrono::Utc::now(); + let signed_expiry = signed_start + expires_in; + let delegation_key = match credential.as_ref() { + AzureCredential::BearerToken(_) => Some( + self.client + .get_user_delegation_key(&signed_start, &signed_expiry) + .await?, + ), + _ => None, + }; + let authorizer = AzureAuthorizer::new( + &credential, + delegation_key.as_ref(), + &self.client.config().account, + ); let mut url = self.path_url(path); - authorizer.sign(method, &mut url, expires_in)?; + authorizer.sign(method, &mut url, &signed_start, &signed_expiry)?; Ok(url) } @@ -262,6 +276,50 @@ mod tests { .await } + #[ignore = "Used for manual testing against a real storage account."] + #[tokio::test] + async fn test_user_delegation_key() { + let account = std::env::var("AZURE_ACCOUNT_NAME").unwrap(); + let container = std::env::var("AZURE_CONTAINER_NAME").unwrap(); + let client_id = std::env::var("AZURE_CLIENT_ID").unwrap(); + let client_secret = std::env::var("AZURE_CLIENT_SECRET").unwrap(); + let tenant_id = std::env::var("AZURE_TENANT_ID").unwrap(); + let integration = MicrosoftAzureBuilder::new() + .with_account(account) + .with_container_name(container) + .with_client_id(client_id) + .with_client_secret(client_secret) + .with_tenant_id(&tenant_id) + .build() + .unwrap(); + + let start = chrono::Utc::now(); + let end = start + chrono::Duration::days(1); + + let key = integration + .client + .get_user_delegation_key(&start, &end) + .await + .unwrap(); + + assert!(key.value.len() > 0); + assert_eq!(key.signed_tid, tenant_id); + + let data = Bytes::from("hello world"); + let path = Path::from("file.txt"); + integration.put(&path, data.clone()).await.unwrap(); + + let signed = integration + .signed_url(Method::GET, &path, Duration::from_secs(60)) + .await + .unwrap(); + + let resp = reqwest::get(signed).await.unwrap(); + let loaded = resp.bytes().await.unwrap(); + + assert_eq!(data, loaded); + } + #[test] fn azure_test_config_get_value() { let azure_client_id = "object_store:fake_access_key_id".to_string(); From 156ab15a985263a8bf8262524af981b40c8a77f7 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 29 Dec 2023 22:07:26 +0100 Subject: [PATCH 4/8] chore: clippy --- object_store/src/azure/mod.rs | 2 +- object_store/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 2a2f2b61188a..856fced2021a 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -302,7 +302,7 @@ mod tests { .await .unwrap(); - assert!(key.value.len() > 0); + assert!(!key.value.is_empty()); assert_eq!(key.signed_tid, tenant_id); let data = Bytes::from("hello world"); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 3a841667ff97..632e949582da 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1886,7 +1886,7 @@ mod tests { // We can abort an in-progress write let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap(); - if let Some(chunk) = data.get(0) { + if let Some(chunk) = data.first() { writer.write_all(chunk).await.unwrap(); let _ = writer.write(chunk).await.unwrap(); } From f8aa334862762788ba5682f85f1eae8260999db3 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Thu, 4 Jan 2024 15:11:24 +0100 Subject: [PATCH 5/8] pr feedback --- object_store/src/aws/credential.rs | 6 +- object_store/src/aws/mod.rs | 4 +- object_store/src/azure/client.rs | 37 +++++++++- object_store/src/azure/credential.rs | 105 ++++++++++++++------------- object_store/src/azure/mod.rs | 55 ++++++-------- object_store/src/signer.rs | 18 ++++- 6 files changed, 133 insertions(+), 92 deletions(-) diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index d290da838d78..13cab16e65d4 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -177,7 +177,7 @@ impl<'a> AwsAuthorizer<'a> { request.headers_mut().insert(AUTH_HEADER, authorization_val); } - pub(crate) fn sign(&self, method: Method, url: &mut Url, expires_in: Duration) { + pub(crate) fn sign(&self, method: &Method, url: &mut Url, expires_in: Duration) { let date = self.date.unwrap_or_else(Utc::now); let scope = self.scope(date); @@ -212,7 +212,7 @@ impl<'a> AwsAuthorizer<'a> { let string_to_sign = self.string_to_sign( date, &scope, - &method, + method, url, &canonical_headers, &signed_headers, @@ -766,7 +766,7 @@ mod tests { }; let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap(); - authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400)); + authorizer.sign(&Method::GET, &mut url, Duration::from_secs(86400)); assert_eq!( url, diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 0985263459b2..e2a55e51e452 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -136,14 +136,14 @@ impl Signer for AmazonS3 { /// .build()?; /// /// let url = s3.signed_url( - /// Method::PUT, + /// &Method::PUT, /// &Path::from("some-folder/some-file.txt"), /// Duration::from_secs(60 * 60) /// ).await?; /// # Ok(()) /// # } /// ``` - async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { + async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result { let credential = self.credentials().get_credential().await?; let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config().region); diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 1c23367d410c..ad799c424b9b 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -46,6 +46,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use url::Url; const VERSION_HEADER: &str = "x-ms-version-id"; @@ -110,6 +111,9 @@ pub(crate) enum Error { #[snafu(display("Got invalid user delegation key response: {}", source))] DelegationKeyResponse { source: quick_xml::de::DeError }, + + #[snafu(display("Generating SAS keys with SAS tokens auth is not supported"))] + SASforSASNotSupported, } impl From for crate::Error { @@ -335,7 +339,7 @@ impl AzureClient { /// Make a Get User Delegation Key request /// - pub async fn get_user_delegation_key( + async fn get_user_delegation_key( &self, start: &DateTime, end: &DateTime, @@ -372,6 +376,35 @@ impl AzureClient { Ok(response) } + pub async fn signer(&self, expires_in: Duration) -> Result { + let credential = self.get_credential().await?; + let signed_start = chrono::Utc::now(); + let signed_expiry = signed_start + expires_in; + match credential.as_ref() { + AzureCredential::BearerToken(_) => { + let key = self + .get_user_delegation_key(&signed_start, &signed_expiry) + .await?; + let signing_key = AzureAccessKey::try_new(&key.value)?; + Ok(AzureSigner::new( + signing_key, + self.config.account.clone(), + signed_start, + signed_expiry, + Some(key), + )) + } + AzureCredential::AccessKey(key) => Ok(AzureSigner::new( + key.to_owned(), + self.config.account.clone(), + signed_start, + signed_expiry, + None, + )), + _ => Err(Error::SASforSASNotSupported.into()), + } + } + #[cfg(test)] pub async fn get_blob_tagging(&self, path: &Path) -> Result { let credential = self.get_credential().await?; @@ -650,7 +683,7 @@ impl BlockList { #[derive(Debug, Clone, PartialEq, Deserialize)] #[serde(rename_all = "PascalCase")] -pub struct UserDelegationKey { +pub(crate) struct UserDelegationKey { pub signed_oid: String, pub signed_tid: String, pub signed_start: String, diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 7b9966463744..3d72159fe7b3 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -101,7 +101,7 @@ impl From for crate::Error { } /// A shared Azure Storage Account Key -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct AzureAccessKey(Vec); impl AzureAccessKey { @@ -141,24 +141,63 @@ pub mod authority_hosts { pub const AZURE_PUBLIC_CLOUD: &str = "https://login.microsoftonline.com"; } +pub(crate) struct AzureSigner { + signing_key: AzureAccessKey, + start: DateTime, + end: DateTime, + account: String, + delegation_key: Option, +} + +impl AzureSigner { + pub fn new( + signing_key: AzureAccessKey, + account: String, + start: DateTime, + end: DateTime, + delegation_key: Option, + ) -> Self { + Self { + signing_key, + account, + start, + end, + delegation_key, + } + } + + pub fn sign(&self, method: &Method, url: &mut Url) -> Result<()> { + let (str_to_sign, query_pairs) = match &self.delegation_key { + Some(delegation_key) => string_to_sign_user_delegation_sas( + url, + &method, + &self.account, + &self.start, + &self.end, + delegation_key, + ), + None => string_to_sign_service_sas(url, method, &self.account, &self.start, &self.end), + }; + let auth = hmac_sha256(&self.signing_key.0, str_to_sign); + url.query_pairs_mut().extend_pairs(query_pairs); + url.query_pairs_mut() + .append_pair("sig", BASE64_STANDARD.encode(auth).as_str()); + Ok(()) + } +} + /// Authorize a [`Request`] with an [`AzureAuthorizer`] #[derive(Debug)] pub struct AzureAuthorizer<'a> { credential: &'a AzureCredential, - delegation_key: Option<&'a UserDelegationKey>, account: &'a str, } impl<'a> AzureAuthorizer<'a> { /// Create a new [`AzureAuthorizer`] - pub fn new( - credential: &'a AzureCredential, - delegation_key: Option<&'a UserDelegationKey>, - account: &'a str, - ) -> Self { + pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self { AzureAuthorizer { credential, - delegation_key, account, } } @@ -206,44 +245,6 @@ impl<'a> AzureAuthorizer<'a> { } } } - - /// Sign a url with a shared access signature (SAS). - pub(crate) fn sign( - &self, - method: Method, - url: &mut Url, - start: &DateTime, - end: &DateTime, - ) -> Result<()> { - if let Some(delegation_key) = self.delegation_key { - let (str_to_sign, query_pairs) = string_to_sign_user_delegation_sas( - url, - &method, - self.account, - start, - end, - delegation_key, - ); - let signing_key = AzureAccessKey::try_new(&delegation_key.value)?; - let auth = hmac_sha256(signing_key.0, str_to_sign); - url.query_pairs_mut().extend_pairs(query_pairs); - url.query_pairs_mut() - .append_pair("sig", BASE64_STANDARD.encode(auth).as_str()); - return Ok(()); - } - match self.credential { - AzureCredential::AccessKey(key) => { - let (str_to_sign, query_pairs) = - string_to_sign_service_sas(url, &method, self.account, start, end); - let auth = hmac_sha256(&key.0, str_to_sign); - url.query_pairs_mut().extend_pairs(query_pairs); - url.query_pairs_mut() - .append_pair("sig", BASE64_STANDARD.encode(auth).as_str()); - } - _ => return Err(Error::SASforSASNotSupported), - }; - Ok(()) - } } pub(crate) trait CredentialExt { @@ -257,7 +258,7 @@ impl CredentialExt for RequestBuilder { let (client, request) = self.build_split(); let mut request = request.expect("request valid"); - AzureAuthorizer::new(credential, None, account).authorize(&mut request); + AzureAuthorizer::new(credential, account).authorize(&mut request); Self::from_parts(client, request) } @@ -339,6 +340,9 @@ fn string_to_sign_sas( ) } +/// Create a string to be signed for authorization via [service sas]. +/// +/// [service sas]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later fn string_to_sign_service_sas( u: &Url, method: &Method, @@ -349,7 +353,6 @@ fn string_to_sign_service_sas( let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) = string_to_sign_sas(u, method, account, start, end); - // https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later let string_to_sign = format!( "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", signed_permissions, @@ -380,6 +383,9 @@ fn string_to_sign_service_sas( (string_to_sign, pairs) } +/// Create a string to be signed for authorization via [user delegation sas]. +/// +/// [user delegation sas]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later fn string_to_sign_user_delegation_sas( u: &Url, method: &Method, @@ -391,7 +397,6 @@ fn string_to_sign_user_delegation_sas( let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) = string_to_sign_sas(u, method, account, start, end); - // https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later let string_to_sign = format!( "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}", signed_permissions, @@ -1047,7 +1052,7 @@ mod tests { integration.put(&path, data.clone()).await.unwrap(); let signed = integration - .signed_url(Method::GET, &path, Duration::from_secs(60)) + .signed_url(&Method::GET, &path, Duration::from_secs(60)) .await .unwrap(); diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 856fced2021a..e2f4234b4893 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -161,36 +161,35 @@ impl Signer for MicrosoftAzure { /// .build()?; /// /// let url = azure.signed_url( - /// Method::PUT, + /// &Method::PUT, /// &Path::from("some-folder/some-file.txt"), /// Duration::from_secs(60 * 60) /// ).await?; /// # Ok(()) /// # } /// ``` - async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { - let credential = self.credentials().get_credential().await?; - let signed_start = chrono::Utc::now(); - let signed_expiry = signed_start + expires_in; - let delegation_key = match credential.as_ref() { - AzureCredential::BearerToken(_) => Some( - self.client - .get_user_delegation_key(&signed_start, &signed_expiry) - .await?, - ), - _ => None, - }; - - let authorizer = AzureAuthorizer::new( - &credential, - delegation_key.as_ref(), - &self.client.config().account, - ); + async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result { let mut url = self.path_url(path); - authorizer.sign(method, &mut url, &signed_start, &signed_expiry)?; - + let signer = self.client.signer(expires_in).await?; + signer.sign(method, &mut url)?; Ok(url) } + + async fn signed_urls( + &self, + method: &Method, + paths: &[Path], + expires_in: Duration, + ) -> Result> { + let mut urls = Vec::with_capacity(paths.len()); + let signer = self.client.signer(expires_in).await?; + for path in paths { + let mut url = self.path_url(path); + signer.sign(&method, &mut url)?; + urls.push(url); + } + Ok(urls) + } } /// Relevant docs: @@ -293,24 +292,12 @@ mod tests { .build() .unwrap(); - let start = chrono::Utc::now(); - let end = start + chrono::Duration::days(1); - - let key = integration - .client - .get_user_delegation_key(&start, &end) - .await - .unwrap(); - - assert!(!key.value.is_empty()); - assert_eq!(key.signed_tid, tenant_id); - let data = Bytes::from("hello world"); let path = Path::from("file.txt"); integration.put(&path, data.clone()).await.unwrap(); let signed = integration - .signed_url(Method::GET, &path, Duration::from_secs(60)) + .signed_url(&Method::GET, &path, Duration::from_secs(60)) .await .unwrap(); diff --git a/object_store/src/signer.rs b/object_store/src/signer.rs index ed92e28799e5..0dd81c8d660e 100644 --- a/object_store/src/signer.rs +++ b/object_store/src/signer.rs @@ -30,5 +30,21 @@ pub trait Signer: Send + Sync + fmt::Debug + 'static { /// the URL should be valid, return a signed [`Url`] created with the object store /// implementation's credentials such that the URL can be handed to something that doesn't have /// access to the object store's credentials, to allow limited access to the object store. - async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result; + async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result; + + /// Generate signed urls for multiple paths. + /// + /// See [`Signer::signed_url`] for more details. + async fn signed_urls( + &self, + method: &Method, + paths: &[Path], + expires_in: Duration, + ) -> Result> { + let mut urls = Vec::with_capacity(paths.len()); + for path in paths { + urls.push(self.signed_url(method, path, expires_in).await?); + } + Ok(urls) + } } From 9ccc92b1e852903cfee425ff8bf945a47f4494a8 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Thu, 4 Jan 2024 15:29:01 +0100 Subject: [PATCH 6/8] chore: clippy --- object_store/src/azure/credential.rs | 2 +- object_store/src/azure/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 3d72159fe7b3..21473c9d79e0 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -170,7 +170,7 @@ impl AzureSigner { let (str_to_sign, query_pairs) = match &self.delegation_key { Some(delegation_key) => string_to_sign_user_delegation_sas( url, - &method, + method, &self.account, &self.start, &self.end, diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index e2f4234b4893..dc80fddd2a97 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -185,7 +185,7 @@ impl Signer for MicrosoftAzure { let signer = self.client.signer(expires_in).await?; for path in paths { let mut url = self.path_url(path); - signer.sign(&method, &mut url)?; + signer.sign(method, &mut url)?; urls.push(url); } Ok(urls) From 82d3bd026b8e1f1c6af9b85e4c75cfb3835fa55e Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Thu, 4 Jan 2024 15:52:18 +0100 Subject: [PATCH 7/8] pr feedback II --- object_store/src/aws/credential.rs | 6 +++--- object_store/src/aws/mod.rs | 4 ++-- object_store/src/azure/client.rs | 4 ++++ object_store/src/azure/credential.rs | 14 +++----------- object_store/src/azure/mod.rs | 12 ++++++------ object_store/src/signer.rs | 6 +++--- 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index 13cab16e65d4..d290da838d78 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -177,7 +177,7 @@ impl<'a> AwsAuthorizer<'a> { request.headers_mut().insert(AUTH_HEADER, authorization_val); } - pub(crate) fn sign(&self, method: &Method, url: &mut Url, expires_in: Duration) { + pub(crate) fn sign(&self, method: Method, url: &mut Url, expires_in: Duration) { let date = self.date.unwrap_or_else(Utc::now); let scope = self.scope(date); @@ -212,7 +212,7 @@ impl<'a> AwsAuthorizer<'a> { let string_to_sign = self.string_to_sign( date, &scope, - method, + &method, url, &canonical_headers, &signed_headers, @@ -766,7 +766,7 @@ mod tests { }; let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap(); - authorizer.sign(&Method::GET, &mut url, Duration::from_secs(86400)); + authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400)); assert_eq!( url, diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index e2a55e51e452..0985263459b2 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -136,14 +136,14 @@ impl Signer for AmazonS3 { /// .build()?; /// /// let url = s3.signed_url( - /// &Method::PUT, + /// Method::PUT, /// &Path::from("some-folder/some-file.txt"), /// Duration::from_secs(60 * 60) /// ).await?; /// # Ok(()) /// # } /// ``` - async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result { + async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { let credential = self.credentials().get_credential().await?; let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config().region); diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index ad799c424b9b..865e0a1a939c 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -376,6 +376,10 @@ impl AzureClient { Ok(response) } + /// Creat an AzureSigner for generating SAS tokens (pre-signed urls). + /// + /// Depending on the type of credential, this will either use the account key or a user delegation key. + /// Since delegation keys are acquired ad-hoc, the signer aloows for signing multiple urls with the same key. pub async fn signer(&self, expires_in: Duration) -> Result { let credential = self.get_credential().await?; let signed_start = chrono::Utc::now(); diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index 21473c9d79e0..a51e3ed637f5 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -294,16 +294,8 @@ fn string_to_sign_sas( start: &DateTime, end: &DateTime, ) -> (String, String, String, String, String) { - let signed_resource = if u - .query() - .map(|q| q.contains("comp=list")) - .unwrap_or_default() - { - "c" - } else { - "b" - } - .to_string(); + // NOTE: for now only blob signing is supported. + let signed_resource = "b".to_string(); // https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#permissions-for-a-directory-container-or-blob let signed_permissions = match *method { @@ -1052,7 +1044,7 @@ mod tests { integration.put(&path, data.clone()).await.unwrap(); let signed = integration - .signed_url(&Method::GET, &path, Duration::from_secs(60)) + .signed_url(Method::GET, &path, Duration::from_secs(60)) .await .unwrap(); diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index dc80fddd2a97..323f4d7e0a99 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -161,23 +161,23 @@ impl Signer for MicrosoftAzure { /// .build()?; /// /// let url = azure.signed_url( - /// &Method::PUT, + /// Method::PUT, /// &Path::from("some-folder/some-file.txt"), /// Duration::from_secs(60 * 60) /// ).await?; /// # Ok(()) /// # } /// ``` - async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result { + async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { let mut url = self.path_url(path); let signer = self.client.signer(expires_in).await?; - signer.sign(method, &mut url)?; + signer.sign(&method, &mut url)?; Ok(url) } async fn signed_urls( &self, - method: &Method, + method: Method, paths: &[Path], expires_in: Duration, ) -> Result> { @@ -185,7 +185,7 @@ impl Signer for MicrosoftAzure { let signer = self.client.signer(expires_in).await?; for path in paths { let mut url = self.path_url(path); - signer.sign(method, &mut url)?; + signer.sign(&method, &mut url)?; urls.push(url); } Ok(urls) @@ -297,7 +297,7 @@ mod tests { integration.put(&path, data.clone()).await.unwrap(); let signed = integration - .signed_url(&Method::GET, &path, Duration::from_secs(60)) + .signed_url(Method::GET, &path, Duration::from_secs(60)) .await .unwrap(); diff --git a/object_store/src/signer.rs b/object_store/src/signer.rs index 0dd81c8d660e..da55c689aef5 100644 --- a/object_store/src/signer.rs +++ b/object_store/src/signer.rs @@ -30,20 +30,20 @@ pub trait Signer: Send + Sync + fmt::Debug + 'static { /// the URL should be valid, return a signed [`Url`] created with the object store /// implementation's credentials such that the URL can be handed to something that doesn't have /// access to the object store's credentials, to allow limited access to the object store. - async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result; + async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result; /// Generate signed urls for multiple paths. /// /// See [`Signer::signed_url`] for more details. async fn signed_urls( &self, - method: &Method, + method: Method, paths: &[Path], expires_in: Duration, ) -> Result> { let mut urls = Vec::with_capacity(paths.len()); for path in paths { - urls.push(self.signed_url(method, path, expires_in).await?); + urls.push(self.signed_url(method.clone(), path, expires_in).await?); } Ok(urls) } From 60f1bf60ef6d6c9d8f3a7365e9bf8c3835c64c88 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Thu, 4 Jan 2024 16:54:46 +0100 Subject: [PATCH 8/8] fix: move sigining test --- object_store/src/aws/mod.rs | 1 + object_store/src/azure/credential.rs | 28 ---------------------------- object_store/src/azure/mod.rs | 1 + object_store/src/lib.rs | 23 +++++++++++++++++++++++ 4 files changed, 25 insertions(+), 28 deletions(-) diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 0985263459b2..e4467e452937 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -346,6 +346,7 @@ mod tests { rename_and_copy(&integration).await; stream_get(&integration).await; multipart(&integration, &integration).await; + signing(&integration).await; tagging(&integration, !config.disable_tagging, |p| { let client = Arc::clone(&integration.client); diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs index a51e3ed637f5..bfbbde826046 100644 --- a/object_store/src/azure/credential.rs +++ b/object_store/src/azure/credential.rs @@ -909,7 +909,6 @@ impl CredentialProvider for AzureCliCredential { #[cfg(test)] mod tests { - use bytes::Bytes; use futures::executor::block_on; use hyper::body::to_bytes; use hyper::{Body, Response}; @@ -917,11 +916,7 @@ mod tests { use tempfile::NamedTempFile; use super::*; - use crate::azure::MicrosoftAzureBuilder; use crate::client::mock_server::MockServer; - use crate::path::Path; - use crate::signer::Signer; - use crate::ObjectStore; #[tokio::test] async fn test_managed_identity() { @@ -1030,27 +1025,4 @@ mod tests { &AzureCredential::BearerToken("TOKEN".into()) ); } - - #[tokio::test] - async fn test_service_sas() { - crate::test_util::maybe_skip_integration!(); - let integration = MicrosoftAzureBuilder::from_env() - .with_container_name("test-bucket") - .build() - .unwrap(); - - let data = Bytes::from("hello world"); - let path = Path::from("file.txt"); - integration.put(&path, data.clone()).await.unwrap(); - - let signed = integration - .signed_url(Method::GET, &path, Duration::from_secs(60)) - .await - .unwrap(); - - let resp = reqwest::get(signed).await.unwrap(); - let loaded = resp.bytes().await.unwrap(); - - assert_eq!(data, loaded); - } } diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 323f4d7e0a99..712b7a36c56a 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -266,6 +266,7 @@ mod tests { stream_get(&integration).await; put_opts(&integration, true).await; multipart(&integration, &integration).await; + signing(&integration).await; let validate = !integration.client.config().disable_tagging; tagging(&integration, validate, |p| { diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 632e949582da..846ac3e46d22 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -2115,6 +2115,29 @@ mod tests { assert_eq!(meta.size, chunk_size * 2); } + #[cfg(any(feature = "azure", feature = "aws"))] + pub(crate) async fn signing(integration: &T) + where + T: ObjectStore + crate::signer::Signer, + { + use reqwest::Method; + use std::time::Duration; + + let data = Bytes::from("hello world"); + let path = Path::from("file.txt"); + integration.put(&path, data.clone()).await.unwrap(); + + let signed = integration + .signed_url(Method::GET, &path, Duration::from_secs(60)) + .await + .unwrap(); + + let resp = reqwest::get(signed).await.unwrap(); + let loaded = resp.bytes().await.unwrap(); + + assert_eq!(data, loaded); + } + #[cfg(any(feature = "aws", feature = "azure"))] pub(crate) async fn tagging(storage: &dyn ObjectStore, validate: bool, get_tags: F) where