Skip to content

Commit

Permalink
Merge branch 'main' into feature/clap
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Nov 12, 2024
2 parents ef3127c + a9d0e30 commit abf8df2
Show file tree
Hide file tree
Showing 25 changed files with 612 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## Added

- Added cli args support for all the services
- Setup functions added for cloud and db
- panic handling in process job
- upgrade ETH L1 bridge for withdrawals to work
- added makefile and submodules
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aws-config = { workspace = true, features = ["behavior-version-latest"] }
aws-credential-types = { version = "1.2.1", features = [
"hardcoded-credentials",
] }
aws-sdk-eventbridge.workspace = true
aws-sdk-s3 = { workspace = true, features = ["behavior-version-latest"] }
aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
aws-sdk-sqs = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,11 @@ impl Alerts for AWSSNS {
self.client.publish().topic_arn(self.topic_arn.clone()).message(message_body).send().await?;
Ok(())
}

async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()> {
let response = self.client.create_topic().name(topic_name).send().await?;
let topic_arn = response.topic_arn().expect("Topic Not found");
log::info!("SNS topic created. Topic ARN: {}", topic_arn);
Ok(())
}
}
12 changes: 12 additions & 0 deletions crates/orchestrator/src/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
use async_trait::async_trait;
use mockall::automock;

use crate::cli::alert::AlertParams;

pub mod aws_sns;

