diff --git a/src/config/loading/mod.rs b/src/config/loading/mod.rs index 8be90c6c66c75..981671f65e4f1 100644 --- a/src/config/loading/mod.rs +++ b/src/config/loading/mod.rs @@ -145,6 +145,7 @@ pub async fn load_from_paths_with_provider_and_secrets( debug!(message = "Secret placeholders found, retrieving secrets from configured backends."); let resolved_secrets = secrets_backends_loader .retrieve(&mut signal_handler.subscribe()) + .await .map_err(|e| vec![e])?; load_builder_from_paths_with_secrets(config_paths, resolved_secrets)? } else { diff --git a/src/config/loading/secret.rs b/src/config/loading/secret.rs index 2e7bd1ee3a8f4..266d811c9b62e 100644 --- a/src/config/loading/secret.rs +++ b/src/config/loading/secret.rs @@ -3,6 +3,7 @@ use std::{ io::Read, }; +use futures::TryFutureExt; use indexmap::IndexMap; use once_cell::sync::Lazy; use regex::{Captures, Regex}; @@ -51,31 +52,35 @@ impl SecretBackendLoader { } } - pub(crate) fn retrieve( + pub(crate) async fn retrieve( &mut self, signal_rx: &mut signal::SignalRx, ) -> Result, String> { - let secrets = self.secret_keys.iter().flat_map(|(backend_name, keys)| { - match self.backends.get_mut(&ComponentKey::from(backend_name.clone())) { - None => { - vec![Err(format!("Backend \"{}\" is required for secret retrieval but was not found in config.", backend_name))] - }, - Some(backend) => { - debug!(message = "Retrieving secret from a backend.", backend = ?backend_name); - match backend.retrieve(keys.clone(), signal_rx) { - Err(e) => { - vec![Err(format!("Error while retrieving secret from backend \"{}\": {}.", backend_name, e))] - }, - Ok(s) => { - s.into_iter().map(|(k, v)| { - trace!(message = "Successfully retrieved a secret.", backend = ?backend_name, secret_key = ?k); - Ok((format!("{}.{}", backend_name, k), v)) - }).collect::>>() - } - } - }, - } - }).collect::, String>>()?; + let mut secrets: HashMap = HashMap::new(); + + for (backend_name, keys) in &self.secret_keys { + let backend = self.backends.get_mut(&ComponentKey::from(backend_name.clone())).ok_or_else(|| format!("Backend \"{}\" is required for secret retrieval but was not found in config.", backend_name))?; + + debug!(message = "Retrieving secrets from a backend.", backend = ?backend_name, keys = ?keys); + let backend_secrets = backend + .retrieve(keys.clone(), signal_rx) + .map_ok(|backend_secrets| { + backend_secrets.into_iter().map(|(k, v)| { + trace!(message = "Successfully retrieved a secret.", backend = ?backend_name, key = ?k); + (format!("{}.{}", backend_name, k), v) + }).collect::>() + }) + .map_err(|e| { + format!( + "Error while retrieving secret from backend \"{}\": {}.", + backend_name, e + ) + }) + .await?; + + secrets.extend(backend_secrets); + } + Ok(secrets) } diff --git a/src/config/secret.rs b/src/config/secret.rs index 7bf7d66fd4d42..90bc331ec0bd9 100644 --- a/src/config/secret.rs +++ b/src/config/secret.rs @@ -8,7 +8,7 @@ use crate::signal; /// Generalized interface to a secret backend. #[enum_dispatch] pub trait SecretBackend: NamedComponent + core::fmt::Debug + Send + Sync { - fn retrieve( + async fn retrieve( &mut self, secret_keys: HashSet, signal_rx: &mut signal::SignalRx, diff --git a/src/config/unit_test/mod.rs b/src/config/unit_test/mod.rs index 6d11aae481f5f..5354fbe316b58 100644 --- a/src/config/unit_test/mod.rs +++ b/src/config/unit_test/mod.rs @@ -105,6 +105,7 @@ pub async fn build_unit_tests_main( let config_builder = if secrets_backends_loader.has_secrets_to_retrieve() { let resolved_secrets = secrets_backends_loader .retrieve(&mut signal_handler.subscribe()) + .await .map_err(|e| vec![e])?; loading::load_builder_from_paths_with_secrets(paths, resolved_secrets)? } else { diff --git a/src/secrets/aws_secrets_manager.rs b/src/secrets/aws_secrets_manager.rs index 7267cc1ab588f..f3c221b42eee0 100644 --- a/src/secrets/aws_secrets_manager.rs +++ b/src/secrets/aws_secrets_manager.rs @@ -1,7 +1,6 @@ use std::collections::{HashMap, HashSet}; use aws_sdk_secretsmanager::{config, Client}; -use futures::executor; use vector_lib::configurable::{component::GenerateConfig, configurable_component}; use crate::aws::{create_client, AwsAuthentication, ClientBuilder, RegionOrEndpoint}; @@ -52,30 +51,26 @@ impl GenerateConfig for AwsSecretsManagerBackend { } impl SecretBackend for AwsSecretsManagerBackend { - fn retrieve( + async fn retrieve( &mut self, secret_keys: HashSet, _: &mut signal::SignalRx, ) -> crate::Result> { - let client = executor::block_on(async { - create_client::( - &self.auth, - self.region.region(), - self.region.endpoint(), - &ProxyConfig::default(), - &self.tls, - &None, - ) - .await - })?; + let client = create_client::( + &self.auth, + self.region.region(), + self.region.endpoint(), + &ProxyConfig::default(), + &self.tls, + &None, + ) + .await?; - let get_secret_value_response = executor::block_on(async { - client - .get_secret_value() - .secret_id(&self.secret_id) - .send() - .await - })?; + let get_secret_value_response = client + .get_secret_value() + .secret_id(&self.secret_id) + .send() + .await?; let secret_string = get_secret_value_response .secret_string diff --git a/src/secrets/exec.rs b/src/secrets/exec.rs index 0c192adffd583..bf300c061013c 100644 --- a/src/secrets/exec.rs +++ b/src/secrets/exec.rs @@ -58,7 +58,7 @@ struct ExecResponse { } impl SecretBackend for ExecBackend { - fn retrieve( + async fn retrieve( &mut self, secret_keys: HashSet, signal_rx: &mut signal::SignalRx, diff --git a/src/secrets/test.rs b/src/secrets/test.rs index 920fbe1ba2adc..3185d93a5feb2 100644 --- a/src/secrets/test.rs +++ b/src/secrets/test.rs @@ -15,7 +15,7 @@ pub struct TestBackend { impl_generate_config_from_default!(TestBackend); impl SecretBackend for TestBackend { - fn retrieve( + async fn retrieve( &mut self, secret_keys: HashSet, _: &mut signal::SignalRx,