Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/cttso icav2 step function branch #145

Merged
merged 17 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import { getFileManagerStackProps } from './stacks/fileManager';
import { getBsRunsUploadManagerStackProps } from './stacks/bsRunsUploadManager';
import { getICAv2CopyBatchUtilityStackProps } from './stacks/icav2CopyBatchUtility';
import { getBsshIcav2FastqCopyManagerStackProps } from './stacks/bsshIcav2FastqCopyManager';
import {
getCttsov2Icav2PipelineManagerStackProps,
getCttsov2Icav2PipelineTableStackProps,
} from './stacks/cttsov2Icav2PipelineManager';
import { getSchemaStackProps } from './stacks/schema';

interface EnvironmentConfig {
Expand Down Expand Up @@ -41,6 +45,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
sharedStackProps: getSharedStackProps(stage),
tokenServiceStackProps: getTokenServiceStackProps(),
icaEventPipeStackProps: getIcaEventPipeStackProps(),
cttsov2Icav2PipelineTableStackProps: getCttsov2Icav2PipelineTableStackProps(),
},
statelessConfig: {
postgresManagerStackProps: getPostgresManagerStackProps(),
Expand All @@ -50,6 +55,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
bsRunsUploadManagerStackProps: getBsRunsUploadManagerStackProps(stage),
icav2CopyBatchUtilityStackProps: getICAv2CopyBatchUtilityStackProps(stage),
bsshIcav2FastqCopyManagerStackProps: getBsshIcav2FastqCopyManagerStackProps(stage),
cttsov2Icav2PipelineManagerStackProps: getCttsov2Icav2PipelineManagerStackProps(stage),
schemaStackProps: getSchemaStackProps(),
},
},
Expand All @@ -65,6 +71,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
sharedStackProps: getSharedStackProps(stage),
tokenServiceStackProps: getTokenServiceStackProps(),
icaEventPipeStackProps: getIcaEventPipeStackProps(),
cttsov2Icav2PipelineTableStackProps: getCttsov2Icav2PipelineTableStackProps(),
},
statelessConfig: {
postgresManagerStackProps: getPostgresManagerStackProps(),
Expand All @@ -74,6 +81,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
bsRunsUploadManagerStackProps: getBsRunsUploadManagerStackProps(stage),
icav2CopyBatchUtilityStackProps: getICAv2CopyBatchUtilityStackProps(stage),
bsshIcav2FastqCopyManagerStackProps: getBsshIcav2FastqCopyManagerStackProps(stage),
cttsov2Icav2PipelineManagerStackProps: getCttsov2Icav2PipelineManagerStackProps(stage),
schemaStackProps: getSchemaStackProps(),
},
},
Expand All @@ -89,6 +97,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
sharedStackProps: getSharedStackProps(stage),
tokenServiceStackProps: getTokenServiceStackProps(),
icaEventPipeStackProps: getIcaEventPipeStackProps(),
cttsov2Icav2PipelineTableStackProps: getCttsov2Icav2PipelineTableStackProps(),
},
statelessConfig: {
postgresManagerStackProps: getPostgresManagerStackProps(),
Expand All @@ -98,6 +107,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
bsRunsUploadManagerStackProps: getBsRunsUploadManagerStackProps(stage),
icav2CopyBatchUtilityStackProps: getICAv2CopyBatchUtilityStackProps(stage),
bsshIcav2FastqCopyManagerStackProps: getBsshIcav2FastqCopyManagerStackProps(stage),
cttsov2Icav2PipelineManagerStackProps: getCttsov2Icav2PipelineManagerStackProps(stage),
schemaStackProps: getSchemaStackProps(),
},
},
Expand Down
36 changes: 36 additions & 0 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,39 @@ export const bsshFastqCopyManagerSSMArn = path.join(
'state_machine_arn'
);
export const bsshFastqCopyManagerSfnName = 'bssh_fastq_copy_manager_sfn';

/*
External resources required by the ctTSO v2 Stack
*/
export const cttsov2Icav2PipelineIdSSMParameterPath =
'/icav2/umccr-prod/tso500_ctdna_2.1_pipeline_id';

/*
Resources generated by the ctTSO v2 Stack
*/
export const cttsov2Icav2PipelineManagerSSMRoot = '/orcabus/ctTSOv2';

// Stateful
export const cttsov2Icav2PipelineManagerDynamodbTableName = 'ctTSOv2ICAv2AnalysesDynamoDBTable';
export const cttsov2DynamoDbTableSSMName = path.join(
cttsov2Icav2PipelineManagerSSMRoot,
'dynamodb_table_name'
);
export const cttsov2DynamoDbTableSSMArn = path.join(
cttsov2Icav2PipelineManagerSSMRoot,
'dynamodb_table_arn'
);

