Skip to content

Commit

Permalink
Various stacky fixes for umccrise + rnasum
Browse files Browse the repository at this point in the history
* Use portal_run_id over ref id for database id (since refId isn't always propagated)
* Use idempotency key (set to portal run id) to prevent duplicate analyses
* Various fixes for linkedLibraries
* Various fixes for using new fastqListRow object syntax (and using rgid as the key)
* Add subjectId to wts tags (required by umccrise)
* Fixes to umccrise glue after testing
* Fixes to rnasum glue after testing
* Use new output_prefix syntax for dragen inputs
  • Loading branch information
alexiswl committed Sep 1, 2024
1 parent 492e85f commit d034033
Show file tree
Hide file tree
Showing 68 changed files with 542 additions and 534 deletions.
10 changes: 5 additions & 5 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ External resources required by the wgtsqc Stack
*/

// Deployed under dev/stg/prod
export const wgtsQcIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/wgts_qc_4.2.4_pipeline_id'; // 413b3c60-a3f5-42eb-a9df-8a77768a8328
export const wgtsQcIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/wgts_qc_4.2.4_pipeline_id'; // 03689516-b7f8-4dca-bba9-8405b85fae45

export const wgtsQcIcav2PipelineWorkflowType = 'wgtsQc';
export const wgtsQcIcav2PipelineWorkflowTypeVersion = '4.2.4';
Expand Down Expand Up @@ -302,7 +302,7 @@ TN Stateless stack
*/

// Deployed under dev/stg/prod
export const tnIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/tumor_normal_4.2.4_pipeline_id'; // fc82a668-4a60-4acf-a528-38f5ee3ffdf5
export const tnIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/tumor_normal_4.2.4_pipeline_id'; // 0f5575bc-6cf8-4a90-a80e-05088aae8ed7
export const tnIcav2PipelineWorkflowType = 'tumor_normal';
export const tnIcav2PipelineWorkflowTypeVersion = '4.2.4';
export const tnIcav2ServiceVersion = '2024.07.01';
Expand Down Expand Up @@ -378,7 +378,7 @@ WTS Stateless stack
*/

// Deployed under dev/stg/prod
export const wtsIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/wts_4.2.4_pipeline_id'; // 66c89437-ec33-4138-8a92-9c018ee533af
export const wtsIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/wts_4.2.4_pipeline_id'; // 1e53ae07-08a6-458b-9fa3-9cf7430409a0
export const wtsIcav2PipelineWorkflowType = 'wts';
export const wtsIcav2PipelineWorkflowTypeVersion = '4.2.4';
export const wtsIcav2ServiceVersion = '2024.07.01';
Expand Down Expand Up @@ -469,9 +469,9 @@ UMCCRise Stateless stack
*/

// Deployed in dev/stg/prod
export const rnasumIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/rnasum_1.0.0_pipeline_id'; // bd6e5690-3ccf-4ac4-997d-59462f852f65
export const rnasumIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/rnasum_1.1.0_pipeline_id'; // 69362d8e-8f6f-4d87-84b5-a8c6205b7032
export const rnasumIcav2PipelineWorkflowType = 'rnasum';
export const rnasumIcav2PipelineWorkflowTypeVersion = '2.3.1';
export const rnasumIcav2PipelineWorkflowTypeVersion = '1.1.0';
export const rnasumIcav2ServiceVersion = '2024.07.01';
export const rnasumIcav2ReadyEventSource = 'orcabus.workflowmanager';
export const rnasumIcav2EventSource = 'orcabus.rnasum';
Expand Down
4 changes: 2 additions & 2 deletions config/stacks/umccrisePipelineManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
eventBusName,
icaEventPipeStackName,
icav2AccessTokenSecretName,
dragenIcav2ReferenceUriMappingSSMParameterPath,
umccriseIcav2PipelineIdSSMParameterPath,
umccriseIcav2PipelineManagerDynamodbTableName,
umccriseIcav2PipelineWorkflowType,
Expand All @@ -16,6 +15,7 @@ import {
umccriseDefaultGenomeVersion,
umccriseDynamoDbTableSSMArn,
umccriseDynamoDbTableSSMName,
icav2UmccriseGenomesReferenceUriMappingSSMParameterPath,
} from '../constants';
import { UmccriseIcav2PipelineManagerConfig } from '../../lib/workload/stateless/stacks/umccrise-pipeline-manager/deploy';
import { UmccriseIcav2PipelineTableConfig } from '../../lib/workload/stateful/stacks/umccrise-pipeline-dynamo-db/deploy/stack';
Expand Down Expand Up @@ -53,6 +53,6 @@ export const getUmccriseIcav2PipelineManagerStackProps = (
stateMachinePrefix: umccriseStateMachinePrefix,
/* SSM Workflow Parameters */
defaultReferenceVersion: umccriseDefaultGenomeVersion,
referenceUriSsmPath: dragenIcav2ReferenceUriMappingSSMParameterPath,
referenceUriSsmPath: icav2UmccriseGenomesReferenceUriMappingSSMParameterPath,
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.get_analysis_id_in_db_step.db_response.Item.db_uuid.S",
"id_type": "db_uuid"
"id.$": "$.get_analysis_id_in_db_step.db_response.Item.portal_run_id.S",
"id_type": "portal_run_id"
},
"UpdateExpression": "SET analysis_status = :analysis_status",
"ExpressionAttributeValues": {
Expand All @@ -74,7 +74,7 @@
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.get_analysis_id_in_db_step.db_response.Item.db_uuid.S",
"id.$": "$.get_analysis_id_in_db_step.db_response.Item.portal_run_id.S",
"id_type": "event_logger"
},
"UpdateExpression": "SET analysis_status_list = list_append(analysis_status_list, :status), event_timestamp_list = list_append(event_timestamp_list, :event_timestamp)",
Expand Down Expand Up @@ -120,7 +120,7 @@
"Parameters": {
"StateMachineArn": "${__sfn_get_outputs_json__}",
"Input": {
"db_uuid.$": "$.get_analysis_id_in_db_step.db_response.Item.db_uuid.S"
"portal_run_id.$": "$.get_analysis_id_in_db_step.db_response.Item.portal_run_id.S"
}
},
"Next": "Pass",
Expand All @@ -140,26 +140,26 @@
"Wait (database - updates)": {
"Type": "Wait",
"Seconds": 1,
"Next": "Get UUID Row For Payload"
"Next": "Get Portal Run ID For Payload"
},
"Get UUID Row For Payload": {
"Get Portal Run ID For Payload": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.get_analysis_id_in_db_step.db_response.Item.db_uuid.S",
"id_type": "db_uuid"
"id.$": "$.get_analysis_id_in_db_step.db_response.Item.portal_run_id.S",
"id_type": "portal_run_id"
}
},
"ResultSelector": {
"portal_run_id.$": "$.Item.portal_run_id.S",
"portal_run_id.$": "$.Item.id.S",
"timestamp.$": "$$.State.EnteredTime",
"status.$": "$.Item.analysis_status.S",
"workflow_name": "${__workflow_type__}",
"workflow_version": "${__workflow_version__}",
"workflow_run_name.$": "$.Item.workflow_run_name.S",
"linked_libraries.$": "$.Item.linked_libraries.S",
"linked_libraries.$": "States.StringToJson($.Item.linked_libraries.S)",
"payload_version": "${__service_version__}",
"payload_inputs.$": "States.StringToJson($.Item.ready_event_data_inputs.S)",
"payload_tags.$": "States.StringToJson($.Item.tags.S)",
Expand Down Expand Up @@ -196,8 +196,9 @@
"Next": "Push Event to Orcabus",
"Parameters": {
"inputs.$": "$.get_event_data_step.payload_inputs",
"outputs.$": "$.get_event_data_step.payload_outputs",
"engineParameters.$": "$.get_event_data_step.payload_engine_parameters",
"outputs.$": "$.get_event_data_step.payload_outputs"
"tags.$": "$.get_event_data_step.payload_tags"
},
"ResultPath": "$.set_data_event_outputs"
},
Expand All @@ -221,8 +222,7 @@
"payload": {
"version": "${__service_version__}",
"data.$": "$.set_data_event_outputs"
},
"tags.$": "$.get_event_data_step.payload_tags"
}
}
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def handler(event, context):
project_id = event.get("project_id", None)
user_reference = event.get("user_reference", None)
input_json = json.loads(event.get("input_json", {}))
idempotency_key = event.get("idempotency_key", None)

