Skip to content

Commit

Permalink
Fix missing pipeline values
Browse files Browse the repository at this point in the history
The BNA Pipeline database entries were missing some information, and
store a run cost always equal to zero.

This patch addresses these issues.

Signed-off-by: Rémy Greinhofer <[email protected]>
  • Loading branch information
rgreinho committed Dec 9, 2024
1 parent 6527eb5 commit 1b145b2
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 31 deletions.
3 changes: 2 additions & 1 deletion lambdas/src/bna-fargate-run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ async fn function_handler(event: LambdaEvent<TaskInput>) -> Result<TaskOutput, E
.state_machine_id(StateMachineId(state_machine_id))
.start_time(Utc::now())
.step(Step::Setup)
.sqs_message(serde_json::to_string(analysis_parameters)?),
.sqs_message(serde_json::to_string(analysis_parameters)?)
.s3_bucket(aws_s3.destination.clone()),
)
.send()
.await?;
Expand Down
89 changes: 59 additions & 30 deletions lambdas/src/bna-save-results.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use aws_config::BehaviorVersion;
use aws_config::{BehaviorVersion, SdkConfig};
use aws_smithy_types_convert::date_time::DateTimeExt;
use bnaclient::types::{
builder::{self},
Expand Down Expand Up @@ -118,30 +118,8 @@ async fn function_handler(event: LambdaEvent<TaskInput>) -> Result<(), Error> {

// TODO: Patch city census.

// Configure the ECS client.
info!("Configure the ECS client...");
let ecs_client = aws_sdk_ecs::Client::new(&config);

// Compute the time it took to run the fargate task.
info!("describing fargate task {}", fargate.task_arn);
let describe_tasks = ecs_client
.describe_tasks()
.cluster(fargate.ecs_cluster_arn.clone())
.tasks(fargate.task_arn.clone())
.send()
.await?;
let task_info = describe_tasks.tasks().first().unwrap();
let started_at = task_info
.started_at()
.expect("the task must have started at this point");
let stopped_at = task_info
.started_at()
.expect("the task must have stopped at this point");
let fargate_time = FargateTime::new(*started_at, *stopped_at);

// Compute the price.
let cost = fargate_time.cost(FARGATE_COST_PER_SEC);
info!(cost = ?cost);
// Compute the time it took to run the fargate task and its cost;
let (started_at, stopped_at, cost) = update_fargate_details(&config, &fargate).await;

// Update the pipeline status.
info!("updating pipeline...");
Expand All @@ -153,9 +131,10 @@ async fn function_handler(event: LambdaEvent<TaskInput>) -> Result<(), Error> {
.body(
AnalysisPatch::builder()
.cost(cost.to_f64().expect("no overflow"))
.start_time(start_time)
.end_time(end_time)
.step(Step::Setup),
.results_posted(true)
.start_time(start_time)
.step(Step::Cleanup),
)
.send()
.await?;
Expand Down Expand Up @@ -340,6 +319,41 @@ async fn fetch_s3_object_as_bytes(
Ok(buffer)
}

async fn update_fargate_details(
config: &SdkConfig,
fargate: &Fargate,
) -> (
aws_sdk_s3::primitives::DateTime,
aws_sdk_s3::primitives::DateTime,
Decimal,
) {
let ecs_client = aws_sdk_ecs::Client::new(config);
info!("describing fargate task {}", fargate.task_arn);
let describe_tasks = ecs_client
.describe_tasks()
.cluster(fargate.ecs_cluster_arn.clone())
.tasks(fargate.task_arn.clone())
.send()
.await
.expect("the ECS task must be described");
let task_info = describe_tasks.tasks().first().unwrap();
let started_at = task_info
.started_at()
.expect("the task must have started at this point");
let stopped_at = task_info
.stopped_at()
.expect("the task must have stopped at this point");
let fargate_time = FargateTime::new(*started_at, *stopped_at);
let elapsed = fargate_time.elapsed();
info!(elapsed = ?elapsed);

// Compute the price.
let cost = fargate_time.cost(FARGATE_COST_PER_SEC);
info!(cost = ?cost);

(*started_at, *stopped_at, cost)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
Expand Down Expand Up @@ -570,19 +584,19 @@ mod tests {
#[test]
fn test_fargate_elapsed() {
let started_at = DateTime::from_str(
"1000-01-02T01:23:10.0Z",
"2024-11-22T16:33:07.624354Z",
aws_sdk_s3::primitives::DateTimeFormat::DateTime,
)
.unwrap();
let stopped_at = DateTime::from_str(
"1000-01-02T01:23:20.0Z",
"2024-11-22T16:34:59.322Z",
aws_sdk_s3::primitives::DateTimeFormat::DateTime,
)
.unwrap();

let fargate_time = FargateTime::new(started_at, stopped_at);
let elapsed = fargate_time.elapsed();
assert_eq!(elapsed, 10_i64);
assert_eq!(elapsed, 112_i64);
}

#[test]
Expand Down Expand Up @@ -722,4 +736,19 @@ mod tests {
// .unwrap();
// dbg!(r);
// }

// #[test(tokio::test)]
// async fn test_fargate_details() {
// let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
// let fargate = Fargate {
// ecs_cluster_arn: "".to_string(),
// task_arn: "".to_string(),
// last_status: "PROVISIONING".to_string(),
// };
// let (start, stop, cost) = update_fargate_details(&config, &fargate).await;
// dbg!(start);
// dbg!(stop);
// dbg!(cost);
// assert!(cost > Decimal::ZERO);
// }
}

0 comments on commit 1b145b2

Please sign in to comment.