Skip to content

Commit

Permalink
Rate limit ora compression workflow
Browse files Browse the repository at this point in the history
1. Rate limit the number of concurrent ecs tasks to 400
2. Rate limit the number of concurrent instrument runs either gzip -> md5sum or ora -> md5sum to 10
  • Loading branch information
alexiswl committed Nov 25, 2024
1 parent a0fe733 commit 09b9f43
Show file tree
Hide file tree
Showing 14 changed files with 778 additions and 207 deletions.
2 changes: 1 addition & 1 deletion config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ Resources used by the ora compression pipeline
// Pipeline ID is: 5c1c2fa2-30dc-46ed-9e7f-dc4fefac77b6
// deployed to dev, stg and prod
export const oraCompressionIcav2PipelineIdSSMParameterPath =
'/icav2/umccr-prod/ora_compression_pipeline_id'; // 5c1c2fa2-30dc-46ed-9e7f-dc4fefac77b6
'/icav2/umccr-prod/ora_compression_pipeline_id'; // 0540fca4-cc40-45ac-88e2-d32df69c6954

// Default Reference Uri for compressing ORA files
// Reference URI is icav2://reference-data/dragen-ora/v2/ora_reference_v2.tar.gz
Expand Down
69 changes: 69 additions & 0 deletions lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,45 @@ export class GzipRawMd5sumDecompressionConstruct extends Construct {
icav2SecretObj.grantRead(lambdaObj.currentVersion);
});

// Set up the step function for rate limiting the number of tasks running simultaneously
const rateLimitStateMachine = new sfn.StateMachine(this, 'RateLimitStateMachine', {
stateMachineName: `${props.sfnPrefix}-ecs-limiter-sfn`,
definitionBody: sfn.DefinitionBody.fromFile(
path.join(
__dirname,
'step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json'
)
),
definitionSubstitutions: {
__cluster_arn__: cluster.clusterArn,
},
});

// Allow the rate limit state machine to list the ECS tasks
// in the cluster
rateLimitStateMachine.addToRolePolicy(
new iam.PolicyStatement({
resources: ['*'],
actions: ['ecs:ListTasks'],
conditions: {
ArnEquals: {
'ecs:cluster': cluster.clusterArn,
},
},
})
);

NagSuppressions.addResourceSuppressions(
rateLimitStateMachine,
[
{
id: 'AwsSolutions-IAM5',
reason: 'Give permissions to resources *, to list tasks with condition on ecs cluster',
},
],
true
);

// Set up step function
// Build state machine object
this.sfnObject = new sfn.StateMachine(this, 'state_machine', {
Expand All @@ -136,9 +175,39 @@ export class GzipRawMd5sumDecompressionConstruct extends Construct {
readIcav2FileContentsLambdaObj.currentVersion.functionArn,
__delete_icav2_cache_uri_lambda_function_arn__:
deleteIcav2CacheUriLambdaObj.currentVersion.functionArn,
/* Child step functions */
__ecs_task_rate_limit_sfn_arn__: rateLimitStateMachine.stateMachineArn,
},
});

rateLimitStateMachine.grantStartExecution(this.sfnObject);
rateLimitStateMachine.grantRead(this.sfnObject);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
this.sfnObject.addToRolePolicy(
new iam.PolicyStatement({
resources: [
`arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`,
],
actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
})
);

// https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies
// Polling requires permission for states:DescribeExecution
NagSuppressions.addResourceSuppressions(
this.sfnObject,
[
{
id: 'AwsSolutions-IAM5',
reason:
'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations',
},
],
true
);

