Skip to content

Commit

Permalink
merge to main
Browse files Browse the repository at this point in the history
  • Loading branch information
raylrui committed Nov 28, 2024
2 parents 33877b2 + 18059a2 commit 529d6ec
Show file tree
Hide file tree
Showing 34 changed files with 1,942 additions and 221 deletions.
24 changes: 20 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,30 @@ start-all-service:
docker compose up --wait -d db

# Insert all dump data in before running servers
@(cd lib/workload/stateless/stacks/metadata-manager && $(MAKE) s3-load)
@(cd lib/workload/stateless/stacks/sequence-run-manager && $(MAKE) s3-load)
@(cd lib/workload/stateless/stacks/workflow-manager && $(MAKE) s3-load)
@(cd lib/workload/stateless/stacks/filemanager && $(MAKE) s3-load)
@(cd lib/workload/stateless/stacks/metadata-manager && $(MAKE) reset-db)
@(cd lib/workload/stateless/stacks/sequence-run-manager && $(MAKE) reset-db)
@(cd lib/workload/stateless/stacks/workflow-manager && $(MAKE) reset-db)
@(cd lib/workload/stateless/stacks/filemanager && $(MAKE) reset-db)

# Running the rest of the µ-service server
docker compose up --wait -d --build

# Commands for pg-dd
#
# Each public target below only sets the target-specific variable PG_DD_COMMAND
# and then delegates to the shared `pg-dd` target, which hands the value to
# docker compose through the environment. compose.yml reads it as
# `${PG_DD_COMMAND:-load}`, so an empty value silently becomes `load`.
.PHONY: dump upload download load pg-dd

dump: PG_DD_COMMAND=dump
dump: pg-dd

upload: PG_DD_COMMAND=upload
upload: pg-dd

download: PG_DD_COMMAND=download
download: pg-dd

load: PG_DD_COMMAND=load
load: pg-dd

# The recipe must expand the same variable the targets above set. Expanding an
# unset variable here would leave PG_DD_COMMAND empty and every command would
# fall back to `load`.
pg-dd:
	@PG_DD_COMMAND=$(PG_DD_COMMAND) docker compose up pg-dd

stop-all-service:
docker compose down

Expand Down
35 changes: 35 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,38 @@ services:
interval: 10s
timeout: 2s
retries: 5

# Load test data into the database.
# The sub-command is taken from PG_DD_COMMAND (set by the Makefile pg-dd targets)
# and defaults to `load` when the variable is unset or empty.
pg-dd:
  build:
    context: ./lib/workload/stateless/stacks/pg-dd
    dockerfile: Dockerfile
  volumes:
    # Store the dumps to the local filesystem.
    - ./lib/workload/stateless/stacks/pg-dd/data:/app/data
  depends_on:
    # Depends on migration from all services, so they must be started first.
    - db
    - metadata-manager
    - workflow-manager
    - sequence-run-manager
    - filemanager
  command: ${PG_DD_COMMAND:-load}
  environment:
    # Connection to the local compose database and where dumps are kept.
    - PG_DD_URL=postgresql://orcabus:orcabus@db:5432
    - PG_DD_DIR=data
    - PG_DD_BUCKET=orcabus-test-data-843407916570-ap-southeast-2
    - PG_DD_PREFIX=pg-dd

    # One entry per service database.
    # NOTE(review): the *_SQL_DUMP / *_SQL_LOAD pair presumably restricts the filemanager
    # dump to a query result loaded back into `s3_object` — confirm against pg-dd itself.
    - PG_DD_DATABASE_METADATA_MANAGER=metadata_manager
    - PG_DD_DATABASE_SEQUENCE_RUN_MANAGER=sequence_run_manager
    - PG_DD_DATABASE_WORKFLOW_MANAGER=workflow_manager
    - PG_DD_DATABASE_FILEMANAGER=filemanager
    - PG_DD_DATABASE_FILEMANAGER_SQL_DUMP=select * from s3_object order by sequencer limit 10000
    - PG_DD_DATABASE_FILEMANAGER_SQL_LOAD=s3_object

    # AWS credentials are passed through from the host, with placeholder defaults.
    - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-access_key_id}
    - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-secret_access_key}
    - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-ap-southeast-2}
    - AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN:-session_token}
  # Quoted on purpose: a bare `no` is a boolean in YAML 1.1 parsers, while the
  # restart policy must be the string "no".
  restart: "no"
