Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/test aws #186

Merged
merged 27 commits into from
Dec 10, 2024
Merged
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0c2962e
fix: atlantic added in clap, get status code updated and snos branch …
Nov 23, 2024
19cc437
fix: aws corrections
heemankv Nov 29, 2024
d79b721
Merge remote-tracking branch 'origin/fix/atlantic' into fix/test-aws
Nov 29, 2024
670ad05
fix: atlantic tests and url update
Nov 29, 2024
5f2857b
update: starknet-os version
heemankv Nov 29, 2024
821bac9
verification values udpated for proving
Nov 29, 2024
6fc26bb
dockerfile updated
Nov 29, 2024
337323d
verification logic changed
Nov 30, 2024
cc0b11a
dockerfile updated
Nov 30, 2024
3454571
atlantic query response updated
Nov 30, 2024
6f6a00d
nack->ack and max 10 blocks in update state
apoorvsadana Nov 30, 2024
5c2fe48
fix: aws event bridge setup
heemankv Dec 3, 2024
c9b14d7
lint fix
heemankv Dec 3, 2024
35dc318
fix: lint
heemankv Dec 3, 2024
5c15ce6
update: delete storage setup
heemankv Dec 3, 2024
9074006
lint fixes
heemankv Dec 3, 2024
bce2bf8
fix: coverage
heemankv Dec 3, 2024
fc92c99
update: fixed serialization of trigger message
heemankv Dec 3, 2024
c1e3fe0
code cleanup
heemankv Dec 4, 2024
4dca57b
update: fix worker trigger for aws
heemankv Dec 4, 2024
64a7d28
update: re-fix CI issue
heemankv Dec 5, 2024
687e5b4
random commit to re-trigger CI
heemankv Dec 5, 2024
9eb4b55
update: added more metric
heemankv Dec 6, 2024
62bac48
update: Bucket Region from ENV
heemankv Dec 6, 2024
47a4675
update: fixed location constraint
heemankv Dec 9, 2024
8892386
update: reworked location constraint
heemankv Dec 9, 2024
0e5dc9a
fix: location constraint
heemankv Dec 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: aws event bridge setup
  • Loading branch information