# Get the output uris
analysis_output_uri = event.get("analysis_output_uri", None)
Expand Down Expand Up @@ -303,7 +304,9 @@ def handler(event, context):
# Generate the inputs and analysis object
# Call the object to launch it
logger.info("Launching the ICAv2 Analysis")
analysis_launch_obj: Analysis = analysis_obj()
analysis_launch_obj: Analysis = analysis_obj(
idempotency_key=idempotency_key
)

# Save the analysis
logger.info("Saving the analysis")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
wrapica>=2.27.1.post20240806222234,<2.28.0
wrapica>=2.27.1.post20240830140737,<2.28.0
boto3>=1.28
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"Next": "Add Technical Tags"
}
],
"Default": "Get PortalRunId DataBase Entry"
"Default": "Get Push Event Info From DB"
},
"Add Technical Tags": {
"Type": "Pass",
Expand All @@ -60,19 +60,16 @@
"Next": "Wait 1 Second (pre-input-generation)",
"Branches": [
{
"StartAt": "Initialise DB Item",
"StartAt": "Initialise Portal Run Id Item",
"States": {
"Initialise DB Item": {
"Initialise Portal Run Id Item": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "${__table_name__}",
"Item": {
"id.$": "$.workflow_inputs.payload.refId",
"id_type": "db_uuid",
"portal_run_id": {
"S.$": "$.workflow_inputs.portalRunId"
},
"id.$": "$.workflow_inputs.portalRunId",
"id_type": "portal_run_id",
"workflow_run_name": {
"S.$": "$.workflow_inputs.workflowRunName"
},
Expand Down Expand Up @@ -145,8 +142,8 @@
"Item": {
"id.$": "$.workflow_inputs.workflowRunName",
"id_type": "workflow_run_name",
"db_uuid": {
"S.$": "$.workflow_inputs.payload.refId"
"portal_run_id": {
"S.$": "$.workflow_inputs.portalRunId"
}
}
},
Expand All @@ -155,27 +152,6 @@
}
}
},
{
"StartAt": "Put Portal Partition Key Item",
"States": {
"Put Portal Partition Key Item": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "${__table_name__}",
"Item": {
"id.$": "$.workflow_inputs.portalRunId",
"id_type": "portal_run_id",
"db_uuid": {
"S.$": "$.workflow_inputs.payload.refId"
}
}
},
"ResultPath": "$.dynamo_db_put_item_step",
"End": true
}
}
},
{
"StartAt": "Add Event Logger Item",
"States": {
Expand All @@ -185,10 +161,10 @@
"Parameters": {
"TableName": "${__table_name__}",
"Item": {
"id.$": "$.workflow_inputs.payload.refId",
"id.$": "$.workflow_inputs.portalRunId",
"id_type": "event_logger",
"db_uuid": {
"S.$": "$.workflow_inputs.payload.refId"
"portal_run_id": {
"S.$": "$.workflow_inputs.portalRunId"
},
"event_timestamp_list": {
"L": [
Expand Down Expand Up @@ -231,7 +207,7 @@
"Parameters": {
"StateMachineArn": "${__set_input_json_state_machine_arn__}",
"Input": {
"db_uuid.$": "$.workflow_inputs.payload.refId"
"portal_run_id.$": "$.workflow_inputs.portalRunId"
}
},
"ResultPath": null,
Expand Down Expand Up @@ -279,8 +255,8 @@
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.workflow_inputs.payload.refId",
"id_type": "db_uuid"
"id.$": "$.workflow_inputs.portalRunId",
"id_type": "portal_run_id"
},
"UpdateExpression": "SET analysis_pipeline_id = :analysis_pipeline_id, engine_parameters = :engine_parameters",
"ExpressionAttributeValues": {
Expand Down Expand Up @@ -319,8 +295,8 @@
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.workflow_inputs.payload.refId",
"id_type": "db_uuid"
"id.$": "$.workflow_inputs.portalRunId",
"id_type": "portal_run_id"
}
},
"ResultSelector": {
Expand Down Expand Up @@ -358,7 +334,8 @@
"analysis_output_uri.$": "$.get_parameters_from_payload.analysis_output_uri",
"ica_logs_uri.$": "$.get_parameters_from_payload.ica_logs_uri",
"technical_tags.$": "$.add_technical_tags_step.technical_tags",
"user_tags.$": "$.get_parameters_from_payload.user_tags"
"user_tags.$": "$.get_parameters_from_payload.user_tags",
"idempotency_key.$": "$.workflow_inputs.portalRunId"
},
"FunctionName": "${__launch_icav2_pipeline_lambda_function_name__}"
},
Expand Down Expand Up @@ -389,18 +366,16 @@
"Type": "Parallel",
"Branches": [
{
"StartAt": "Update Db Uuid Item",
"StartAt": "Update Portal Run ID Item",
"States": {
"Update Db Uuid Item": {
"Update Portal Run ID Item": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id": {
"S.$": "$.workflow_inputs.payload.refId"
},
"id_type": "db_uuid"
"id.$": "$.workflow_inputs.portalRunId",
"id_type": "portal_run_id"
},
"UpdateExpression": "SET analysis_id = :analysis_id, analysis_status = :analysis_status, analysis_return_payload = :analysis_return_payload, analysis_launch_payload = :analysis_launch_payload, engine_parameters = :engine_parameters",
"ExpressionAttributeValues": {
Expand Down Expand Up @@ -437,8 +412,8 @@
"Item": {
"id.$": "$.launch_nextflow_object_step.analysis_id",
"id_type": "icav2_analysis_id",
"db_uuid": {
"S.$": "$.workflow_inputs.payload.refId"
"portal_run_id": {
"S.$": "$.workflow_inputs.portalRunId"
}
}
},
Expand Down Expand Up @@ -469,9 +444,9 @@
"Type": "Wait",
"Seconds": 1,
"Comment": "Wait for databases to sync before continuing",
"Next": "Get PortalRunId DataBase Entry"
"Next": "Get Push Event Info From DB"
},
"Get PortalRunId DataBase Entry": {
"Get Push Event Info From DB": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
Expand All @@ -481,25 +456,9 @@
"id_type": "portal_run_id"
}
},
"Next": "Get Push Event Info From DB",
"ResultPath": "$.database_event_data",
"ResultSelector": {
"db_uuid.$": "$.Item.db_uuid.S"
}
},
"Get Push Event Info From DB": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.database_event_data.db_uuid",
"id_type": "db_uuid"
}
},
"Next": "Push event to orcabus",
"ResultSelector": {
"portalRunId.$": "$.Item.portal_run_id.S",
"portalRunId.$": "$.Item.id.S",
"timestamp.$": "$$.State.EnteredTime",
"status.$": "$.Item.analysis_status.S",
"workflowName": "${__workflow_type__}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class DynamodbNonPartitionedPipelineConstruct extends Construct {
super(scope, id);

this.tableObj = new dynamodb.TableV2(this, 'dynamodb_partitioned_pipeline_table', {
/* Either a db_uuid or an icav2 analysis id or a portal run id */
/* Either a portal run id or an icav2 analysis id */
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface WorkflowRunStateChangeInternalInputMakerProps {
/* Object name prefixes */
stateMachinePrefix: string;
lambdaPrefix: string;
rulePrefix: string;
/* Table configs */
tableObj: dynamodb.ITableV2;
tablePartitionName: string;
Expand Down Expand Up @@ -204,6 +205,7 @@ export class WorkflowDraftRunStateChangeToWorkflowRunStateChangeReadyConstruct e
Part 4 - Set up a rule to trigger the state machine
*/
const rule = new events.Rule(this, 'workflowrunstatechangeparser_event_rule', {
ruleName: `${props.rulePrefix}-rule`,
eventBus: props.eventBusObj,
eventPattern: {
source: [props.triggerSource],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
wrapica>=2.27.1.post20240806222234
wrapica>=2.27.1.post20240830140737
boto3>=1.34.0
Loading

0 comments on commit d034033

Please sign in to comment.