2 changes: 2 additions & 0 deletions config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import {
} from './stacks/oraCompressionPipelineManager';
import { getOraDecompressionManagerStackProps } from './stacks/oraDecompressionPipelineManager';
import { getWebSocketApiStackProps } from './stacks/clientWebsocketApi';
import { getPgDDProps } from './stacks/pgDD';
interface EnvironmentConfig {
name: string;
region: string;
Expand Down Expand Up @@ -131,6 +132,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
stackyMcStackFaceProps: getGlueStackProps(stage),
fmAnnotatorProps: getFmAnnotatorProps(),
websocketApiStackProps: getWebSocketApiStackProps(stage),
pgDDProps: getPgDDProps(stage),
},
};

Expand Down
2 changes: 1 addition & 1 deletion config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ Resources used by the ora compression pipeline
// Pipeline ID is: 0540fca4-cc40-45ac-88e2-d32df69c6954
// deployed to dev, stg and prod
export const oraCompressionIcav2PipelineIdSSMParameterPath =
'/icav2/umccr-prod/ora_compression_pipeline_id'; // 5c1c2fa2-30dc-46ed-9e7f-dc4fefac77b6
'/icav2/umccr-prod/ora_compression_pipeline_id'; // 0540fca4-cc40-45ac-88e2-d32df69c6954

// Default Reference Uri for compressing ORA files
// Reference URI is icav2://reference-data/dragen-ora/v2/ora_reference_v2.tar.gz
Expand Down
25 changes: 25 additions & 0 deletions config/stacks/pgDD.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import {
accountIdAlias,
AppStage,
computeSecurityGroupName,
rdsMasterSecretName,
region,
vpcProps,
} from '../constants';
import { PgDDStackProps } from '../../lib/workload/stateless/stacks/pg-dd/deploy/stack';
import { getDataBucketStackProps } from './dataBucket';

/**
 * Build the props for the pg-dd stack of the given stage.
 *
 * @param stage - Deployment stage used to look up the data bucket stack props.
 * @returns The pg-dd stack props, or `undefined` when the data bucket props for
 *   this stage carry no `bucketName`.
 */
export const getPgDDProps = (stage: AppStage): PgDDStackProps | undefined => {
  const { bucketName } = getDataBucketStackProps(stage);

  // Without a bucket name there is nothing to configure for this stage.
  if (bucketName === undefined) {
    return undefined;
  }

  // NOTE(review): the secret ARN is always built from the beta account alias,
  // regardless of `stage` — confirm this is intended.
  const secretArn = `arn:aws:secretsmanager:${region}:${accountIdAlias.beta}:secret:${rdsMasterSecretName}`; // pragma: allowlist secret

  return {
    bucket: bucketName,
    prefix: 'pg-dd',
    secretArn,
    lambdaSecurityGroupName: computeSecurityGroupName,
    vpcProps,
  };
};
69 changes: 69 additions & 0 deletions lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,45 @@ export class GzipRawMd5sumDecompressionConstruct extends Construct {
icav2SecretObj.grantRead(lambdaObj.currentVersion);
});

// Set up the step function for rate limiting the number of tasks running simultaneously
const rateLimitStateMachine = new sfn.StateMachine(this, 'RateLimitStateMachine', {
stateMachineName: `${props.sfnPrefix}-ecs-limiter-sfn`,
definitionBody: sfn.DefinitionBody.fromFile(
path.join(
__dirname,
'step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json'
)
),
definitionSubstitutions: {
__cluster_arn__: cluster.clusterArn,
},
});