// Stateless
export const cttsov2Icav2PipelineSfnSSMName = path.join(
cttsov2Icav2PipelineManagerSSMRoot,
'state_machine_name'
);
export const cttsov2Icav2PipelineSfnSSMArn = path.join(
cttsov2Icav2PipelineManagerSSMRoot,
'state_machine_arn'
);

export const cttsov2Icav2PipelineWorkflowType = 'cttsov2';
export const cttsov2Icav2PipelineWorkflowTypeVersion = '2.1.1';
export const cttsov2Icav2ServiceVersion = '2024.05.07';
44 changes: 44 additions & 0 deletions config/stacks/cttsov2Icav2PipelineManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import {
AppStage,
eventBusName,
cttsov2Icav2PipelineIdSSMParameterPath,
icav2CopyBatchUtilityName,
cttsov2Icav2PipelineSfnSSMName,
cttsov2Icav2PipelineSfnSSMArn,
cttsov2Icav2PipelineManagerDynamodbTableName,
icav2AccessTokenSecretName,
cttsov2DynamoDbTableSSMArn,
cttsov2DynamoDbTableSSMName,
cttsov2Icav2PipelineWorkflowType,
cttsov2Icav2ServiceVersion,
cttsov2Icav2PipelineWorkflowTypeVersion,
} from '../constants';
import { Cttsov2Icav2PipelineManagerConfig } from '../../lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack';
import { Cttsov2Icav2PipelineTableConfig } from '../../lib/workload/stateful/stacks/cttso-v2-pipeline-dynamo-db/deploy/stack';

// Stateful
export const getCttsov2Icav2PipelineTableStackProps = (): Cttsov2Icav2PipelineTableConfig => {
return {
cttsov2Icav2DynamodbTableArnSsmParameterPath: cttsov2DynamoDbTableSSMArn,
cttsov2Icav2DynamodbTableNameSsmParameterPath: cttsov2DynamoDbTableSSMName,
dynamodbTableName: cttsov2Icav2PipelineManagerDynamodbTableName,
};
};

