Skip to content

Commit

Permalink
Merge pull request #145 from umccr/feature/cttso-icav2-step-function-…
Browse files Browse the repository at this point in the history
…branch

Feature/cttso icav2 step function branch
  • Loading branch information
alexiswl authored May 7, 2024
2 parents 88473e6 + 7f18757 commit 99d6e36
Show file tree
Hide file tree
Showing 27 changed files with 3,418 additions and 0 deletions.
10 changes: 10 additions & 0 deletions config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import { getFileManagerStackProps } from './stacks/fileManager';
import { getBsRunsUploadManagerStackProps } from './stacks/bsRunsUploadManager';
import { getICAv2CopyBatchUtilityStackProps } from './stacks/icav2CopyBatchUtility';
import { getBsshIcav2FastqCopyManagerStackProps } from './stacks/bsshIcav2FastqCopyManager';
import {
getCttsov2Icav2PipelineManagerStackProps,
getCttsov2Icav2PipelineTableStackProps,
} from './stacks/cttsov2Icav2PipelineManager';
import { getSchemaStackProps } from './stacks/schema';

interface EnvironmentConfig {
Expand Down Expand Up @@ -41,6 +45,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
sharedStackProps: getSharedStackProps(stage),
tokenServiceStackProps: getTokenServiceStackProps(),
icaEventPipeStackProps: getIcaEventPipeStackProps(),
cttsov2Icav2PipelineTableStackProps: getCttsov2Icav2PipelineTableStackProps(),
},
statelessConfig: {
postgresManagerStackProps: getPostgresManagerStackProps(),
Expand All @@ -50,6 +55,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
bsRunsUploadManagerStackProps: getBsRunsUploadManagerStackProps(stage),
icav2CopyBatchUtilityStackProps: getICAv2CopyBatchUtilityStackProps(stage),
bsshIcav2FastqCopyManagerStackProps: getBsshIcav2FastqCopyManagerStackProps(stage),
cttsov2Icav2PipelineManagerStackProps: getCttsov2Icav2PipelineManagerStackProps(stage),
schemaStackProps: getSchemaStackProps(),
},
},
Expand All @@ -65,6 +71,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
sharedStackProps: getSharedStackProps(stage),
tokenServiceStackProps: getTokenServiceStackProps(),
icaEventPipeStackProps: getIcaEventPipeStackProps(),
cttsov2Icav2PipelineTableStackProps: getCttsov2Icav2PipelineTableStackProps(),
},
statelessConfig: {
postgresManagerStackProps: getPostgresManagerStackProps(),
Expand All @@ -74,6 +81,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
bsRunsUploadManagerStackProps: getBsRunsUploadManagerStackProps(stage),
icav2CopyBatchUtilityStackProps: getICAv2CopyBatchUtilityStackProps(stage),
bsshIcav2FastqCopyManagerStackProps: getBsshIcav2FastqCopyManagerStackProps(stage),
cttsov2Icav2PipelineManagerStackProps: getCttsov2Icav2PipelineManagerStackProps(stage),
schemaStackProps: getSchemaStackProps(),
},
},
Expand All @@ -89,6 +97,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
sharedStackProps: getSharedStackProps(stage),
tokenServiceStackProps: getTokenServiceStackProps(),
icaEventPipeStackProps: getIcaEventPipeStackProps(),
cttsov2Icav2PipelineTableStackProps: getCttsov2Icav2PipelineTableStackProps(),
},
statelessConfig: {
postgresManagerStackProps: getPostgresManagerStackProps(),
Expand All @@ -98,6 +107,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
bsRunsUploadManagerStackProps: getBsRunsUploadManagerStackProps(stage),
icav2CopyBatchUtilityStackProps: getICAv2CopyBatchUtilityStackProps(stage),
bsshIcav2FastqCopyManagerStackProps: getBsshIcav2FastqCopyManagerStackProps(stage),
cttsov2Icav2PipelineManagerStackProps: getCttsov2Icav2PipelineManagerStackProps(stage),
schemaStackProps: getSchemaStackProps(),
},
},
Expand Down
36 changes: 36 additions & 0 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,39 @@ export const bsshFastqCopyManagerSSMArn = path.join(
'state_machine_arn'
);
export const bsshFastqCopyManagerSfnName = 'bssh_fastq_copy_manager_sfn';

/*
External resources required by the ctTSO v2 Stack
*/
export const cttsov2Icav2PipelineIdSSMParameterPath =
'/icav2/umccr-prod/tso500_ctdna_2.1_pipeline_id';

/*
Resources generated by the ctTSO v2 Stack
*/
export const cttsov2Icav2PipelineManagerSSMRoot = '/orcabus/ctTSOv2';