// Allow the rate limit state machine to list the ECS tasks
// in the cluster
rateLimitStateMachine.addToRolePolicy(
new iam.PolicyStatement({
resources: ['*'],
actions: ['ecs:ListTasks'],
conditions: {
ArnEquals: {
'ecs:cluster': cluster.clusterArn,
},
},
})
);

NagSuppressions.addResourceSuppressions(
rateLimitStateMachine,
[
{
id: 'AwsSolutions-IAM5',
reason: 'Give permissions to resources *, to list tasks with condition on ecs cluster',
},
],
true
);

// Set up step function
// Build state machine object
this.sfnObject = new sfn.StateMachine(this, 'state_machine', {
Expand All @@ -136,9 +175,39 @@ export class GzipRawMd5sumDecompressionConstruct extends Construct {
readIcav2FileContentsLambdaObj.currentVersion.functionArn,
__delete_icav2_cache_uri_lambda_function_arn__:
deleteIcav2CacheUriLambdaObj.currentVersion.functionArn,
/* Child step functions */
__ecs_task_rate_limit_sfn_arn__: rateLimitStateMachine.stateMachineArn,
},
});

rateLimitStateMachine.grantStartExecution(this.sfnObject);
rateLimitStateMachine.grantRead(this.sfnObject);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
this.sfnObject.addToRolePolicy(
new iam.PolicyStatement({
resources: [
`arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`,
],
actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
})
);

// https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies
// Polling requires permission for states:DescribeExecution
NagSuppressions.addResourceSuppressions(
this.sfnObject,
[
{
id: 'AwsSolutions-IAM5',
reason:
'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations',
},
],
true
);

