From a9d0e3044efd6966607655b1e1c2d28975f2710f Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Tue, 12 Nov 2024 10:56:56 +0530 Subject: [PATCH] Feat : setup scripts (#181) * feat : added basic scripts for setup and functions * changelog * chore : refactor * chore : refactor implementation * chore : refactored code according to comments * feat : refactor * feat : refactor cron provider --- CHANGELOG.md | 1 + Cargo.lock | 1 + crates/orchestrator/Cargo.toml | 1 + crates/orchestrator/src/alerts/aws_sns/mod.rs | 7 ++ crates/orchestrator/src/alerts/mod.rs | 7 ++ crates/orchestrator/src/cron/event_bridge.rs | 111 ++++++++++++++++++ crates/orchestrator/src/cron/mod.rs | 57 +++++++++ .../src/data_storage/aws_s3/mod.rs | 2 +- crates/orchestrator/src/data_storage/mod.rs | 6 +- crates/orchestrator/src/lib.rs | 5 + crates/orchestrator/src/queue/mod.rs | 81 ++++++++++++- crates/orchestrator/src/queue/sqs/mod.rs | 60 +++++++++- crates/orchestrator/src/setup/mod.rs | 97 +++++++++++++++ crates/orchestrator/src/tests/config.rs | 4 +- crates/utils/src/settings/mod.rs | 2 +- e2e-tests/tests.rs | 2 +- madara-bootstrapper | 2 +- 17 files changed, 436 insertions(+), 10 deletions(-) create mode 100644 crates/orchestrator/src/cron/event_bridge.rs create mode 100644 crates/orchestrator/src/cron/mod.rs create mode 100644 crates/orchestrator/src/setup/mod.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 51a28afb..b7282e47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- setup functions added for cloud and db - panic handling in process job - upgrade ETH L1 bridge for withdrawals to work - added makefile and submodules diff --git a/Cargo.lock b/Cargo.lock index ecd06598..4909a7c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6936,6 +6936,7 @@ dependencies = [ "async-trait", "aws-config", "aws-credential-types", + "aws-sdk-eventbridge", "aws-sdk-s3", "aws-sdk-sns", "aws-sdk-sqs", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index c2c258e0..a62bc239 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -20,6 +20,7 @@ aws-config = { workspace = true, features = ["behavior-version-latest"] } aws-credential-types = { version = "1.2.1", features = [ "hardcoded-credentials", ] } +aws-sdk-eventbridge.workspace = true aws-sdk-s3 = { workspace = true, features = ["behavior-version-latest"] } aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] } aws-sdk-sqs = { workspace = true } diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs index c78845ba..3416be91 100644 --- a/crates/orchestrator/src/alerts/aws_sns/mod.rs +++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs @@ -32,4 +32,11 @@ impl Alerts for AWSSNS { self.client.publish().topic_arn(self.topic_arn.clone()).message(message_body).send().await?; Ok(()) } + + async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()> { + let response = self.client.create_topic().name(topic_name).send().await?; + let topic_arn = response.topic_arn().expect("Topic Not found"); + log::info!("SNS topic created. Topic ARN: {}", topic_arn); + Ok(()) + } } diff --git a/crates/orchestrator/src/alerts/mod.rs b/crates/orchestrator/src/alerts/mod.rs index 1e36129d..30220cf8 100644 --- a/crates/orchestrator/src/alerts/mod.rs +++ b/crates/orchestrator/src/alerts/mod.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use mockall::automock; +use utils::settings::Settings; pub mod aws_sns; @@ -8,4 +9,10 @@ pub mod aws_sns; pub trait Alerts: Send + Sync { /// To send an alert message to our alert service async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>; + async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()>; + async fn setup(&self, settings_provider: Box) -> color_eyre::Result<()> { + let sns_topic_name = settings_provider.get_settings_or_panic("ALERT_TOPIC_NAME"); + self.create_alert(&sns_topic_name).await?; + Ok(()) + } } diff --git a/crates/orchestrator/src/cron/event_bridge.rs b/crates/orchestrator/src/cron/event_bridge.rs new file mode 100644 index 00000000..9feeb70a --- /dev/null +++ b/crates/orchestrator/src/cron/event_bridge.rs @@ -0,0 +1,111 @@ +use std::time::Duration; + +use async_trait::async_trait; +use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target}; +use aws_sdk_sqs::types::QueueAttributeName; + +use crate::cron::Cron; +use crate::setup::SetupConfig; + +pub struct AWSEventBridge {} + +#[async_trait] +#[allow(unreachable_patterns)] +impl Cron for AWSEventBridge { + async fn create_cron( + &self, + config: &SetupConfig, + cron_time: Duration, + trigger_rule_name: String, + ) -> color_eyre::Result<()> { + let config = match config { + SetupConfig::AWS(config) => config, + _ => panic!("Unsupported Event Bridge configuration"), + }; + let event_bridge_client = aws_sdk_eventbridge::Client::new(config); + event_bridge_client + .put_rule() + .name(&trigger_rule_name) + .schedule_expression(duration_to_rate_string(cron_time)) + .state(RuleState::Enabled) + .send() + .await?; + + Ok(()) + } + async fn add_cron_target_queue( + &self, + config: &SetupConfig, + target_queue_name: String, + message: String, + trigger_rule_name: String, + ) -> color_eyre::Result<()> { + let config = match config { + SetupConfig::AWS(config) => config, + _ => panic!("Unsupported Event Bridge configuration"), + }; + let event_bridge_client = aws_sdk_eventbridge::Client::new(config); + let sqs_client = aws_sdk_sqs::Client::new(config); + let queue_url = sqs_client.get_queue_url().queue_name(target_queue_name).send().await?; + + let queue_attributes = sqs_client + .get_queue_attributes() + .queue_url(queue_url.queue_url.unwrap()) + .attribute_names(QueueAttributeName::QueueArn) + .send() + .await?; + let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap(); + + // Create the EventBridge target with the input transformer + let input_transformer = + InputTransformer::builder().input_paths_map("$.time", "time").input_template(message).build()?; + + event_bridge_client + .put_targets() + .rule(trigger_rule_name) + .targets( + Target::builder() + .id(uuid::Uuid::new_v4().to_string()) + .arn(queue_arn) + .input_transformer(input_transformer) + .build()?, + ) + .send() + .await?; + + Ok(()) + } +} + +fn duration_to_rate_string(duration: Duration) -> String { + let total_secs = duration.as_secs(); + let total_mins = duration.as_secs() / 60; + let total_hours = duration.as_secs() / 3600; + let total_days = duration.as_secs() / 86400; + + if total_days > 0 { + format!("rate({} day{})", total_days, if total_days == 1 { "" } else { "s" }) + } else if total_hours > 0 { + format!("rate({} hour{})", total_hours, if total_hours == 1 { "" } else { "s" }) + } else if total_mins > 0 { + format!("rate({} minute{})", total_mins, if total_mins == 1 { "" } else { "s" }) + } else { + format!("rate({} second{})", total_secs, if total_secs == 1 { "" } else { "s" }) + } +} + +#[cfg(test)] +mod event_bridge_utils_test { + use rstest::rstest; + + use super::*; + + #[rstest] + fn test_duration_to_rate_string() { + assert_eq!(duration_to_rate_string(Duration::from_secs(60)), "rate(1 minute)"); + assert_eq!(duration_to_rate_string(Duration::from_secs(120)), "rate(2 minutes)"); + assert_eq!(duration_to_rate_string(Duration::from_secs(30)), "rate(30 seconds)"); + assert_eq!(duration_to_rate_string(Duration::from_secs(3600)), "rate(1 hour)"); + assert_eq!(duration_to_rate_string(Duration::from_secs(86400)), "rate(1 day)"); + } +} diff --git a/crates/orchestrator/src/cron/mod.rs b/crates/orchestrator/src/cron/mod.rs new file mode 100644 index 00000000..c2c9085d --- /dev/null +++ b/crates/orchestrator/src/cron/mod.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use async_trait::async_trait; +use lazy_static::lazy_static; + +use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType}; +use crate::setup::SetupConfig; + +pub mod event_bridge; + +lazy_static! { + pub static ref CRON_DURATION: Duration = Duration::from_mins(1); + // TODO : we can take this from clap. + pub static ref TARGET_QUEUE_NAME: String = String::from("madara_orchestrator_worker_trigger_queue"); + pub static ref WORKER_TRIGGERS: Vec = vec![ + WorkerTriggerType::Snos, + WorkerTriggerType::Proving, + WorkerTriggerType::DataSubmission, + WorkerTriggerType::UpdateState + ]; + pub static ref WORKER_TRIGGER_RULE_NAME: String = String::from("worker_trigger_scheduled"); +} + +#[async_trait] +pub trait Cron { + async fn create_cron( + &self, + config: &SetupConfig, + cron_time: Duration, + trigger_rule_name: String, + ) -> color_eyre::Result<()>; + async fn add_cron_target_queue( + &self, + config: &SetupConfig, + target_queue_name: String, + message: String, + trigger_rule_name: String, + ) -> color_eyre::Result<()>; + async fn setup(&self, config: SetupConfig) -> color_eyre::Result<()> { + self.create_cron(&config, *CRON_DURATION, WORKER_TRIGGER_RULE_NAME.clone()).await?; + for triggers in WORKER_TRIGGERS.iter() { + self.add_cron_target_queue( + &config, + TARGET_QUEUE_NAME.clone(), + get_worker_trigger_message(triggers.clone())?, + WORKER_TRIGGER_RULE_NAME.clone(), + ) + .await?; + } + Ok(()) + } +} + +fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result { + let message = WorkerTriggerMessage { worker: worker_trigger_type }; + Ok(serde_json::to_string(&message)?) +} diff --git a/crates/orchestrator/src/data_storage/aws_s3/mod.rs b/crates/orchestrator/src/data_storage/aws_s3/mod.rs index f57d46ae..e1f11a1c 100644 --- a/crates/orchestrator/src/data_storage/aws_s3/mod.rs +++ b/crates/orchestrator/src/data_storage/aws_s3/mod.rs @@ -82,7 +82,7 @@ impl DataStorage for AWSS3 { Ok(()) } - async fn build_test_bucket(&self, bucket_name: &str) -> Result<()> { + async fn create_bucket(&self, bucket_name: &str) -> Result<()> { self.client.create_bucket().bucket(bucket_name).send().await?; Ok(()) } diff --git a/crates/orchestrator/src/data_storage/mod.rs b/crates/orchestrator/src/data_storage/mod.rs index 6a7ba7f4..f85b74f2 100644 --- a/crates/orchestrator/src/data_storage/mod.rs +++ b/crates/orchestrator/src/data_storage/mod.rs @@ -21,7 +21,11 @@ use utils::settings::Settings; pub trait DataStorage: Send + Sync { async fn get_data(&self, key: &str) -> Result; async fn put_data(&self, data: Bytes, key: &str) -> Result<()>; - async fn build_test_bucket(&self, bucket_name: &str) -> Result<()>; + async fn create_bucket(&self, bucket_name: &str) -> Result<()>; + async fn setup(&self, settings_provider: Box) -> Result<()> { + let bucket_name = settings_provider.get_settings_or_panic("STORAGE_BUCKET_NAME"); + self.create_bucket(&bucket_name).await + } } /// **DataStorageConfig** : Trait method to represent the config struct needed for diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index b580f273..86d432b5 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(duration_constructors)] + /// Contains the trait implementations for alerts pub mod alerts; /// Config of the service. Contains configurations for DB, Queues and other services. @@ -5,6 +7,7 @@ pub mod config; pub mod constants; /// Controllers for the routes pub mod controllers; +pub mod cron; /// Contains the trait that implements the fetching functions /// for blob and SNOS data from cloud for a particular block. pub mod data_storage; @@ -20,6 +23,8 @@ pub mod metrics; pub mod queue; /// Contains the routes for the service pub mod routes; +/// Contains setup functions to set up db and cloud. +pub mod setup; /// Contains telemetry collection services. (Metrics/Logs/Traces) pub mod telemetry; #[cfg(test)] diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs index a5f513d5..926e493d 100644 --- a/crates/orchestrator/src/queue/mod.rs +++ b/crates/orchestrator/src/queue/mod.rs @@ -6,11 +6,82 @@ use std::time::Duration; use async_trait::async_trait; use color_eyre::Result as EyreResult; +use lazy_static::lazy_static; use mockall::automock; use omniqueue::{Delivery, QueueError}; use crate::config::Config; use crate::jobs::JobError; +use crate::setup::SetupConfig; + +#[derive(Clone)] +pub struct DlqConfig<'a> { + pub max_receive_count: i32, + pub dlq_name: &'a str, +} + +#[derive(Clone)] +pub struct QueueConfig<'a> { + pub name: String, + pub visibility_timeout: i32, + pub dlq_config: Option>, +} + +lazy_static! { + pub static ref JOB_HANDLE_FAILURE_QUEUE: String = String::from("madara_orchestrator_job_handle_failure_queue"); + pub static ref QUEUES: Vec> = vec![ + QueueConfig { + name: String::from("madara_orchestrator_snos_job_processing_queue"), + visibility_timeout: 300, + dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE }) + }, + QueueConfig { + name: String::from("madara_orchestrator_snos_job_verification_queue"), + visibility_timeout: 300, + dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE }) + }, + QueueConfig { + name: String::from("madara_orchestrator_proving_job_processing_queue"), + visibility_timeout: 300, + dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE }) + }, + QueueConfig { + name: String::from("madara_orchestrator_proving_job_verification_queue"), + visibility_timeout: 300, + dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE }) + }, + QueueConfig { + name: String::from("madara_orchestrator_data_submission_job_processing_queue"), + visibility_timeout: 300, + dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE }) + }, + QueueConfig { + name: String::from("madara_orchestrator_data_submission_job_verification_queue"), + visibility_timeout: 300, + dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE }) + }, + QueueConfig { + name: String::from("madara_orchestrator_update_state_job_processing_queue"), + visibility_timeout: 300, + dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE }) + }, + QueueConfig { + name: String::from("madara_orchestrator_update_state_job_verification_queue"), + visibility_timeout: 300, + dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE }) + }, + QueueConfig { + name: String::from("madara_orchestrator_job_handle_failure_queue"), + visibility_timeout: 300, + dlq_config: None + }, + QueueConfig { + name: String::from("madara_orchestrator_worker_trigger_queue"), + visibility_timeout: 300, + dlq_config: None + }, + ]; +} /// Queue Provider Trait /// @@ -21,7 +92,15 @@ use crate::jobs::JobError; #[async_trait] pub trait QueueProvider: Send + Sync { async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option) -> EyreResult<()>; - async fn consume_message_from_queue(&self, queue: String) -> std::result::Result; + async fn consume_message_from_queue(&self, queue: String) -> Result; + async fn create_queue<'a>(&self, queue_config: &QueueConfig<'a>, config: &SetupConfig) -> EyreResult<()>; + async fn setup(&self, config: SetupConfig) -> EyreResult<()> { + // Creating the queues : + for queue in QUEUES.iter() { + self.create_queue(queue, &config).await?; + } + Ok(()) + } } pub async fn init_consumers(config: Arc) -> Result<(), JobError> { diff --git a/crates/orchestrator/src/queue/sqs/mod.rs b/crates/orchestrator/src/queue/sqs/mod.rs index dc20d21d..dc392881 100644 --- a/crates/orchestrator/src/queue/sqs/mod.rs +++ b/crates/orchestrator/src/queue/sqs/mod.rs @@ -2,6 +2,9 @@ use std::collections::HashMap; use std::time::Duration; use async_trait::async_trait; +use aws_sdk_sqs::types::QueueAttributeName; +use aws_sdk_sqs::Client; +use color_eyre::eyre::eyre; use color_eyre::Result; use lazy_static::lazy_static; use omniqueue::backends::{SqsBackend, SqsConfig, SqsConsumer, SqsProducer}; @@ -14,7 +17,9 @@ use crate::queue::job_queue::{ PROVING_JOB_VERIFICATION_QUEUE, SNOS_JOB_PROCESSING_QUEUE, SNOS_JOB_VERIFICATION_QUEUE, UPDATE_STATE_JOB_PROCESSING_QUEUE, UPDATE_STATE_JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE, }; -use crate::queue::QueueProvider; +use crate::queue::{QueueConfig, QueueProvider}; +use crate::setup::SetupConfig; + pub struct SqsQueue; lazy_static! { @@ -35,6 +40,7 @@ lazy_static! { ]); } +#[allow(unreachable_patterns)] #[async_trait] impl QueueProvider for SqsQueue { async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option) -> Result<()> { @@ -54,6 +60,58 @@ impl QueueProvider for SqsQueue { let mut consumer = get_consumer(queue_url).await?; consumer.receive().await } + + async fn create_queue<'a>(&self, queue_config: &QueueConfig<'a>, config: &SetupConfig) -> Result<()> { + let config = match config { + SetupConfig::AWS(config) => config, + _ => panic!("Unsupported SQS configuration"), + }; + let sqs_client = Client::new(config); + let res = sqs_client.create_queue().queue_name(&queue_config.name).send().await?; + let queue_url = res.queue_url().ok_or_else(|| eyre!("Not able to get queue url from result"))?; + + let mut attributes = HashMap::new(); + attributes.insert(QueueAttributeName::VisibilityTimeout, queue_config.visibility_timeout.to_string()); + + if let Some(dlq_config) = &queue_config.dlq_config { + let dlq_url = Self::get_queue_url_from_client(dlq_config.dlq_name, &sqs_client).await?; + let dlq_arn = Self::get_queue_arn(&sqs_client, &dlq_url).await?; + let policy = format!( + r#"{{"deadLetterTargetArn":"{}","maxReceiveCount":"{}"}}"#, + dlq_arn, &dlq_config.max_receive_count + ); + attributes.insert(QueueAttributeName::RedrivePolicy, policy); + } + + sqs_client.set_queue_attributes().queue_url(queue_url).set_attributes(Some(attributes)).send().await?; + + Ok(()) + } +} + +impl SqsQueue { + /// To get the queue url from the given queue name + async fn get_queue_url_from_client(queue_name: &str, sqs_client: &Client) -> Result { + Ok(sqs_client + .get_queue_url() + .queue_name(queue_name) + .send() + .await? + .queue_url() + .ok_or_else(|| eyre!("Unable to get queue url from the given queue_name."))? + .to_string()) + } + + async fn get_queue_arn(client: &Client, queue_url: &str) -> Result { + let attributes = client + .get_queue_attributes() + .queue_url(queue_url) + .attribute_names(QueueAttributeName::QueueArn) + .send() + .await?; + + Ok(attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap().to_string()) + } } /// To fetch the queue URL from the environment variables diff --git a/crates/orchestrator/src/setup/mod.rs b/crates/orchestrator/src/setup/mod.rs new file mode 100644 index 00000000..317453a2 --- /dev/null +++ b/crates/orchestrator/src/setup/mod.rs @@ -0,0 +1,97 @@ +use std::process::Command; +use std::sync::Arc; + +use aws_config::environment::EnvironmentVariableCredentialsProvider; +use aws_config::{from_env, Region, SdkConfig}; +use aws_credential_types::provider::ProvideCredentials; +use utils::env_utils::get_env_var_or_panic; +use utils::settings::env::EnvSettingsProvider; + +use crate::alerts::aws_sns::AWSSNS; +use crate::alerts::Alerts; +use crate::config::{get_aws_config, ProviderConfig}; +use crate::cron::Cron; +use crate::data_storage::aws_s3::AWSS3; +use crate::data_storage::DataStorage; +use crate::queue::QueueProvider; + +#[derive(Clone)] +pub enum SetupConfig { + AWS(SdkConfig), +} + +pub enum ConfigType { + AWS, +} + +async fn setup_config(client_type: ConfigType) -> SetupConfig { + match client_type { + ConfigType::AWS => { + let region_provider = Region::new(get_env_var_or_panic("AWS_REGION")); + let creds = EnvironmentVariableCredentialsProvider::new().provide_credentials().await.unwrap(); + SetupConfig::AWS(from_env().region(region_provider).credentials_provider(creds).load().await) + } + } +} + +// TODO : move this to main.rs after moving to clap. +pub async fn setup_cloud() -> color_eyre::Result<()> { + log::info!("Setting up cloud."); + let settings_provider = EnvSettingsProvider {}; + let provider_config = Arc::new(ProviderConfig::AWS(Box::new(get_aws_config(&settings_provider).await))); + + log::info!("Setting up data storage."); + match get_env_var_or_panic("DATA_STORAGE").as_str() { + "s3" => { + let s3 = Box::new(AWSS3::new_with_settings(&settings_provider, provider_config.clone()).await); + s3.setup(Box::new(settings_provider.clone())).await? + } + _ => panic!("Unsupported Storage Client"), + } + log::info!("Data storage setup completed ✅"); + + log::info!("Setting up queues"); + match get_env_var_or_panic("QUEUE_PROVIDER").as_str() { + "sqs" => { + let config = setup_config(ConfigType::AWS).await; + let sqs = Box::new(crate::queue::sqs::SqsQueue {}); + sqs.setup(config).await? + } + _ => panic!("Unsupported Queue Client"), + } + log::info!("Queues setup completed ✅"); + + log::info!("Setting up cron"); + match get_env_var_or_panic("CRON_PROVIDER").as_str() { + "event_bridge" => { + let config = setup_config(ConfigType::AWS).await; + let event_bridge = Box::new(crate::cron::event_bridge::AWSEventBridge {}); + event_bridge.setup(config).await? + } + _ => panic!("Unsupported Event Bridge Client"), + } + log::info!("Cron setup completed ✅"); + + log::info!("Setting up alerts."); + match get_env_var_or_panic("ALERTS").as_str() { + "sns" => { + let sns = Box::new(AWSSNS::new_with_settings(&settings_provider, provider_config).await); + sns.setup(Box::new(settings_provider)).await? + } + _ => panic!("Unsupported Alert Client"), + } + log::info!("Alerts setup completed ✅"); + + Ok(()) +} + +pub async fn setup_db() -> color_eyre::Result<()> { + // We run the js script in the folder root: + log::info!("Setting up database."); + + Command::new("node").arg("migrate-mongo-config.js").output()?; + + log::info!("Database setup completed ✅"); + + Ok(()) +} diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 1737614b..637a64f9 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -367,9 +367,7 @@ pub mod implement_client { ConfigType::Actual => { let storage = get_storage_client(provider_config).await; match get_env_var_or_panic("DATA_STORAGE").as_str() { - "s3" => { - storage.as_ref().build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap() - } + "s3" => storage.as_ref().create_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(), _ => panic!("Unsupported Storage Client"), } storage diff --git a/crates/utils/src/settings/mod.rs b/crates/utils/src/settings/mod.rs index 05b0d787..b6fa3bba 100644 --- a/crates/utils/src/settings/mod.rs +++ b/crates/utils/src/settings/mod.rs @@ -6,6 +6,6 @@ pub enum SettingsProviderError { Internal(#[source] Box), } -pub trait Settings { +pub trait Settings: Send { fn get_settings_or_panic(&self, name: &'static str) -> String; } diff --git a/e2e-tests/tests.rs b/e2e-tests/tests.rs index 252ac151..602e383c 100644 --- a/e2e-tests/tests.rs +++ b/e2e-tests/tests.rs @@ -437,6 +437,6 @@ pub async fn put_job_data_in_db_proving(mongo_db: &MongoDbServer, l2_block_numbe /// To set up s3 files needed for e2e test (test_orchestrator_workflow) #[allow(clippy::borrowed_box)] pub async fn setup_s3(s3_client: &Box) -> color_eyre::Result<()> { - s3_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(); + s3_client.create_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(); Ok(()) } diff --git a/madara-bootstrapper b/madara-bootstrapper index f717bf17..b0b64750 160000 --- a/madara-bootstrapper +++ b/madara-bootstrapper @@ -1 +1 @@ -Subproject commit f717bf179581da53d68fee03b50ef78e0628ee20 +Subproject commit b0b647500c2ae3e3b0d99e345fa652989bca4726