-
Notifications
You must be signed in to change notification settings - Fork 848
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Support for Microsoft Fabric / OneLake (#4573)
* Changes required for onelake-fix * Fix Unit tests * Add Unit Tests * Add onelake read/write test * Add with_use_fabric , for fabric url check * Final tweaks * Further tweaks * Automatically set use_fabric_endpoint --------- Co-authored-by: Raphael Taylor-Davies <[email protected]>
- Loading branch information
1 parent
65c24d6
commit 230612e
Showing
1 changed file
with
100 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -341,6 +341,10 @@ pub struct MicrosoftAzureBuilder { | |
client_options: ClientOptions, | ||
/// Credentials | ||
credentials: Option<AzureCredentialProvider>, | ||
/// When set to true, fabric url scheme will be used | ||
/// | ||
/// i.e. https://{account_name}.blob.fabric.microsoft.com | ||
use_fabric_endpoint: ConfigValue<bool>, | ||
} | ||
|
||
/// Configuration keys for [`MicrosoftAzureBuilder`] | ||
|
@@ -430,6 +434,13 @@ pub enum AzureConfigKey { | |
/// - `use_emulator` | ||
UseEmulator, | ||
|
||
/// Use the Microsoft Fabric endpoint, i.e. `https://{account}.blob.fabric.microsoft.com` | ||
/// | ||
/// Supported keys: | ||
/// - `azure_use_fabric_endpoint` | ||
/// - `use_fabric_endpoint` | ||
UseFabricEndpoint, | ||
|
||
/// Endpoint to request an IMDS managed identity token | ||
/// | ||
/// Supported keys: | ||
|
@@ -482,6 +493,7 @@ impl AsRef<str> for AzureConfigKey { | |
Self::SasKey => "azure_storage_sas_key", | ||
Self::Token => "azure_storage_token", | ||
Self::UseEmulator => "azure_storage_use_emulator", | ||
Self::UseFabricEndpoint => "azure_use_fabric_endpoint", | ||
Self::MsiEndpoint => "azure_msi_endpoint", | ||
Self::ObjectId => "azure_object_id", | ||
Self::MsiResourceId => "azure_msi_resource_id", | ||
|
@@ -531,6 +543,9 @@ impl FromStr for AzureConfigKey { | |
"azure_federated_token_file" | "federated_token_file" => { | ||
Ok(Self::FederatedTokenFile) | ||
} | ||
"azure_use_fabric_endpoint" | "use_fabric_endpoint" => { | ||
Ok(Self::UseFabricEndpoint) | ||
} | ||
"azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli), | ||
// Backwards compatibility | ||
"azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), | ||
|
@@ -600,11 +615,16 @@ impl MicrosoftAzureBuilder { | |
/// | ||
/// - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs)) | ||
/// - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>` | ||
/// - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>` | ||
/// - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs)) | ||
/// - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs)) | ||
/// - `azure://<container>/<path>` (custom) | ||
/// - `https://<account>.dfs.core.windows.net` | ||
/// - `https://<account>.blob.core.windows.net` | ||
/// - `https://<account>.dfs.fabric.microsoft.com` | ||
/// - `https://<account>.dfs.fabric.microsoft.com/<container>` | ||
/// - `https://<account>.blob.fabric.microsoft.com` | ||
/// - `https://<account>.blob.fabric.microsoft.com/<container>` | ||
/// | ||
/// Note: Settings derived from the URL will override any others set on this builder | ||
/// | ||
|
@@ -639,6 +659,7 @@ impl MicrosoftAzureBuilder { | |
} | ||
AzureConfigKey::UseAzureCli => self.use_azure_cli.parse(value), | ||
AzureConfigKey::UseEmulator => self.use_emulator.parse(value), | ||
AzureConfigKey::UseFabricEndpoint => self.use_fabric_endpoint.parse(value), | ||
AzureConfigKey::Client(key) => { | ||
self.client_options = self.client_options.with_config(key, value) | ||
} | ||
|
@@ -692,6 +713,9 @@ impl MicrosoftAzureBuilder { | |
AzureConfigKey::SasKey => self.sas_key.clone(), | ||
AzureConfigKey::Token => self.bearer_token.clone(), | ||
AzureConfigKey::UseEmulator => Some(self.use_emulator.to_string()), | ||
AzureConfigKey::UseFabricEndpoint => { | ||
Some(self.use_fabric_endpoint.to_string()) | ||
} | ||
AzureConfigKey::MsiEndpoint => self.msi_endpoint.clone(), | ||
AzureConfigKey::ObjectId => self.object_id.clone(), | ||
AzureConfigKey::MsiResourceId => self.msi_resource_id.clone(), | ||
|
@@ -724,6 +748,10 @@ impl MicrosoftAzureBuilder { | |
} else if let Some(a) = host.strip_suffix(".dfs.core.windows.net") { | ||
self.container_name = Some(validate(parsed.username())?); | ||
self.account_name = Some(validate(a)?); | ||
} else if let Some(a) = host.strip_suffix(".dfs.fabric.microsoft.com") { | ||
self.container_name = Some(validate(parsed.username())?); | ||
self.account_name = Some(validate(a)?); | ||
self.use_fabric_endpoint = true.into(); | ||
} else { | ||
return Err(UrlNotRecognisedSnafu { url }.build().into()); | ||
} | ||
|
@@ -733,6 +761,21 @@ impl MicrosoftAzureBuilder { | |
| Some((a, "blob.core.windows.net")) => { | ||
self.account_name = Some(validate(a)?); | ||
} | ||
Some((a, "dfs.fabric.microsoft.com")) | ||
| Some((a, "blob.fabric.microsoft.com")) => { | ||
self.account_name = Some(validate(a)?); | ||
// Attempt to infer the container name from the URL | ||
// - https://onelake.dfs.fabric.microsoft.com/<workspaceGUID>/<itemGUID>/Files/test.csv | ||
// - https://onelake.dfs.fabric.microsoft.com/<workspace>/<item>.<itemtype>/<path>/<fileName> | ||
// | ||
// See <https://learn.microsoft.com/en-us/fabric/onelake/onelake-access-api> | ||
if let Some(workspace) = parsed.path_segments().unwrap().next() { | ||
if !workspace.is_empty() { | ||
self.container_name = Some(workspace.to_string()) | ||
} | ||
} | ||
self.use_fabric_endpoint = true.into(); | ||
} | ||
_ => return Err(UrlNotRecognisedSnafu { url }.build().into()), | ||
}, | ||
scheme => return Err(UnknownUrlSchemeSnafu { scheme }.build().into()), | ||
|
@@ -819,6 +862,14 @@ impl MicrosoftAzureBuilder { | |
self | ||
} | ||
|
||
/// Set if Microsoft Fabric url scheme should be used (defaults to false) | ||
/// When disabled the url scheme used is `https://{account}.blob.core.windows.net` | ||
/// When enabled the url scheme used is `https://{account}.blob.fabric.microsoft.com` | ||
pub fn with_use_fabric_endpoint(mut self, use_fabric_endpoint: bool) -> Self { | ||
self.use_fabric_endpoint = use_fabric_endpoint.into(); | ||
self | ||
} | ||
|
||
/// Sets what protocol is allowed. If `allow_http` is: | ||
/// * false (default): Only HTTPS is allowed | ||
/// * true: HTTP and HTTPS are allowed | ||
|
@@ -885,6 +936,7 @@ impl MicrosoftAzureBuilder { | |
} | ||
|
||
let container = self.container_name.ok_or(Error::MissingContainerName {})?; | ||
|
||
let static_creds = |credential: AzureCredential| -> AzureCredentialProvider { | ||
Arc::new(StaticCredentialProvider::new(credential)) | ||
}; | ||
|
@@ -906,7 +958,11 @@ impl MicrosoftAzureBuilder { | |
(true, url, credential, account_name) | ||
} else { | ||
let account_name = self.account_name.ok_or(Error::MissingAccount {})?; | ||
let account_url = format!("https://{}.blob.core.windows.net", &account_name); | ||
let account_url = match self.use_fabric_endpoint.get()? { | ||
true => format!("https://{}.blob.fabric.microsoft.com", &account_name), | ||
false => format!("https://{}.blob.core.windows.net", &account_name), | ||
}; | ||
|
||
let url = Url::parse(&account_url) | ||
.context(UnableToParseUrlSnafu { url: account_url })?; | ||
|
||
|
@@ -1049,6 +1105,15 @@ mod tests { | |
.unwrap(); | ||
assert_eq!(builder.account_name, Some("account".to_string())); | ||
assert_eq!(builder.container_name, Some("file_system".to_string())); | ||
assert!(!builder.use_fabric_endpoint.get().unwrap()); | ||
|
||
let mut builder = MicrosoftAzureBuilder::new(); | ||
builder | ||
.parse_url("abfss://file_system@account.dfs.fabric.microsoft.com/") | ||
.unwrap(); | ||
assert_eq!(builder.account_name, Some("account".to_string())); | ||
assert_eq!(builder.container_name, Some("file_system".to_string())); | ||
assert!(builder.use_fabric_endpoint.get().unwrap()); | ||
|
||
let mut builder = MicrosoftAzureBuilder::new(); | ||
builder.parse_url("abfs://container/path").unwrap(); | ||
|
@@ -1067,12 +1132,46 @@ mod tests { | |
.parse_url("https://account.dfs.core.windows.net/") | ||
.unwrap(); | ||
assert_eq!(builder.account_name, Some("account".to_string())); | ||
assert!(!builder.use_fabric_endpoint.get().unwrap()); | ||
|
||
let mut builder = MicrosoftAzureBuilder::new(); | ||
builder | ||
.parse_url("https://account.blob.core.windows.net/") | ||
.unwrap(); | ||
assert_eq!(builder.account_name, Some("account".to_string())); | ||
assert!(!builder.use_fabric_endpoint.get().unwrap()); | ||
|
||
let mut builder = MicrosoftAzureBuilder::new(); | ||
builder | ||
.parse_url("https://account.dfs.fabric.microsoft.com/") | ||
.unwrap(); | ||
assert_eq!(builder.account_name, Some("account".to_string())); | ||
assert_eq!(builder.container_name, None); | ||
assert!(builder.use_fabric_endpoint.get().unwrap()); | ||
|
||
let mut builder = MicrosoftAzureBuilder::new(); | ||
builder | ||
.parse_url("https://account.dfs.fabric.microsoft.com/container") | ||
.unwrap(); | ||
assert_eq!(builder.account_name, Some("account".to_string())); | ||
assert_eq!(builder.container_name.as_deref(), Some("container")); | ||
assert!(builder.use_fabric_endpoint.get().unwrap()); | ||
|
||
let mut builder = MicrosoftAzureBuilder::new(); | ||
builder | ||
.parse_url("https://account.blob.fabric.microsoft.com/") | ||
.unwrap(); | ||
assert_eq!(builder.account_name, Some("account".to_string())); | ||
assert_eq!(builder.container_name, None); | ||
assert!(builder.use_fabric_endpoint.get().unwrap()); | ||
|
||
let mut builder = MicrosoftAzureBuilder::new(); | ||
builder | ||
.parse_url("https://account.blob.fabric.microsoft.com/container") | ||
.unwrap(); | ||
assert_eq!(builder.account_name, Some("account".to_string())); | ||
assert_eq!(builder.container_name.as_deref(), Some("container")); | ||
assert!(builder.use_fabric_endpoint.get().unwrap()); | ||
|
||
let err_cases = [ | ||
"mailto://account.blob.core.windows.net/", | ||
|