heemankv committed Dec 3, 2024
commit 5c2fe48dad4c14373ee171f1daef30ba1c5f3327
81 changes: 65 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -41,6 +41,8 @@ aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] }
aws-sdk-eventbridge = { version = "1.41.0", features = [
"behavior-version-latest",
] }
aws-sdk-iam = "1.52.0"
aws-sdk-scheduler = "1.49.0"
aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
aws-credential-types = { version = "1.2.1", features = [
"hardcoded-credentials",
2 changes: 2 additions & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -89,6 +89,8 @@ tracing = { workspace = true }
tracing-core = { workspace = true, default-features = false }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
aws-sdk-iam = { workspace = true }
aws-sdk-scheduler = { workspace = true }

[features]
default = ["ethereum", "with_mongodb", "with_sqs"]
2 changes: 1 addition & 1 deletion crates/orchestrator/src/cli/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -16,6 +16,6 @@ pub struct AWSEventBridgeCliArgs {
pub cron_time: Option<String>,

/// The name of the event bridge trigger rule.
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-trigger-rule-name"), help = "The name of the event bridge trigger rule.")]
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME", long, default_value = Some("madara-orchestrator-worker-trigger"), help = "The name of the event bridge trigger rule.")]
pub trigger_rule_name: Option<String>,
}
125 changes: 97 additions & 28 deletions crates/orchestrator/src/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::time::Duration;

use async_std::task::sleep;
use async_trait::async_trait;
use aws_config::SdkConfig;
use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target};
use aws_sdk_eventbridge::Client as EventBridgeClient;
use aws_sdk_scheduler::{types::{FlexibleTimeWindow, FlexibleTimeWindowMode, Target}, Client as SchedulerClient};
use aws_sdk_sqs::types::QueueAttributeName;
use aws_sdk_sqs::Client as SqsClient;
use color_eyre::eyre::Ok;

use crate::cron::Cron;
use crate::queue::job_queue::WorkerTriggerType;

use super::{get_worker_trigger_message, TriggerArns};

#[derive(Clone, Debug)]
pub struct AWSEventBridgeValidatedArgs {
@@ -20,8 +24,9 @@ pub struct AWSEventBridge {
target_queue_name: String,
cron_time: Duration,
trigger_rule_name: String,
client: EventBridgeClient,
client: SchedulerClient,
queue_client: SqsClient,
iam_client: aws_sdk_iam::Client,
}

impl AWSEventBridge {
@@ -30,27 +35,18 @@ impl AWSEventBridge {
target_queue_name: params.target_queue_name.clone(),
cron_time: params.cron_time,
trigger_rule_name: params.trigger_rule_name.clone(),
client: aws_sdk_eventbridge::Client::new(aws_config),
client: aws_sdk_scheduler::Client::new(aws_config),
queue_client: aws_sdk_sqs::Client::new(aws_config),
iam_client: aws_sdk_iam::Client::new(aws_config),
}
}
}

#[async_trait]
#[allow(unreachable_patterns)]
impl Cron for AWSEventBridge {
async fn create_cron(&self) -> color_eyre::Result<()> {
self.client
.put_rule()
.name(&self.trigger_rule_name)
.schedule_expression(duration_to_rate_string(self.cron_time))
.state(RuleState::Enabled)
.send()
.await?;

Ok(())
}
async fn add_cron_target_queue(&self, message: String) -> color_eyre::Result<()> {
async fn create_cron(&self) -> color_eyre::Result<TriggerArns> {
// Get Queue Info
let queue_url = self.queue_client.get_queue_url().queue_name(&self.target_queue_name).send().await?;

let queue_attributes = self
@@ -62,20 +58,93 @@ impl Cron for AWSEventBridge {
.await?;
let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap();

// Create the EventBridge target with the input transformer
let input_transformer =
InputTransformer::builder().input_paths_map("time","$.time").input_template(message).build()?;
// Create IAM role for EventBridge
let role_name = format!("worker-trigger-role-{}", uuid::Uuid::new_v4());
heemankv marked this conversation as resolved.
Show resolved Hide resolved
let assume_role_policy = r#"{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "scheduler.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}"#;

let create_role_resp = self.iam_client
.create_role()
.role_name(&role_name)
.assume_role_policy_document(assume_role_policy)
.send()
.await?;

let role_arn = create_role_resp.role().unwrap().arn();

// Create policy document for SQS access
let policy_document = format!(r#"{{
"Version": "2012-10-17",
"Statement": [{{
"Effect": "Allow",
"Action": [
"sqs:SendMessage"
],
"Resource": "{}"
}}]
}}"#, queue_arn);

let policy_name = format!("worker-trigger-policy-{}", uuid::Uuid::new_v4());
heemankv marked this conversation as resolved.
Show resolved Hide resolved

// Create and attach the policy
let policy_resp = self.iam_client
.create_policy()
.policy_name(&policy_name)
.policy_document(&policy_document)
.send()
.await?;

let policy_arn = policy_resp.policy().unwrap().arn().unwrap().to_string();

// Attach the policy to the role
self.iam_client
.attach_role_policy()
.role_name(&role_name)
.policy_arn(&policy_arn)
.send()
.await?;

sleep(Duration::from_secs(100)).await;

Ok(TriggerArns {
queue_arn : queue_arn.to_string(),
role_arn : role_arn.to_string()
})
}

async fn add_cron_target_queue(&self, trigger_type: &WorkerTriggerType, trigger_arns : &TriggerArns) -> color_eyre::Result<()> {
let trigger_name = format!("{}-{}",self.trigger_rule_name, trigger_type);

// Set flexible time window (you can adjust this as needed)
let flexible_time_window = FlexibleTimeWindow::builder()
.mode(FlexibleTimeWindowMode::Off)
.build()?;

let message = get_worker_trigger_message(trigger_type.clone())?;

// Create target for SQS queue
let target = Target::builder()
.arn(trigger_arns.queue_arn.clone())
.role_arn(trigger_arns.role_arn.clone())
.input(message)
.build()?;

// Create the schedule
self.client
.put_targets()
.rule(&self.trigger_rule_name)
.targets(
Target::builder()
.id(uuid::Uuid::new_v4().to_string())
.arn(queue_arn)
.input_transformer(input_transformer)
.build()?,
)
.create_schedule()
.name(trigger_name)
.schedule_expression_timezone("UTC")
.flexible_time_window(flexible_time_window)
.schedule_expression(duration_to_rate_string(self.cron_time))
.target(target)
.send()
.await?;

15 changes: 10 additions & 5 deletions crates/orchestrator/src/cron/mod.rs
Original file line number Diff line number Diff line change
@@ -14,14 +14,19 @@ lazy_static! {
];
}

#[derive(Debug, Clone)]
pub struct TriggerArns {
queue_arn : String,
role_arn : String
}
#[async_trait]
pub trait Cron {
async fn create_cron(&self) -> color_eyre::Result<()>;
async fn add_cron_target_queue(&self, message: String) -> color_eyre::Result<()>;
async fn create_cron(&self) -> color_eyre::Result<TriggerArns>;
async fn add_cron_target_queue(&self, trigger_type: &WorkerTriggerType, trigger_arns : &TriggerArns) -> color_eyre::Result<()>;
async fn setup(&self) -> color_eyre::Result<()> {
self.create_cron().await?;
for triggers in WORKER_TRIGGERS.iter() {
self.add_cron_target_queue(get_worker_trigger_message(triggers.clone())?).await?;
let trigger_arns = self.create_cron().await?;
for trigger in WORKER_TRIGGERS.iter() {
self.add_cron_target_queue(trigger, &trigger_arns).await?;
}
Ok(())
}
Loading