// Allow the step function to invoke the lambda
[readIcav2FileContentsLambdaObj, deleteIcav2CacheUriLambdaObj].forEach((lambdaObj) => {
lambdaObj.currentVersion.grantInvoke(this.sfnObject);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"Comment": "Task Backoff SFN",
"StartAt": "ListTasks",
"States": {
"ListTasks": {
"Type": "Task",
"Parameters": {
"Cluster": "${__cluster_arn__}"
},
"Resource": "arn:aws:states:::aws-sdk:ecs:listTasks",
"ResultSelector": {
"num_tasks_running.$": "States.ArrayLength($.TaskArns)"
},
"ResultPath": "$.get_num_tasks_step",
"Next": "Over 400 tasks running"
},
"Over 400 tasks running": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.get_num_tasks_step.num_tasks_running",
"NumericGreaterThan": 400,
"Next": "Sleep a minute",
"Comment": "More than 400 tasks running"
}
],
"Default": "Success"
},
"Sleep a minute": {
"Type": "Wait",
"Seconds": 60,
"Next": "ListTasks"
},
"Success": {
"Type": "Succeed"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"States": {
"Convert Fastq List Row GZ URIs to Array": {
"Type": "Pass",
"Next": "Decompress GZIP Files",
"Next": "Wait for Task Availability",
"Parameters": {
"gzip_files_map": [
{
Expand All @@ -19,6 +19,16 @@
},
"ResultPath": "$.fastq_list_row_as_map"
},
"Wait for Task Availability": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "${__ecs_task_rate_limit_sfn_arn__}",
"Input": {}
},
"Next": "Decompress GZIP Files",
"ResultPath": null
},
"Decompress GZIP Files": {
"Type": "Map",
"ItemsPath": "$.fastq_list_row_as_map.gzip_files_map",
Expand Down Expand Up @@ -71,8 +81,15 @@
{
"ErrorEquals": ["ECS.AmazonECSException"],
"BackoffRate": 2,
"IntervalSeconds": 300,
"MaxAttempts": 3,
"IntervalSeconds": 3600,
"MaxAttempts": 5,
"JitterStrategy": "FULL"
},
{
"ErrorEquals": ["States.TaskFailed"],
"BackoffRate": 2,
"IntervalSeconds": 3600,
"MaxAttempts": 5,
"JitterStrategy": "FULL"
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,45 @@ export class OraDecompressionConstruct extends Construct {
}),
});

// Set up the step function for rate limiting the number of tasks running simultaneously
const rateLimitStateMachine = new sfn.StateMachine(this, 'RateLimitStateMachine', {
stateMachineName: `${props.sfnPrefix}-ecs-limiter-sfn`,
definitionBody: sfn.DefinitionBody.fromFile(
path.join(
__dirname,
'step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json'
)
),
definitionSubstitutions: {
__cluster_arn__: cluster.clusterArn,
},
});

// Allow the rate limit state machine to list the ECS tasks
// in the cluster
rateLimitStateMachine.addToRolePolicy(
new iam.PolicyStatement({
resources: ['*'],
actions: ['ecs:ListTasks'],
conditions: {
ArnEquals: {
'ecs:cluster': cluster.clusterArn,
},
},
})
);

NagSuppressions.addResourceSuppressions(
rateLimitStateMachine,
[
{
id: 'AwsSolutions-IAM5',
reason: 'Give permissions to resources *, to list tasks with condition on ecs cluster',
},
],
true
);

// Set up step function
// Build state machine object
this.sfnObject = new sfn.StateMachine(this, 'state_machine', {
Expand All @@ -96,9 +135,40 @@ export class OraDecompressionConstruct extends Construct {
__ora_container_name__: oraDecompressionContainer.containerName,
__subnets__: cluster.vpc.privateSubnets.map((subnet) => subnet.subnetId).join(','),
__sg_group__: securityGroup.securityGroupId,
/* Child step functions */
__ecs_task_rate_limit_sfn_arn__: rateLimitStateMachine.stateMachineArn,
},
});

// Grant permissions to the state machine
rateLimitStateMachine.grantStartExecution(this.sfnObject);
rateLimitStateMachine.grantRead(this.sfnObject);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
this.sfnObject.addToRolePolicy(
new iam.PolicyStatement({
resources: [
`arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`,
],
actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
})
);

// https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies
// Polling requires permission for states:DescribeExecution
NagSuppressions.addResourceSuppressions(
this.sfnObject,
[
{
id: 'AwsSolutions-IAM5',
reason:
'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations',
},
],
true
);

// Allow step function to run the ECS task
taskDefinition.grantRun(this.sfnObject);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
},
"Convert Fastq List Row ORA Validation to Array": {
"Type": "Pass",
"Next": "Decompress ORA Files",
"Next": "Wait for Task Availability",
"Parameters": {
"ora_files_map": [
{
Expand All @@ -32,7 +32,7 @@
},
"Convert Fastq List Row ORA to Array": {
"Type": "Pass",
"Next": "Decompress ORA Files",
"Next": "Wait for Task Availability",
"Parameters": {
"ora_files_map": [
{
Expand All @@ -49,6 +49,16 @@
},
"ResultPath": "$.fastq_list_row_as_map"
},
"Wait for Task Availability": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "${__ecs_task_rate_limit_sfn_arn__}",
"Input": {}
},
"Next": "Decompress ORA Files",
"ResultPath": null
},
"Decompress ORA Files": {
"Type": "Map",
"ItemsPath": "$.fastq_list_row_as_map.ora_files_map",
Expand Down Expand Up @@ -117,8 +127,15 @@
{
"ErrorEquals": ["ECS.AmazonECSException"],
"BackoffRate": 2,
"IntervalSeconds": 300,
"MaxAttempts": 3,
"IntervalSeconds": 3600,
"MaxAttempts": 5,
"JitterStrategy": "FULL"
},
{
"ErrorEquals": ["States.TaskFailed"],
"BackoffRate": 2,
"IntervalSeconds": 3600,
"MaxAttempts": 2,
"JitterStrategy": "FULL"
}
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"Comment": "Task Backoff SFN",
"StartAt": "ListTasks",
"States": {
"ListTasks": {
"Type": "Task",
"Parameters": {
"Cluster": "${__cluster_arn__}"
},
"Resource": "arn:aws:states:::aws-sdk:ecs:listTasks",
"ResultSelector": {
"num_tasks_running.$": "States.ArrayLength($.TaskArns)"
},
"ResultPath": "$.get_num_tasks_step",
"Next": "Over 400 tasks running"
},
"Over 400 tasks running": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.get_num_tasks_step.num_tasks_running",
"NumericGreaterThan": 400,
"Next": "Sleep a minute",
"Comment": "More than 400 tasks running"
}
],
"Default": "Success"
},
"Sleep a minute": {
"Type": "Wait",
"Seconds": 60,
"Next": "ListTasks"
},
"Success": {
"Type": "Succeed"
}
}
}
Loading

0 comments on commit 09b9f43

Please sign in to comment.