#[automock]
#[async_trait]
pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()>;
async fn setup(&self, params: AlertParams) -> color_eyre::Result<()> {
match params {
AlertParams::AWSSNS(aws_sns_params) => {
let sns_topic_name = aws_sns_params.get_topic_name();
self.create_alert(&sns_topic_name).await?;
}
}
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/orchestrator/src/cli/aws_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct AWSConfigCliArgs {
pub aws_region: String,
}

#[derive(Debug, Clone)]
pub struct AWSConfigParams {
pub aws_access_key_id: String,
pub aws_secret_access_key: String,
Expand Down
22 changes: 22 additions & 0 deletions crates/orchestrator/src/cli/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use clap::Args;

/// Parameters used to config AWS SNS.
#[derive(Debug, Clone, Args)]
#[group()]
pub struct AWSEventBridgeCliArgs {
/// Use the AWS Event Bridge client
#[arg(long)]
pub aws_event_bridge: bool,

/// The name of the S3 bucket.
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-target-queue-name"))]
pub target_queue_name: Option<String>,

/// The cron time for the event bridge trigger rule.
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_CRON_TIME", long, default_value = Some("madara-orchestrator-event-bridge-cron-time"))]
pub cron_time: Option<String>,

/// The name of the event bridge trigger rule.
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-trigger-rule-name"))]
pub trigger_rule_name: Option<String>,
}
8 changes: 8 additions & 0 deletions crates/orchestrator/src/cli/cron/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use crate::alerts::aws_sns::AWSSNSValidatedArgs;

pub mod event_bridge;

#[derive(Clone, Debug)]
pub enum CronParams {
AWSSNS(AWSSNSValidatedArgs),
}
43 changes: 43 additions & 0 deletions crates/orchestrator/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::time::Duration;

use alert::AlertParams;
use aws_config::{AWSConfigCliArgs, AWSConfigParams};
use clap::{ArgGroup, Parser};
use cron::event_bridge::AWSEventBridgeCliArgs;
use da::DaParams;
use database::DatabaseParams;
use ethereum_da_client::EthereumDaValidatedArgs;
Expand All @@ -16,6 +19,8 @@ use url::Url;

use crate::alerts::aws_sns::AWSSNSValidatedArgs;
use crate::config::ServiceParams;
use crate::cron::event_bridge::AWSEventBridgeValidatedArgs;
use crate::cron::CronParams;
use crate::data_storage::aws_s3::AWSS3ValidatedArgs;
use crate::database::mongodb::MongoDBValidatedArgs;
use crate::queue::sqs::AWSSQSValidatedArgs;
Expand All @@ -24,6 +29,7 @@ use crate::telemetry::InstrumentationParams;

pub mod alert;
pub mod aws_config;
pub mod cron;
pub mod da;
pub mod database;
pub mod instrumentation;
Expand Down Expand Up @@ -74,6 +80,12 @@ pub mod storage;
.required(true)
.multiple(false)
),
group(
ArgGroup::new("cron")
.args(&["aws_event_bridge"])
.required(true)
.multiple(false)
),
)]
pub struct RunCmd {
// AWS Config
Expand Down Expand Up @@ -115,6 +127,10 @@ pub struct RunCmd {
#[clap(flatten)]
pub sharp_args: prover::sharp::SharpCliArgs,

// Cron
#[clap(flatten)]
pub aws_event_bridge_args: AWSEventBridgeCliArgs,

// SNOS
#[clap(flatten)]
pub snos_args: snos::SNOSCliArgs,
Expand Down Expand Up @@ -302,6 +318,33 @@ impl RunCmd {
}
}

pub fn validate_cron_params(&self) -> Result<CronParams, String> {
if self.aws_event_bridge_args.aws_event_bridge {
Ok(CronParams::EventBridge(AWSEventBridgeValidatedArgs {
target_queue_name: self
.aws_event_bridge_args
.target_queue_name
.clone()
.expect("Target queue name is required"),
cron_time: Duration::from_secs(
self.aws_event_bridge_args
.cron_time
.clone()
.expect("Cron time is required")
.parse::<u64>()
.expect("Failed to parse cron time"),
),
trigger_rule_name: self
.aws_event_bridge_args
.trigger_rule_name
.clone()
.expect("Trigger rule name is required"),
}))
} else {
Err("Only AWS Event Bridge is supported as of now".to_string())
}
}

pub fn validate_instrumentation_params(&self) -> Result<InstrumentationParams, String> {
Ok(InstrumentationParams {
otel_service_name: self
Expand Down
124 changes: 124 additions & 0 deletions crates/orchestrator/src/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::time::Duration;

use async_trait::async_trait;
use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target};
use aws_sdk_sqs::types::QueueAttributeName;

use crate::cron::Cron;
use crate::setup::SetupConfig;

#[derive(Clone, Debug)]
pub struct AWSEventBridgeValidatedArgs {
pub target_queue_name: String,
pub cron_time: Duration,
pub trigger_rule_name: String,
}

pub struct AWSEventBridge {}

impl AWSEventBridge {
pub fn new_with_params(_params: &AWSEventBridgeValidatedArgs) -> Self {
Self {}
}
}

#[async_trait]
#[allow(unreachable_patterns)]
impl Cron for AWSEventBridge {
async fn create_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
trigger_rule_name: String,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
event_bridge_client
.put_rule()
.name(&trigger_rule_name)
.schedule_expression(duration_to_rate_string(cron_time))
.state(RuleState::Enabled)
.send()
.await?;

Ok(())
}
async fn add_cron_target_queue(
&self,
config: &SetupConfig,
target_queue_name: String,
message: String,
trigger_rule_name: String,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
let sqs_client = aws_sdk_sqs::Client::new(config);
let queue_url = sqs_client.get_queue_url().queue_name(target_queue_name).send().await?;

let queue_attributes = sqs_client
.get_queue_attributes()
.queue_url(queue_url.queue_url.unwrap())
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;
let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap();

// Create the EventBridge target with the input transformer
let input_transformer =
InputTransformer::builder().input_paths_map("$.time", "time").input_template(message).build()?;

event_bridge_client
.put_targets()
.rule(trigger_rule_name)
.targets(
Target::builder()
.id(uuid::Uuid::new_v4().to_string())
.arn(queue_arn)
.input_transformer(input_transformer)
.build()?,
)
.send()
.await?;

Ok(())
}
}

fn duration_to_rate_string(duration: Duration) -> String {
let total_secs = duration.as_secs();
let total_mins = duration.as_secs() / 60;
let total_hours = duration.as_secs() / 3600;
let total_days = duration.as_secs() / 86400;

if total_days > 0 {
format!("rate({} day{})", total_days, if total_days == 1 { "" } else { "s" })
} else if total_hours > 0 {
format!("rate({} hour{})", total_hours, if total_hours == 1 { "" } else { "s" })
} else if total_mins > 0 {
format!("rate({} minute{})", total_mins, if total_mins == 1 { "" } else { "s" })
} else {
format!("rate({} second{})", total_secs, if total_secs == 1 { "" } else { "s" })
}
}

#[cfg(test)]
mod event_bridge_utils_test {
use rstest::rstest;

use super::*;

#[rstest]
fn test_duration_to_rate_string() {
assert_eq!(duration_to_rate_string(Duration::from_secs(60)), "rate(1 minute)");
assert_eq!(duration_to_rate_string(Duration::from_secs(120)), "rate(2 minutes)");
assert_eq!(duration_to_rate_string(Duration::from_secs(30)), "rate(30 seconds)");
assert_eq!(duration_to_rate_string(Duration::from_secs(3600)), "rate(1 hour)");
assert_eq!(duration_to_rate_string(Duration::from_secs(86400)), "rate(1 day)");
}
}
63 changes: 63 additions & 0 deletions crates/orchestrator/src/cron/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::time::Duration;

use async_trait::async_trait;
use event_bridge::AWSEventBridgeValidatedArgs;
use lazy_static::lazy_static;

use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType};
use crate::setup::SetupConfig;

pub mod event_bridge;

#[derive(Clone, Debug)]
pub enum CronParams {
EventBridge(AWSEventBridgeValidatedArgs),
}

lazy_static! {
pub static ref CRON_DURATION: Duration = Duration::from_mins(1);
// TODO : we can take this from clap.
pub static ref TARGET_QUEUE_NAME: String = String::from("madara_orchestrator_worker_trigger_queue");
pub static ref WORKER_TRIGGERS: Vec<WorkerTriggerType> = vec![
WorkerTriggerType::Snos,
WorkerTriggerType::Proving,
WorkerTriggerType::DataSubmission,
WorkerTriggerType::UpdateState
];
pub static ref WORKER_TRIGGER_RULE_NAME: String = String::from("worker_trigger_scheduled");
}

#[async_trait]
pub trait Cron {
async fn create_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
trigger_rule_name: String,
) -> color_eyre::Result<()>;
async fn add_cron_target_queue(
&self,
config: &SetupConfig,
target_queue_name: String,
message: String,
trigger_rule_name: String,
) -> color_eyre::Result<()>;
async fn setup(&self, config: SetupConfig) -> color_eyre::Result<()> {
self.create_cron(&config, *CRON_DURATION, WORKER_TRIGGER_RULE_NAME.clone()).await?;
for triggers in WORKER_TRIGGERS.iter() {
self.add_cron_target_queue(
&config,
TARGET_QUEUE_NAME.clone(),
get_worker_trigger_message(triggers.clone())?,
WORKER_TRIGGER_RULE_NAME.clone(),
)
.await?;
}
Ok(())
}
}

fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result<String> {
let message = WorkerTriggerMessage { worker: worker_trigger_type };
Ok(serde_json::to_string(&message)?)
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl DataStorage for AWSS3 {
Ok(())
}

async fn build_test_bucket(&self, bucket_name: &str) -> Result<()> {
async fn create_bucket(&self, bucket_name: &str) -> Result<()> {
self.client.create_bucket().bucket(bucket_name).send().await?;
Ok(())
}
Expand Down
Loading

0 comments on commit abf8df2

Please sign in to comment.