// Stateful
export const cttsov2Icav2PipelineManagerDynamodbTableName = 'ctTSOv2ICAv2AnalysesDynamoDBTable';
export const cttsov2DynamoDbTableSSMName = path.join(
cttsov2Icav2PipelineManagerSSMRoot,
'dynamodb_table_name'
);
export const cttsov2DynamoDbTableSSMArn = path.join(
cttsov2Icav2PipelineManagerSSMRoot,
'dynamodb_table_arn'
);

// Stateless
export const cttsov2Icav2PipelineSfnSSMName = path.join(
cttsov2Icav2PipelineManagerSSMRoot,
'state_machine_name'
);
export const cttsov2Icav2PipelineSfnSSMArn = path.join(
cttsov2Icav2PipelineManagerSSMRoot,
'state_machine_arn'
);

export const cttsov2Icav2PipelineWorkflowType = 'cttsov2';
export const cttsov2Icav2PipelineWorkflowTypeVersion = '2.1.1';
export const cttsov2Icav2ServiceVersion = '2024.05.07';
44 changes: 44 additions & 0 deletions config/stacks/cttsov2Icav2PipelineManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import {
AppStage,
eventBusName,
cttsov2Icav2PipelineIdSSMParameterPath,
icav2CopyBatchUtilityName,
cttsov2Icav2PipelineSfnSSMName,
cttsov2Icav2PipelineSfnSSMArn,
cttsov2Icav2PipelineManagerDynamodbTableName,
icav2AccessTokenSecretName,
cttsov2DynamoDbTableSSMArn,
cttsov2DynamoDbTableSSMName,
cttsov2Icav2PipelineWorkflowType,
cttsov2Icav2ServiceVersion,
cttsov2Icav2PipelineWorkflowTypeVersion,
} from '../constants';
import { Cttsov2Icav2PipelineManagerConfig } from '../../lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack';
import { Cttsov2Icav2PipelineTableConfig } from '../../lib/workload/stateful/stacks/cttso-v2-pipeline-dynamo-db/deploy/stack';

// Stateful
export const getCttsov2Icav2PipelineTableStackProps = (): Cttsov2Icav2PipelineTableConfig => {
return {
cttsov2Icav2DynamodbTableArnSsmParameterPath: cttsov2DynamoDbTableSSMArn,
cttsov2Icav2DynamodbTableNameSsmParameterPath: cttsov2DynamoDbTableSSMName,
dynamodbTableName: cttsov2Icav2PipelineManagerDynamodbTableName,
};
};

