diff --git a/misc/python/materialize/cli/run.py b/misc/python/materialize/cli/run.py
index 67a6c94ec51be..4a38e12174911 100644
--- a/misc/python/materialize/cli/run.py
+++ b/misc/python/materialize/cli/run.py
@@ -88,6 +88,11 @@ def main() -> int:
         help="Postgres/CockroachDB connection string",
         default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES),
     )
+    parser.add_argument(
+        "--blob",
+        help="Blob storage connection string (default: file://<mzdata>/persist/blob)",
+        default=os.getenv("MZDEV_BLOB"),
+    )
     parser.add_argument(
         "--release",
         help="Build artifacts in release mode, with optimizations",
@@ -276,7 +281,9 @@ def main() -> int:
                 f"--orchestrator-process-scratch-directory={scratch}",
                 "--secrets-controller=local-file",
                 f"--persist-consensus-url={args.postgres}?options=--search_path=consensus",
-                f"--persist-blob-url=file://{mzdata}/persist/blob",
+                # A literal "file://mzdata/..." would parse "mzdata" as the URL host, so
+                # keep building the default from the mzdata path when --blob is not given.
+                "--persist-blob-url=" + (args.blob or f"file://{mzdata}/persist/blob"),
                 f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle",
                 f"--environment-id={environment_id}",
                 "--bootstrap-role=materialize",
diff --git a/src/mz/Cargo.toml b/src/mz/Cargo.toml
index 5366a3a5997da..c3cfe1324b5a1 100644
--- a/src/mz/Cargo.toml
+++ b/src/mz/Cargo.toml
@@ -25,7 +25,7 @@ mz-ore = { path = "../ore", features = ["async", "cli", "test"] }
 open = "3.2.0"
 openssl-probe = "0.1.2"
 hyper = "1.4.1"
-reqwest = { version = "0.11", features = ["blocking", "json"] }
+reqwest = { version = "0.12", features = ["blocking", "json", "default-tls", "charset", "http2"], default-features = false }
 rpassword = "7.2.0"
 semver = "1.0.16"
 serde = { version = "1.0.152", features = ["derive"] }
diff --git a/src/ore/src/lgbytes.rs b/src/ore/src/lgbytes.rs
index 883ce8e427ef9..cdf8cb0f31388 100644
--- a/src/ore/src/lgbytes.rs
+++ b/src/ore/src/lgbytes.rs
@@ -179,6 +179,8 @@ impl Buf for LgBytes {
 pub struct LgBytesMetrics {
     /// Metrics for the "persist_s3" usage of [LgBytes].
     pub persist_s3: LgBytesOpMetrics,
+    /// Metrics for the "persist_abs" usage of [LgBytes].
+    pub persist_abs: LgBytesOpMetrics,
     /// Metrics for the "persist_arrow" usage of [LgBytes].
     pub persist_arrow: LgBytesOpMetrics,
 }
@@ -272,6 +274,7 @@ impl LgBytesMetrics {
         };
         LgBytesMetrics {
             persist_s3: op("persist_s3"),
+            persist_abs: op("persist_abs"),
             persist_arrow: op("persist_arrow"),
         }
     }
diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs
index 79bae1903760f..8ea700c7bfacf 100644
--- a/src/persist-client/src/internal/metrics.rs
+++ b/src/persist-client/src/internal/metrics.rs
@@ -32,7 +32,7 @@ use mz_ore::stats::histogram_seconds_buckets;
 use mz_persist::location::{
     Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData,
 };
-use mz_persist::metrics::{ColumnarMetrics, S3BlobMetrics};
+use mz_persist::metrics::{ABSBlobMetrics, ColumnarMetrics, S3BlobMetrics};
 use mz_persist::retry::RetryStream;
 use mz_persist_types::Codec64;
 use mz_postgres_client::metrics::PostgresClientMetrics;
@@ -110,6 +110,9 @@ pub struct Metrics {
 
     /// Metrics for S3-backed blob implementation
     pub s3_blob: S3BlobMetrics,
+    // TODO: metrics for the Azure-backed blob implementation. Kept as a plain
+    // comment so it does not become part of the next field's rustdoc.
+    // pub azure_blob: ABSBlobMetrics,
     /// Metrics for Postgres-backed consensus implementation
     pub postgres_consensus: PostgresClientMetrics,
 
@@ -140,6 +143,7 @@ impl Metrics {
             move || start.elapsed().as_secs_f64(),
         );
         let s3_blob = S3BlobMetrics::new(registry);
+        // let azure_blob = ABSBlobMetrics::new(registry);
         let columnar = ColumnarMetrics::new(
             registry,
             &s3_blob.lgbytes,
diff --git a/src/persist/Cargo.toml b/src/persist/Cargo.toml
index db5e82390221a..f0ca812c58abc 100644
--- a/src/persist/Cargo.toml
+++ b/src/persist/Cargo.toml
@@ -30,6 +30,10 @@ aws-config = {
version = "1.2.0", default-features = false }
 aws-credential-types = { version = "1.1.1", features = ["hardcoded-credentials"] }
 aws-sdk-s3 = { version = "1.23.0", default-features = false, features = ["rt-tokio"] }
 aws-types = "1.1.1"
+azure_core = "0.21.0"
+azure_identity = { version = "0.21.0" }
+azure_storage = { version = "0.21.0" }
+azure_storage_blobs = { version = "0.21.0" }
 base64 = "0.13.1"
 bytes = "1.3.0"
 deadpool-postgres = "0.10.3"
diff --git a/src/persist/src/abs.rs b/src/persist/src/abs.rs
new file mode 100644
index 0000000000000..9f31845ea12a8
--- /dev/null
+++ b/src/persist/src/abs.rs
@@ -0,0 +1,381 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! An Azure Blob Storage implementation of [Blob] storage.
+
+use std::cmp;
+use std::fmt::{Debug, Formatter};
+use std::ops::Range;
+use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use anyhow::anyhow;
+use async_trait::async_trait;
+use azure_core::StatusCode;
+use azure_identity::{
+    create_default_credential, DefaultAzureCredential, DefaultAzureCredentialBuilder,
+};
+use azure_storage::{prelude::*, EMULATOR_ACCOUNT};
+use azure_storage_blobs::prelude::*;
+use bytes::Bytes;
+use futures_util::stream::FuturesOrdered;
+use futures_util::{FutureExt, StreamExt};
+use mz_dyncfg::{Config, ConfigSet};
+use mz_ore::bytes::{MaybeLgBytes, SegmentedBytes};
+use mz_ore::cast::CastFrom;
+use mz_ore::lgbytes::{LgBytes, MetricsRegion};
+use mz_ore::metrics::MetricsRegistry;
+use mz_ore::task::RuntimeExt;
+use tokio::runtime::Handle as AsyncHandle;
+use tracing::{debug, debug_span, info, trace, trace_span, warn, Instrument};
+use url::Url;
+use uuid::Uuid;
+
+use crate::cfg::BlobKnobs;
+use crate::error::Error;
+use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
+use crate::metrics::{ABSBlobMetrics, S3BlobMetrics};
+
+/// Configuration for opening an [ABSBlob].
+#[derive(Clone, Debug)]
+pub struct ABSBlobConfig {
+    // NB: still the S3 metrics struct; this module only reads its `lgbytes`.
+    metrics: S3BlobMetrics,
+    client: ContainerClient,
+    prefix: String,
+    cfg: Arc<ConfigSet>,
+}
+
+impl ABSBlobConfig {
+    const EXTERNAL_TESTS_ABS_CONTAINER: &'static str =
+        "MZ_PERSIST_EXTERNAL_STORAGE_TEST_ABS_CONTAINER";
+
+    /// Returns a new [ABSBlobConfig] for use in production.
+    ///
+    /// Stores objects in the given container prepended with the (possibly empty)
+    /// prefix. Azure credentials must be available in the process or environment.
+    ///
+    /// When `account` is the emulator account, the client targets the emulator.
+    /// Otherwise a query string on `url` is treated as a SAS token; if there is
+    /// none, or it fails to parse, the default Azure credentials are used.
+    ///
+    /// Returns an error (instead of panicking) when the default Azure
+    /// credentials cannot be created.
+    pub fn new(
+        account: String,
+        container: String,
+        prefix: String,
+        metrics: S3BlobMetrics,
+        url: Url,
+        cfg: Arc<ConfigSet>,
+    ) -> Result<Self, Error> {
+        // let is_cc_active = knobs.is_cc_active();
+
+        // TODO: might need to pull out service client
+        // to periodically refresh credentials
+        let client = if account == EMULATOR_ACCOUNT {
+            info!("Connecting to Azure emulator");
+            ClientBuilder::emulator()
+                .blob_service_client()
+                .container_client(container)
+        } else {
+            // WIP: check query pairs if our query string is for a SAS token
+            let credentials = match url.query().map(StorageCredentials::sas_token) {
+                Some(Ok(credentials)) => credentials,
+                Some(Err(err)) => {
+                    warn!("Failed to parse SAS token: {err}");
+                    // Fall back to default credentials
+                    Self::default_credentials()?
+                }
+                // Fall back to default credentials
+                None => Self::default_credentials()?,
+            };
+
+            let service_client = BlobServiceClient::new(account, credentials);
+            service_client.container_client(container)
+        };
+
+        Ok(ABSBlobConfig {
+            metrics,
+            client,
+            cfg,
+            prefix,
+        })
+    }
+
+    /// Builds [StorageCredentials] from the default Azure credentials.
+    fn default_credentials() -> Result<StorageCredentials, Error> {
+        let credential = create_default_credential()
+            .map_err(|err| Error::from(format!("Azure default credentials: {err}")))?;
+        Ok(StorageCredentials::token_credential(credential))
+    }
+
+    /// Returns a new [ABSBlobConfig] for use in unit tests.
+    ///
+    /// Returns `Ok(None)` if the container environment variable is not set, so
+    /// callers can skip tests that need the external service.
+    pub async fn new_for_test() -> Result<Option<Self>, Error> {
+        // WIP: do we need this container name to be passed in?
+        let container_name = match std::env::var(Self::EXTERNAL_TESTS_ABS_CONTAINER) {
+            Ok(container) => container,
+            Err(_) => {
+                // WIP: figure out CI situation
+                // if mz_ore::env::is_var_truthy("CI") {
+                //     panic!("CI is supposed to run this test but something has gone wrong!");
+                // }
+                return Ok(None);
+            }
+        };
+
+        let prefix = Uuid::new_v4().to_string();
+        let metrics = S3BlobMetrics::new(&MetricsRegistry::new());
+
+        let config = ABSBlobConfig::new(
+            EMULATOR_ACCOUNT.to_string(),
+            container_name.clone(),
+            prefix,
+            metrics,
+            Url::parse(&format!(
+                "http://devaccount1.blob.core.windows.net/{}",
+                container_name
+            ))
+            .expect("valid url"),
+            Arc::new(ConfigSet::default()),
+        )?;
+
+        Ok(Some(config))
+    }
+
+    /// Returns a clone of Self with a new v4 uuid prefix.
+    pub fn clone_with_new_uuid_prefix(&self) -> Self {
+        let mut ret = self.clone();
+        ret.prefix = Uuid::new_v4().to_string();
+        ret
+    }
+}
+
+/// Implementation of [Blob] backed by Azure Blob Storage.
+#[derive(Debug)]
+pub struct ABSBlob {
+    metrics: S3BlobMetrics,
+    client: ContainerClient,
+    container_name: String,
+    prefix: String,
+    cfg: Arc<ConfigSet>,
+}
+
+impl ABSBlob {
+    /// Opens the given location for non-exclusive read-write access.
+    pub async fn open(config: ABSBlobConfig) -> Result<Self, ExternalError> {
+        let container_name = config.client.container_name().to_string();
+
+        if config.client.service_client().account() == EMULATOR_ACCOUNT {
+            // try to create the container if we're running in the emulator.
+            // it is surprisingly difficult to create the container out-of-band
+            // of an official client. Best effort: the result is ignored.
+            let _ = config.client.create().await;
+        }
+
+        let ret = ABSBlob {
+            metrics: config.metrics,
+            client: config.client,
+            container_name,
+            prefix: config.prefix,
+            cfg: config.cfg,
+        };
+
+        // WIP: do we have a healthcheck similar to S3?
+        // Test connection before returning success
+        // let _ = ret.get("HEALTH_CHECK").await?;
+        Ok(ret)
+    }
+
+    /// Returns the blob name for `key`: the prefix, a `/`, then the key.
+    fn get_path(&self, key: &str) -> String {
+        format!("{}/{}", self.prefix, key)
+    }
+}
+
+/// Returns whether `err` is an HTTP error with a 404 (Not Found) status.
+fn is_not_found(err: &azure_core::Error) -> bool {
+    err.as_http_error()
+        .map_or(false, |http| http.status() == StatusCode::NotFound)
+}
+
+#[async_trait]
+impl Blob for ABSBlob {
+    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
+        let path = self.get_path(key);
+
+        let blob = self.client.blob_client(path);
+
+        let mut segments: Vec<MaybeLgBytes> = vec![];
+
+        let mut stream = blob.get().into_stream();
+        while let Some(value) = stream.next().await {
+            let response = match value {
+                Ok(v) => v,
+                // A missing blob is reported as `None`, not as an error.
+                Err(e) if is_not_found(&e) => return Ok(None),
+                Err(e) => {
+                    return Err(ExternalError::from(anyhow!(
+                        "Azure blob get error: {:?}",
+                        e
+                    )));
+                }
+            };
+
+            // Copy this chunk of the response into a metrics-tracked region.
+            let content_length = response.blob.properties.content_length;
+            let mut buffer = self
+                .metrics
+                .lgbytes
+                .persist_abs
+                .new_region(usize::cast_from(content_length));
+
+            let mut body = response.data;
+            while let Some(value) = body.next().await {
+                let value = value.map_err(|e| {
+                    ExternalError::from(anyhow!("Azure blob get body error: {}", e))
+                })?;
+                buffer.extend_from_slice(&value);
+            }
+
+            segments.push(MaybeLgBytes::LgBytes(LgBytes::from(Arc::new(buffer))));
+        }
+
+        Ok(Some(SegmentedBytes::from(segments)))
+    }
+
+    async fn list_keys_and_metadata(
+        &self,
+        key_prefix: &str,
+        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
+    ) -> Result<(), ExternalError> {
+        let blob_key_prefix = self.get_path(key_prefix);
+        let strippable_root_prefix = format!("{}/", self.prefix);
+
+        let mut stream = self
+            .client
+            .list_blobs()
+            .prefix(blob_key_prefix.clone())
+            .into_stream();
+
+        while let Some(response) = stream.next().await {
+            let response = response
+                .map_err(|e| ExternalError::from(anyhow!("Azure list blobs error: {}", e)))?;
+
+            for blob in response.blobs.items {
+                // Skip listing entries that are not blobs.
+                let azure_storage_blobs::container::operations::list_blobs::BlobItem::Blob(blob) =
+                    blob
+                else {
+                    continue;
+                };
+
+                if let Some(key) = blob.name.strip_prefix(&strippable_root_prefix) {
+                    let size_in_bytes = blob.properties.content_length;
+                    f(BlobMetadata {
+                        key,
+                        size_in_bytes,
+                    });
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
+        let path = self.get_path(key);
+        let blob = self.client.blob_client(path);
+
+        blob.put_block_blob(value)
+            .await
+            .map_err(|e| ExternalError::from(anyhow!("Azure blob put error: {}", e)))?;
+
+        Ok(())
+    }
+
+    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
+        let path = self.get_path(key);
+        let blob = self.client.blob_client(path);
+
+        // Fetch the size up front so it can be returned after the delete.
+        let size = match blob.get_properties().await {
+            Ok(props) => usize::cast_from(props.blob.properties.content_length),
+            Err(e) if is_not_found(&e) => return Ok(None),
+            Err(e) => return Err(ExternalError::from(anyhow!("Azure blob error: {}", e))),
+        };
+
+        match blob.delete().await {
+            Ok(_) => Ok(Some(size)),
+            // The blob disappeared between the two requests (e.g. a concurrent
+            // delete), so treat it like a blob that never existed.
+            Err(e) if is_not_found(&e) => Ok(None),
+            Err(e) => Err(ExternalError::from(anyhow!(
+                "Azure blob delete error: {}",
+                e
+            ))),
+        }
+    }
+
+    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
+        let path = self.get_path(key);
+        let blob = self.client.blob_client(&path);
+
+        // No undelete is attempted: this only checks that the blob still exists.
+        match blob.get_properties().await {
+            Ok(_) => Ok(()),
+            Err(e) if is_not_found(&e) => Err(Determinate::new(anyhow!(
+                "unable to restore {key} in Azure Blob Storage: blob does not exist"
+            ))
+            .into()),
+            Err(e) => Err(ExternalError::from(anyhow!("Azure blob error: {}", e))),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::location::tests::blob_impl_test;
+    use tracing::info;
+
+    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
+    async fn abs_blob() -> Result<(), ExternalError> {
+        let config = match ABSBlobConfig::new_for_test().await? {
+            Some(client) => client,
+            None => {
+                info!(
+                    "{} env not set: skipping test that uses external service",
+                    ABSBlobConfig::EXTERNAL_TESTS_ABS_CONTAINER
+                );
+                return Ok(());
+            }
+        };
+
+        blob_impl_test(move |path| {
+            let path = path.to_owned();
+            let config = config.clone();
+            async move {
+                let config = ABSBlobConfig {
+                    metrics: config.metrics.clone(),
+                    client: config.client.clone(),
+                    cfg: Arc::new(ConfigSet::default()),
+                    // Keep each test path in its own namespace under the shared prefix.
+                    prefix: format!("{}/abs_blob_impl_test/{}", config.prefix, path),
+                };
+                ABSBlob::open(config).await
+            }
+        })
+        .await
+    }
+}
diff --git a/src/persist/src/cfg.rs b/src/persist/src/cfg.rs
index a191d09fd8b84..3579f43f4f0e6 100644
--- a/src/persist/src/cfg.rs
+++ b/src/persist/src/cfg.rs
@@ -21,10 +21,11 @@ use tracing::warn;
 use mz_postgres_client::metrics::PostgresClientMetrics;
 use mz_postgres_client::PostgresClientKnobs;
 
+use crate::abs::{ABSBlob, ABSBlobConfig};
 use crate::file::{FileBlob, FileBlobConfig};
 use crate::location::{Blob, Consensus, Determinate, ExternalError};
 use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
-use crate::metrics::S3BlobMetrics;
+use crate::metrics::{ABSBlobMetrics, S3BlobMetrics};
 use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
 use crate::s3::{S3Blob, S3BlobConfig};
 
@@ -48,6 +49,8 @@ pub enum BlobConfig {
     /// Config for [MemBlob], only available in testing to prevent
     /// footguns.
     Mem(bool),
+    /// Config for [ABSBlob].
+    Azure(ABSBlobConfig),
 }
 
 /// Configuration knobs for [Blob].
@@ -70,6 +73,7 @@ impl BlobConfig {
         match self {
             BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
             BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
+            BlobConfig::Azure(config) => Ok(Arc::new(ABSBlob::open(config).await?)),
             BlobConfig::Mem(tombstone) => {
                 Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
             }
@@ -148,6 +152,34 @@ impl BlobConfig {
                 query_params.clear();
                 Ok(BlobConfig::Mem(tombstone))
             }
+            "http" | "https" => match url.host().expect("hostname").to_string().split_once('.') {
+                Some((account, "blob.core.windows.net")) => {
+                    let container = url
+                        .path_segments()
+                        .and_then(|mut segments| segments.next())
+                        .filter(|container| !container.is_empty());
+                    if let Some(container) = container {
+                        // The query string carries the Azure SAS token, which
+                        // `ABSBlobConfig::new` reads from the URL, so none of
+                        // its params are unknown location params.
+                        query_params.clear();
+                        Ok(BlobConfig::Azure(ABSBlobConfig::new(
+                            account.to_string(),
+                            container.to_string(),
+                            "".to_string(),
+                            metrics,
+                            url.clone().into_redacted(),
+                            cfg,
+                        )?))
+                    } else {
+                        Err(anyhow!(
+                            "missing Azure blob storage container: {}",
+                            url.as_str()
+                        ))
+                    }
+                }
+                _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
+            },
             p => Err(anyhow!(
                 "unknown persist blob scheme {}: {}",
                 p,
diff --git a/src/persist/src/lib.rs b/src/persist/src/lib.rs
index 3d9c94fc49eeb..d7ace1274e750 100644
--- a/src/persist/src/lib.rs
+++ b/src/persist/src/lib.rs
@@ -17,6 +17,7 @@
     clippy::clone_on_ref_ptr
 )]
 
+pub mod abs;
 pub mod cfg;
 pub mod error;
 pub mod file;
diff --git a/src/persist/src/metrics.rs b/src/persist/src/metrics.rs
index 39de50b85eda1..362e1e11a659b 100644
--- a/src/persist/src/metrics.rs
+++ b/src/persist/src/metrics.rs
@@ -18,6 +18,21 @@ use mz_ore::metric;
 use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry};
 use prometheus::{CounterVec, IntCounterVec};
 
+/// Metrics specific to ABSBlob's internal workings.
+#[derive(Debug, Clone)]
+pub struct ABSBlobMetrics {
+    pub(crate) lgbytes: LgBytesMetrics,
+}
+
+impl ABSBlobMetrics {
+    /// Returns a new [ABSBlobMetrics] instance connected to the given registry.
+    pub fn new(registry: &MetricsRegistry) -> Self {
+        ABSBlobMetrics {
+            lgbytes: LgBytesMetrics::new(registry),
+        }
+    }
+}
+
 /// Metrics specific to S3Blob's internal workings.
 #[derive(Debug, Clone)]
 pub struct S3BlobMetrics {