// Allow the step function to invoke the lambda
[readIcav2FileContentsLambdaObj, deleteIcav2CacheUriLambdaObj].forEach((lambdaObj) => {
lambdaObj.currentVersion.grantInvoke(this.sfnObject);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
  "Comment": "Task Backoff SFN. Succeeds once the cluster lists at most 400 tasks; otherwise sleeps a minute and counts again. ecs:ListTasks returns at most 100 task ARNs per call, so every page is followed through NextToken and summed before the comparison.",
  "StartAt": "Reset task count",
  "States": {
    "Reset task count": {
      "Type": "Pass",
      "Comment": "Start every counting round from zero so that pages from a previous round are not added twice.",
      "Result": {
        "num_tasks_running": 0
      },
      "ResultPath": "$.get_num_tasks_step",
      "Next": "ListTasks"
    },
    "ListTasks": {
      "Type": "Task",
      "Comment": "Fetch the first page of task ARNs in the cluster.",
      "Parameters": {
        "Cluster": "${__cluster_arn__}"
      },
      "Resource": "arn:aws:states:::aws-sdk:ecs:listTasks",
      "ResultPath": "$.list_tasks_page",
      "Next": "Add page to task count"
    },
    "Add page to task count": {
      "Type": "Pass",
      "Comment": "Add the number of task ARNs on the current page to the running total.",
      "Parameters": {
        "num_tasks_running.$": "States.MathAdd($.get_num_tasks_step.num_tasks_running, States.ArrayLength($.list_tasks_page.TaskArns))"
      },
      "ResultPath": "$.get_num_tasks_step",
      "Next": "More task pages"
    },
    "More task pages": {
      "Type": "Choice",
      "Comment": "A NextToken on the current page means the listing is not complete yet.",
      "Choices": [
        {
          "And": [
            {
              "Variable": "$.list_tasks_page.NextToken",
              "IsPresent": true
            },
            {
              "Variable": "$.list_tasks_page.NextToken",
              "IsNull": false
            }
          ],
          "Next": "ListTasks next page"
        }
      ],
      "Default": "Over 400 tasks running"
    },
    "ListTasks next page": {
      "Type": "Task",
      "Comment": "Fetch the following page of task ARNs using the token returned with the previous page.",
      "Parameters": {
        "Cluster": "${__cluster_arn__}",
        "NextToken.$": "$.list_tasks_page.NextToken"
      },
      "Resource": "arn:aws:states:::aws-sdk:ecs:listTasks",
      "ResultPath": "$.list_tasks_page",
      "Next": "Add page to task count"
    },
    "Over 400 tasks running": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.get_num_tasks_step.num_tasks_running",
          "NumericGreaterThan": 400,
          "Next": "Sleep a minute",
          "Comment": "More than 400 tasks running"
        }
      ],
      "Default": "Success"
    },
    "Sleep a minute": {
      "Type": "Wait",
      "Seconds": 60,
      "Next": "Reset task count"
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"States": {
"Convert Fastq List Row GZ URIs to Array": {
"Type": "Pass",
"Next": "Decompress GZIP Files",
"Next": "Wait for Task Availability",
"Parameters": {
"gzip_files_map": [
{
Expand All @@ -19,6 +19,16 @@
},
"ResultPath": "$.fastq_list_row_as_map"
},
"Wait for Task Availability": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "${__ecs_task_rate_limit_sfn_arn__}",
"Input": {}
},
"Next": "Decompress GZIP Files",
"ResultPath": null
},
"Decompress GZIP Files": {
"Type": "Map",
"ItemsPath": "$.fastq_list_row_as_map.gzip_files_map",
Expand Down Expand Up @@ -71,8 +81,15 @@
{
"ErrorEquals": ["ECS.AmazonECSException"],
"BackoffRate": 2,
"IntervalSeconds": 300,
"MaxAttempts": 3,
"IntervalSeconds": 3600,
"MaxAttempts": 5,
"JitterStrategy": "FULL"
},
{
"ErrorEquals": ["States.TaskFailed"],
"BackoffRate": 2,
"IntervalSeconds": 3600,
"MaxAttempts": 5,
"JitterStrategy": "FULL"
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,45 @@ export class OraDecompressionConstruct extends Construct {
}),
});

// Set up the step function for rate limiting the number of tasks running simultaneously
const rateLimitStateMachine = new sfn.StateMachine(this, 'RateLimitStateMachine', {
stateMachineName: `${props.sfnPrefix}-ecs-limiter-sfn`,
definitionBody: sfn.DefinitionBody.fromFile(
path.join(
__dirname,
'step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json'
)
),
definitionSubstitutions: {
__cluster_arn__: cluster.clusterArn,
},
});

// Allow the rate limit state machine to list the ECS tasks
// in the cluster
rateLimitStateMachine.addToRolePolicy(
new iam.PolicyStatement({
resources: ['*'],
actions: ['ecs:ListTasks'],
conditions: {
ArnEquals: {
'ecs:cluster': cluster.clusterArn,
},
},
})
);

NagSuppressions.addResourceSuppressions(
rateLimitStateMachine,
[
{
id: 'AwsSolutions-IAM5',
reason: 'Give permissions to resources *, to list tasks with condition on ecs cluster',
},
],
true
);

// Set up step function
// Build state machine object
this.sfnObject = new sfn.StateMachine(this, 'state_machine', {
Expand All @@ -96,9 +135,40 @@ export class OraDecompressionConstruct extends Construct {
__ora_container_name__: oraDecompressionContainer.containerName,
__subnets__: cluster.vpc.privateSubnets.map((subnet) => subnet.subnetId).join(','),
__sg_group__: securityGroup.securityGroupId,
/* Child step functions */
__ecs_task_rate_limit_sfn_arn__: rateLimitStateMachine.stateMachineArn,
},
});

// Grant permissions to the state machine
rateLimitStateMachine.grantStartExecution(this.sfnObject);
rateLimitStateMachine.grantRead(this.sfnObject);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
this.sfnObject.addToRolePolicy(
new iam.PolicyStatement({
resources: [
`arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`,
],
actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
})
);

// https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies
// Polling requires permission for states:DescribeExecution
NagSuppressions.addResourceSuppressions(
this.sfnObject,
[
{
id: 'AwsSolutions-IAM5',
reason:
'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations',
},
],
true
);

// Allow step function to run the ECS task
taskDefinition.grantRun(this.sfnObject);

Expand Down
Loading

0 comments on commit 529d6ec

Please sign in to comment.