// Stateless
export const getCttsov2Icav2PipelineManagerStackProps = (
stage: AppStage
): Cttsov2Icav2PipelineManagerConfig => {
return {
pipelineIdSsmPath: cttsov2Icav2PipelineIdSSMParameterPath,
icav2CopyBatchUtilityStateMachineName: icav2CopyBatchUtilityName,
cttsov2LaunchStateMachineArnSsmParameterPath: cttsov2Icav2PipelineSfnSSMArn,
cttsov2LaunchStateMachineNameSsmParameterPath: cttsov2Icav2PipelineSfnSSMName,
dynamodbTableName: cttsov2Icav2PipelineManagerDynamodbTableName,
eventbusName: eventBusName,
icav2TokenSecretId: icav2AccessTokenSecretName[stage],
workflowType: cttsov2Icav2PipelineWorkflowType,
workflowVersion: cttsov2Icav2PipelineWorkflowTypeVersion,
serviceVersion: cttsov2Icav2ServiceVersion,
};
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { Construct } from 'constructs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as path from 'path';
import * as events from 'aws-cdk-lib/aws-events';
import * as events_targets from 'aws-cdk-lib/aws-events-targets';

export interface Icav2AnalysisEventHandlerConstructProps {
/* Names of objects to get */
tableName: string; // Name of the table to get / update / query

/* Names of objects to create */
stateMachineName: string; // Name of the state machine to create

/* Event configurations to push to */
detailType: string; // Detail type of the event to raise
eventBusName: string; // Detail of the eventbus to push the event to
source: string; // Source of the event we push

/* Internal workflowRunStateChange event details */
workflowType: string;
workflowVersion: string;
serviceVersion: string;
}

export class Icav2AnalysisEventHandlerConstruct extends Construct {
public readonly stateMachineObj: sfn.StateMachine;

constructor(scope: Construct, id: string, props: Icav2AnalysisEventHandlerConstructProps) {
super(scope, id);

// Get table object
const table_obj = dynamodb.TableV2.fromTableName(this, 'table_obj', props.tableName);

// Get the event bus object
const eventbus_obj = events.EventBus.fromEventBusName(
this,
'orcabus_eventbus_obj',
props.eventBusName
);

// Build state machine object
this.stateMachineObj = new sfn.StateMachine(this, 'state_machine', {
stateMachineName: props.stateMachineName,
definitionBody: sfn.DefinitionBody.fromFile(
path.join(
__dirname,
'step_functions_templates/icav2_get_workflow_status_and_raise_internal_event.asl.json'
)
),
definitionSubstitutions: {
/* Table object */
__table_name__: table_obj.tableName,
/* Event metadata */
__detail_type__: props.detailType,
__eventbus_name__: props.eventBusName,
__eventsource__: props.source,
/* Put event details */
__workflow_type__: props.workflowType,
__workflow_version__: props.workflowVersion,
__service_version_: props.serviceVersion,
},
});

/* Grant the state machine read and write access to the table */
table_obj.grantReadWriteData(this.stateMachineObj);

// Create a rule for this state machine
const rule = new events.Rule(this, 'rule', {
eventBus: eventbus_obj,
ruleName: `${props.stateMachineName}-rule`,
eventPattern: {
source: [props.source],
detailType: [props.detailType],
},
});

/* Add rule as a target to the state machine */
rule.addTarget(
new events_targets.SfnStateMachine(this.stateMachineObj, {
input: events.RuleTargetInput.fromEventPath('$.detail'),
})
);

/* Grant the state machine the ability to submit events to the event bus */
eventbus_obj.grantPutEventsTo(this.stateMachineObj.role);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
{
"Comment": "Handle icav2 state change",
"StartAt": "Move event detail",
"States": {
"Move event detail": {
"Type": "Pass",
"Next": "DynamoDB Get UUID from ICAv2 Analysis ID",
"Parameters": {
"event_detail.$": "$"
}
},
"DynamoDB Get UUID from ICAv2 Analysis ID": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.event_detail.id",
"id_type": "icav2_analysis_id"
}
},
"Next": "Check Analysis ID in DataBase",
"ResultPath": "$.get_analysis_id_in_db_step",
"ResultSelector": {
"db_response.$": "$"
}
},
"Check Analysis ID in DataBase": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.get_analysis_id_in_db_step.db_response.Item",
"IsPresent": true
},
"Next": "Success"
}
],
"Default": "DynamoDB Update Status"
},
"DynamoDB Update Status": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.get_analysis_id_in_db_step.db_response.db_uuid",
"id_type": "db_uuid"
},
"UpdateExpression": "SET status = :status",
"ExpressionAttributeValues": {
":status": {
"S.$": "$.get_analysis_id_in_db_step.db_response.status"
}
}
},
"Next": "Wait 1 Second",
"ResultPath": "$.update_analysis_id_in_db_step"
},
"Wait 1 Second": {
"Type": "Wait",
"Seconds": 1,
"Next": "DynamoDB Get UUID Row"
},
"DynamoDB Get UUID Row": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.get_analysis_id_in_db_step.db_response.db_uuid",
"id_type": "db_uuid"
}
},
"Next": "PutEvent",
"ResultPath": "$.update_analysis_id_in_db_step",
"ResultSelector": {
"portalRunId.$": "$.Item.portal_run_id.S",
"timestamp.$": "$$.State.EnteredTime",
"status": "$.Item.analysis_status.S",
"workflowType": "${__workflow_type__}",
"workflowVersion": "${__workflow_version__}",
"payload": {
"refId": "",
"analysisLaunchPayload.$": "$.Item.analysis_launch_payload.S",
"analysisReturnPayload": "$.Item.analysis_return_payload.S",
"stateMachineExecutionArn": "$.Item.state_machine_execution_arn.S",
"analysisId.$": "$.Item.analysis_id.S"
},
"serviceVersion": "${__service_version__}"
}
},
"PutEvent": {
"Type": "Task",
"Resource": "arn:aws:states:::events:putEvents",
"Parameters": {
"Entries": [
{
"Detail.$": "$.update_analysis_id_in_db_step",
"DetailType": "workflowRunStateChange",
"EventBusName": "${__eventbus_name__}",
"Source": "${__eventsource__}"
}
]
},
"Next": "Success"
},
"Success": {
"Type": "Succeed"
}
}
}
32 changes: 32 additions & 0 deletions lib/workload/components/dynamodb-icav2-table/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Construct } from 'constructs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';

export interface DynamodbIcav2PipelineConstructProps {
tableName: string;
}

export class DynamodbIcav2PipelineConstruct extends Construct {
public readonly tableObj: dynamodb.ITableV2;
public readonly tableNameArn: string;

constructor(scope: Construct, id: string, props: DynamodbIcav2PipelineConstructProps) {
super(scope, id);

this.tableObj = new dynamodb.TableV2(this, 'dynamodb_icav2_pipeline_table', {
/* Either a db_uuid or an icav2 analysis id or a portal run id */
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING,
},
/* One of 'db_uuid', 'icav2_analysis_id', 'portal_run_id' */
sortKey: {
name: 'id_type',
type: dynamodb.AttributeType.STRING,
},
tableName: props.tableName,
});

// Set outputs
this.tableNameArn = this.tableObj.tableArn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# ctTSOV2 Pipeline Dynamo Db

The dynamo db service that stores icav2 cttsov2 analyses

Analysis are stored by their portal_run_id, icav2_analysis_id and their database uuid.


Loading

0 comments on commit 99d6e36

Please sign in to comment.