// Stateless
export const getCttsov2Icav2PipelineManagerStackProps = (
stage: AppStage
): Cttsov2Icav2PipelineManagerConfig => {
return {
pipelineIdSsmPath: cttsov2Icav2PipelineIdSSMParameterPath,
icav2CopyBatchUtilityStateMachineName: icav2CopyBatchUtilityName,
cttsov2LaunchStateMachineArnSsmParameterPath: cttsov2Icav2PipelineSfnSSMArn,
cttsov2LaunchStateMachineNameSsmParameterPath: cttsov2Icav2PipelineSfnSSMName,
dynamodbTableName: cttsov2Icav2PipelineManagerDynamodbTableName,
eventbusName: eventBusName,
icav2TokenSecretId: icav2AccessTokenSecretName[stage],
workflowType: cttsov2Icav2PipelineWorkflowType,
workflowVersion: cttsov2Icav2PipelineWorkflowTypeVersion,
serviceVersion: cttsov2Icav2ServiceVersion,
};
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { Construct } from 'constructs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as path from 'path';
import * as events from 'aws-cdk-lib/aws-events';
import * as events_targets from 'aws-cdk-lib/aws-events-targets';

export interface Icav2AnalysisEventHandlerConstructProps {
/* Names of objects to get */
tableName: string; // Name of the table to get / update / query

/* Names of objects to create */
stateMachineName: string; // Name of the state machine to create

/* Event configurations to push to */
detailType: string; // Detail type of the event to raise
eventBusName: string; // Detail of the eventbus to push the event to
source: string; // Source of the event we push

/* Internal workflowRunStateChange event details */
workflowType: string;
workflowVersion: string;
serviceVersion: string;
}

export class Icav2AnalysisEventHandlerConstruct extends Construct {
public readonly stateMachineObj: sfn.StateMachine;

constructor(scope: Construct, id: string, props: Icav2AnalysisEventHandlerConstructProps) {
super(scope, id);

// Get table object
const table_obj = dynamodb.TableV2.fromTableName(this, 'table_obj', props.tableName);

// Get the event bus object
const eventbus_obj = events.EventBus.fromEventBusName(
this,
'orcabus_eventbus_obj',
props.eventBusName
);

// Build state machine object
this.stateMachineObj = new sfn.StateMachine(this, 'state_machine', {
stateMachineName: props.stateMachineName,
definitionBody: sfn.DefinitionBody.fromFile(
path.join(
__dirname,
'step_functions_templates/icav2_get_workflow_status_and_raise_internal_event.asl.json'
)
),
definitionSubstitutions: {
/* Table object */
__table_name__: table_obj.tableName,
/* Event metadata */
__detail_type__: props.detailType,
__eventbus_name__: props.eventBusName,
__eventsource__: props.source,
/* Put event details */
__workflow_type__: props.workflowType,
__workflow_version__: props.workflowVersion,
__service_version_: props.serviceVersion,
},
});

/* Grant the state machine read and write access to the table */
table_obj.grantReadWriteData(this.stateMachineObj);

// Create a rule for this state machine
const rule = new events.Rule(this, 'rule', {
eventBus: eventbus_obj,
ruleName: `${props.stateMachineName}-rule`,
eventPattern: {
source: [props.source],
detailType: [props.detailType],
},
});

/* Add rule as a target to the state machine */
rule.addTarget(
new events_targets.SfnStateMachine(this.stateMachineObj, {
input: events.RuleTargetInput.fromEventPath('$.detail'),
})
);

/* Grant the state machine the ability to submit events to the event bus */
eventbus_obj.grantPutEventsTo(this.stateMachineObj.role);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
{
"Comment": "Handle icav2 state change",
"StartAt": "Move event detail",
"States": {
"Move event detail": {
"Type": "Pass",
"Next": "DynamoDB Get UUID from ICAv2 Analysis ID",
"Parameters": {
"event_detail.$": "$"
}
},
"DynamoDB Get UUID from ICAv2 Analysis ID": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.event_detail.id",
"id_type": "icav2_analysis_id"
}
},
"Next": "Check Analysis ID in DataBase",
"ResultPath": "$.get_analysis_id_in_db_step",
"ResultSelector": {
"db_response.$": "$"
}
},
"Check Analysis ID in DataBase": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.get_analysis_id_in_db_step.db_response.Item",
"IsPresent": true
},
"Next": "Success"
}
],
"Default": "DynamoDB Update Status"
},
"DynamoDB Update Status": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.get_analysis_id_in_db_step.db_response.db_uuid",
"id_type": "db_uuid"
},
"UpdateExpression": "SET status = :status",
"ExpressionAttributeValues": {
":status": {
"S.$": "$.get_analysis_id_in_db_step.db_response.status"
}
}
},
"Next": "Wait 1 Second",
"ResultPath": "$.update_analysis_id_in_db_step"
},
"Wait 1 Second": {
"Type": "Wait",
"Seconds": 1,
"Next": "DynamoDB Get UUID Row"
},
"DynamoDB Get UUID Row": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.get_analysis_id_in_db_step.db_response.db_uuid",
"id_type": "db_uuid"
}
},
"Next": "PutEvent",
"ResultPath": "$.update_analysis_id_in_db_step",
"ResultSelector": {
"portalRunId.$": "$.Item.portal_run_id.S",
"timestamp.$": "$$.State.EnteredTime",
"status": "$.Item.analysis_status.S",
"workflowType": "${__workflow_type__}",
"workflowVersion": "${__workflow_version__}",
"payload": {
"refId": "",
"analysisLaunchPayload.$": "$.Item.analysis_launch_payload.S",
"analysisReturnPayload": "$.Item.analysis_return_payload.S",
"stateMachineExecutionArn": "$.Item.state_machine_execution_arn.S",
"analysisId.$": "$.Item.analysis_id.S"
},
"serviceVersion": "${__service_version__}"
}
},
"PutEvent": {
"Type": "Task",
"Resource": "arn:aws:states:::events:putEvents",
"Parameters": {
"Entries": [
{
"Detail.$": "$.update_analysis_id_in_db_step",
"DetailType": "workflowRunStateChange",
"EventBusName": "${__eventbus_name__}",
"Source": "${__eventsource__}"
}
]
},
"Next": "Success"
},
"Success": {
"Type": "Succeed"
}
}
}
32 changes: 32 additions & 0 deletions lib/workload/components/dynamodb-icav2-table/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Construct } from 'constructs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';

export interface DynamodbIcav2PipelineConstructProps {
tableName: string;
}

export class DynamodbIcav2PipelineConstruct extends Construct {
public readonly tableObj: dynamodb.ITableV2;
public readonly tableNameArn: string;

constructor(scope: Construct, id: string, props: DynamodbIcav2PipelineConstructProps) {
super(scope, id);

this.tableObj = new dynamodb.TableV2(this, 'dynamodb_icav2_pipeline_table', {
/* Either a db_uuid or an icav2 analysis id or a portal run id */
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING,
},
/* One of 'db_uuid', 'icav2_analysis_id', 'portal_run_id' */
sortKey: {
name: 'id_type',
type: dynamodb.AttributeType.STRING,
},
tableName: props.tableName,
});

// Set outputs
this.tableNameArn = this.tableObj.tableArn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# ctTSOV2 Pipeline Dynamo Db

The dynamo db service that stores icav2 cttsov2 analyses

Analysis are stored by their portal_run_id, icav2_analysis_id and their database uuid.


Loading
Loading