Skip to content

Commit

Permalink
Fixes for nf pipeline
Browse files Browse the repository at this point in the history
* Updated stacky names from mock to stacky
* ICAV2_ACCESS_TOKEN_SECRET_ID an optional env var for fill placeholders lambda
* Add tagResource policy to nf stack
* Fix launch nextflow pipeline sfn
* Fix rule for default Batch event
* Fix batch parameter path link lambda
* Fix override resource requirements
* Add portal run id to engine parameters and to batch parameters
* Add orcabus: true to parameters to allow v1 portal to filter out events
* Fix url extension function
* Fix rna pipeline list flatten
  • Loading branch information
Alexis Lucattini authored and Alexis Lucattini committed Oct 25, 2024
1 parent b0388fd commit 07a114c
Show file tree
Hide file tree
Showing 17 changed files with 374 additions and 161 deletions.
70 changes: 49 additions & 21 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ export const oncoanalyserBatchJobDefinitionArn: Record<AppStage, string> = {
[AppStage.PROD]: `arn:aws:batch:${region}:${accountIdAlias.prod}:job-definition/Nextflow-oncoanalyser`, // pragma: allowlist secret
};
export const oncoanalyserStateMachinePrefix = 'oncoanalyser';

export const oncoanalyserPipelineVersionSSMParameterPath =
'/nextflow_stack/oncoanalyser/pipeline_version_tag';

Expand Down Expand Up @@ -750,28 +751,55 @@ export const sashBatchJobDefinitionArn: Record<AppStage, string> = {
export const sashStateMachinePrefix = 'sash';
export const sashPipelineVersionSSMParameterPath = '/nextflow_stack/sash/pipeline_version_tag';

// Mock Stack
export const mockEventBusName = eventBusName;
export const mockInstrumentRunTableName = 'stacky-instrument-run-table';
export const mockInputMakerTableName = 'stacky-input-maker-table';
export const mockCttsov2InputGlueTableName = 'stacky-cttsov2-workflow-glue-table';
export const mockWgtsQcGlueTableName = 'stacky-wgts-qc-glue-table';
export const mockTnGlueTableName = 'stacky-tn-glue-table';
export const mockWtsGlueTableName = 'stacky-wts-glue-table';
export const mockUmccriseGlueTableName = 'stacky-umccrise-glue-table';
export const mockRnasumGlueTableName = 'stacky-rnasum-glue-table';
export const mockPierianDxGlueTableName = 'stacky-pieriandx-glue-table';
export const mockOncoanalyserGlueTableName = 'stacky-oncoanalyser-glue-table';
export const mockOncoanalyserBothSashGlueTableName = 'stacky-oncoanalyser-both-sash-glue-table';
export const mockWorkflowManagerTableName = 'stacky-workflow-manager-table';

// Stacky Stack
export const stackyEventBusName = eventBusName;
export const stackyInstrumentRunTableName = 'stacky-instrument-run-table';
export const stackyInputMakerTableName = 'stacky-input-maker-table';
export const stackyCttsov2InputGlueTableName = 'stacky-cttsov2-workflow-glue-table';
export const stackyWgtsQcGlueTableName = 'stacky-wgts-qc-glue-table';
export const stackyTnGlueTableName = 'stacky-tn-glue-table';
export const stackyWtsGlueTableName = 'stacky-wts-glue-table';
export const stackyUmccriseGlueTableName = 'stacky-umccrise-glue-table';
export const stackyRnasumGlueTableName = 'stacky-rnasum-glue-table';
export const stackyPierianDxGlueTableName = 'stacky-pieriandx-glue-table';
export const stackyOncoanalyserGlueTableName = 'stacky-oncoanalyser-glue-table';
export const stackyOncoanalyserBothSashGlueTableName = 'stacky-oncoanalyser-both-sash-glue-table';
export const stackyWorkflowManagerTableName = 'stacky-workflow-manager-table';

// dev
// {
// "project_id":"ea19a3f5-ec7c-4940-a474-c31cd91dbad4",
// "project_name": "development"
// }
export const mockIcav2ProjectIdSsmParameterName = '/orcabus/stacky/icav2_project_id_and_name_json';
export const mockPrimaryOutputUriSsmParameterName = '/orcabus/stacky/primary_output_uri'; // icav2://7595e8f2-32d3-4c76-a324-c6a85dae87b5/primary/__instrument_run_id__/__portal_run_id__/
export const mockAnalysisOutputUriSsmParameterName = '/orcabus/stacky/analysis_output_uri'; // icav2://7595e8f2-32d3-4c76-a324-c6a85dae87b5/analysis/__workflow_name__/__portal_run_id__/
export const mockAnalysisLogsUriSsmParameterName = '/orcabus/stacky/analysis_logs_uri'; // icav2://7595e8f2-32d3-4c76-a324-c6a85dae87b5/logs/__workflow_name__/__portal_run_id__/

export const mockAnalysisCacheUriSsmParameterName = '/orcabus/stacky/analysis_cache_uri'; // icav2://7595e8f2-32d3-4c76-a324-c6a85dae87b5/cache/__workflow_name__/__portal_run_id__/
// stg
// {
// "project_id": "157b9e78-b2e1-45a7-bfcd-691159995f7c",
// "project_name": "staging"
// }
// prod
// {
// "project_id": "eba5c946-1677-441d-bbce-6a11baadecbb",
// "project_name": "production"
// }
export const stackyIcav2ProjectIdSsmParameterName =
'/orcabus/stacky/icav2_project_id_and_name_json';

// dev: s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/__instrument_run_id__/__portal_run_id__/
// stg: s3://pipeline-stg-cache-503977275616-ap-southeast-2/byob-icav2/staging/primary/__instrument_run_id__/__portal_run_id__/
// prod: s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/primary/__instrument_run_id__/__portal_run_id__/
export const stackyPrimaryOutputUriSsmParameterName = '/orcabus/stacky/primary_output_uri';

// dev: s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/analysis/__workflow_name__/__portal_run_id__/
// stg: s3://pipeline-stg-cache-503977275616-ap-southeast-2/byob-icav2/staging/analysis/__workflow_name__/__portal_run_id__/
// prod: s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/__workflow_name__/__portal_run_id__/
export const stackyAnalysisOutputUriSsmParameterName = '/orcabus/stacky/analysis_output_uri';

// dev: s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/logs/__workflow_name__/__portal_run_id__/
// stg: s3://pipeline-stg-cache-503977275616-ap-southeast-2/byob-icav2/staging/logs/__workflow_name__/__portal_run_id__/
// prod: s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/logs/__workflow_name__/__portal_run_id__/
export const stackyAnalysisLogsUriSsmParameterName = '/orcabus/stacky/analysis_logs_uri';

// dev: s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/cache/__workflow_name__/__portal_run_id__/
// stg: s3://pipeline-stg-cache-503977275616-ap-southeast-2/byob-icav2/staging/cache/__workflow_name__/__portal_run_id__/
// prod: s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/cache/__workflow_name__/__portal_run_id__/
export const stackyAnalysisCacheUriSsmParameterName = '/orcabus/stacky/analysis_cache_uri';
96 changes: 48 additions & 48 deletions config/stacks/stackyMcStackFace.ts
Original file line number Diff line number Diff line change
@@ -1,60 +1,60 @@
import {
mockPrimaryOutputUriSsmParameterName,
mockAnalysisCacheUriSsmParameterName,
mockAnalysisLogsUriSsmParameterName,
mockAnalysisOutputUriSsmParameterName,
mockEventBusName,
mockIcav2ProjectIdSsmParameterName,
mockInputMakerTableName,
mockInstrumentRunTableName,
mockWorkflowManagerTableName,
mockCttsov2InputGlueTableName,
stackyPrimaryOutputUriSsmParameterName,
stackyAnalysisCacheUriSsmParameterName,
stackyAnalysisLogsUriSsmParameterName,
stackyAnalysisOutputUriSsmParameterName,
stackyEventBusName,
stackyIcav2ProjectIdSsmParameterName,
stackyInputMakerTableName,
stackyInstrumentRunTableName,
stackyWorkflowManagerTableName,
stackyCttsov2InputGlueTableName,
icav2AccessTokenSecretName,
mockWgtsQcGlueTableName,
stackyWgtsQcGlueTableName,
AppStage,
mockTnGlueTableName,
mockWtsGlueTableName,
mockUmccriseGlueTableName,
mockRnasumGlueTableName,
mockPierianDxGlueTableName,
stackyTnGlueTableName,
stackyWtsGlueTableName,
stackyUmccriseGlueTableName,
stackyRnasumGlueTableName,
stackyPierianDxGlueTableName,
pieriandxProjectInfoSsmParameterPath,
redcapLambdaFunctionName,
mockOncoanalyserGlueTableName,
mockOncoanalyserBothSashGlueTableName,
stackyOncoanalyserGlueTableName,
stackyOncoanalyserBothSashGlueTableName,
} from '../constants';
import { GlueStackConfig } from '../../lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs';
import { StackyStatefulTablesConfig } from '../../lib/workload/stateful/stacks/stacky-mcstackface-dynamodb';

export const getGlueStackProps = (stage: AppStage): GlueStackConfig => {
return {
/* Events */
eventBusName: mockEventBusName,
eventBusName: stackyEventBusName,

/* Tables */
inputMakerTableName: mockInputMakerTableName,
instrumentRunTableName: mockInstrumentRunTableName,
workflowManagerTableName: mockWorkflowManagerTableName,
cttsov2GlueTableName: mockCttsov2InputGlueTableName,
wgtsQcGlueTableName: mockWgtsQcGlueTableName,
tnGlueTableName: mockTnGlueTableName,
wtsGlueTableName: mockWtsGlueTableName,
umccriseGlueTableName: mockUmccriseGlueTableName,
rnasumGlueTableName: mockRnasumGlueTableName,
pieriandxGlueTableName: mockPierianDxGlueTableName,
oncoanalyserGlueTableName: mockOncoanalyserGlueTableName,
oncoanalyserBothSashGlueTableName: mockOncoanalyserBothSashGlueTableName,
inputMakerTableName: stackyInputMakerTableName,
instrumentRunTableName: stackyInstrumentRunTableName,
workflowManagerTableName: stackyWorkflowManagerTableName,
cttsov2GlueTableName: stackyCttsov2InputGlueTableName,
wgtsQcGlueTableName: stackyWgtsQcGlueTableName,
tnGlueTableName: stackyTnGlueTableName,
wtsGlueTableName: stackyWtsGlueTableName,
umccriseGlueTableName: stackyUmccriseGlueTableName,
rnasumGlueTableName: stackyRnasumGlueTableName,
pieriandxGlueTableName: stackyPierianDxGlueTableName,
oncoanalyserGlueTableName: stackyOncoanalyserGlueTableName,
oncoanalyserBothSashGlueTableName: stackyOncoanalyserBothSashGlueTableName,

/* SSM Parameters */
analysisCacheUriSsmParameterName: mockAnalysisCacheUriSsmParameterName,
analysisOutputUriSsmParameterName: mockAnalysisOutputUriSsmParameterName,
icav2ProjectIdSsmParameterName: mockIcav2ProjectIdSsmParameterName,
analysisLogsUriSsmParameterName: mockAnalysisLogsUriSsmParameterName,
analysisCacheUriSsmParameterName: stackyAnalysisCacheUriSsmParameterName,
analysisOutputUriSsmParameterName: stackyAnalysisOutputUriSsmParameterName,
icav2ProjectIdSsmParameterName: stackyIcav2ProjectIdSsmParameterName,
analysisLogsUriSsmParameterName: stackyAnalysisLogsUriSsmParameterName,

/* Secrets */
icav2AccessTokenSecretName: icav2AccessTokenSecretName[stage],

/* BSSH SSM Parameters */
bsshOutputFastqCopyUriSsmParameterName: mockPrimaryOutputUriSsmParameterName,
bsshOutputFastqCopyUriSsmParameterName: stackyPrimaryOutputUriSsmParameterName,

/* PierianDx SSM Parameters */
pieriandxProjectInfoSsmParameterPath: pieriandxProjectInfoSsmParameterPath,
Expand All @@ -64,17 +64,17 @@ export const getGlueStackProps = (stage: AppStage): GlueStackConfig => {

export const getStatefulGlueStackProps = (): StackyStatefulTablesConfig => {
return {
dynamodbInstrumentRunManagerTableName: mockInstrumentRunTableName,
dynamodbWorkflowManagerTableName: mockWorkflowManagerTableName,
dynamodbInputGlueTableName: mockInputMakerTableName,
dynamodbCttsov2WorkflowGlueTableName: mockCttsov2InputGlueTableName,
dynamodbWgtsQcGlueTableName: mockWgtsQcGlueTableName,
dynamodbTnGlueTableName: mockTnGlueTableName,
dynamodbWtsGlueTableName: mockWtsGlueTableName,
dynamodbUmccriseGlueTableName: mockUmccriseGlueTableName,
dynamodbRnasumGlueTableName: mockRnasumGlueTableName,
dynamodbPieriandxGlueTableName: mockPierianDxGlueTableName,
dynamodbOncoanalyserGlueTableName: mockOncoanalyserGlueTableName,
dynamodbOncoanalyserBothSashGlueTableName: mockOncoanalyserBothSashGlueTableName,
dynamodbInstrumentRunManagerTableName: stackyInstrumentRunTableName,
dynamodbWorkflowManagerTableName: stackyWorkflowManagerTableName,
dynamodbInputGlueTableName: stackyInputMakerTableName,
dynamodbCttsov2WorkflowGlueTableName: stackyCttsov2InputGlueTableName,
dynamodbWgtsQcGlueTableName: stackyWgtsQcGlueTableName,
dynamodbTnGlueTableName: stackyTnGlueTableName,
dynamodbWtsGlueTableName: stackyWtsGlueTableName,
dynamodbUmccriseGlueTableName: stackyUmccriseGlueTableName,
dynamodbRnasumGlueTableName: stackyRnasumGlueTableName,
dynamodbPieriandxGlueTableName: stackyPierianDxGlueTableName,
dynamodbOncoanalyserGlueTableName: stackyOncoanalyserGlueTableName,
dynamodbOncoanalyserBothSashGlueTableName: stackyOncoanalyserBothSashGlueTableName,
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import boto3
import typing


if typing.TYPE_CHECKING:
from mypy_boto3_ssm import SSMClient
from mypy_boto3_secretsmanager import SecretsManagerClient
Expand Down Expand Up @@ -191,8 +190,9 @@ def set_icav2_env_vars():


def handler(event, context):
# Set env vars
set_icav2_env_vars()
if environ.get('ICAV2_ACCESS_TOKEN_SECRET_ID', None) is not None:
# Set env vars
set_icav2_env_vars()

# Get the portal run id from the event
portal_run_id: str = event.get('portal_run_id', None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export class WfmWorkflowStateChangeNfBatchReadyEventHandlerConstruct extends Con
const submitJobPolicy = new iam.Policy(this, 'submitJobPolicy', {
statements: [
new iam.PolicyStatement({
actions: ['batch:SubmitJob'],
actions: ['batch:SubmitJob', 'batch:TagResource'],
resources: [
props.batchJobDefinitionObj.jobDefinitionArn,
props.batchJobQueueObj.jobQueueArn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@
"Get Default Pipeline Version": {
"Type": "Task",
"Parameters": {
"Name": "__pipeline_version_ssm_path__"
"Name": "${__pipeline_version_ssm_path__}"
},
"Resource": "arn:aws:states:::aws-sdk:ssm:getParameter",
"Next": "Generate Batch Submission Job",
"ResultPath": "$.get_default_pipeline_version_step",
"ResultSelector": {
"default_pipeline_version.$": "$.Parameter.String"
}
"default_pipeline_version.$": "$.Parameter.Value"
},
"ResultPath": "$.get_default_pipeline_version_step",
"Next": "Generate Batch Submission Job"
},
"Generate Batch Submission Job": {
"Type": "Task",
Expand All @@ -71,7 +71,8 @@
"Payload": {
"inputs.$": "$.workflow_inputs.payload.data.inputs",
"engine_parameters.$": "$.workflow_inputs.payload.data.engineParameters",
"default_pipeline_version.$": "$.get_default_pipeline_version_step.default_pipeline_version"
"default_pipeline_version.$": "$.get_default_pipeline_version_step.default_pipeline_version",
"portal_run_id.$": "$.workflow_inputs.portalRunId"
}
},
"Retry": [
Expand Down Expand Up @@ -99,7 +100,7 @@
"Type": "Task",
"Resource": "arn:aws:states:::batch:submitJob",
"Parameters": {
"JobName": "$.workflow_inputs.workflowRunName",
"JobName.$": "$.workflow_inputs.workflowRunName",
"JobDefinition": "${__job_definition_arn__}",
"JobQueue": "${__job_queue_name__}",
"ContainerOverrides": {
Expand All @@ -111,7 +112,7 @@
"Parameters.$": "$.get_batch_submission_step.parameters"
},
"ResultSelector": {
"jobId.$": "$.Output.jobId"
"jobId.$": "$.JobId"
},
"ResultPath": "$.submit_oncoanalyser_job_step",
"Next": "Append Job Id to Engine Parameters"
Expand All @@ -136,7 +137,7 @@
"UpdateExpression": "SET engine_parameters = :engine_parameters",
"ExpressionAttributeValues": {
":engine_parameters": {
"S.$": "States.StringToJson($.merge_engine_parameters_step.engine_parameters)"
"S.$": "States.JsonToString($.merge_engine_parameters_step.engine_parameters)"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ export interface WfmWorkflowStateChangeNfBatchStateChangeEventHandlerConstructPr
export class WfmWorkflowStateChangeNfBatchStateChangeEventHandlerConstruct extends Construct {
public readonly stateMachineObj: sfn.StateMachine;
private readonly globals = {
defaultEventBusName: 'default',
eventStatus: 'SUBMITTED',
portalRunTablePartitionName: 'portal_run_id',
eventDetailType: 'WorkflowRunStateChange',
serviceVersion: '2024.10.17',
batchEventBusName: 'default',
batchEventSource: 'aws.batch',
batchEventDetailType: 'Batch Job State Change',
};

constructor(
Expand All @@ -56,7 +58,7 @@ export class WfmWorkflowStateChangeNfBatchStateChangeEventHandlerConstruct exten
const defaultEventBus = events.EventBus.fromEventBusName(
this,
'default-event-bus',
this.globals.defaultEventBusName
this.globals.batchEventBusName
);

// Build state machine object
Expand Down Expand Up @@ -94,12 +96,18 @@ export class WfmWorkflowStateChangeNfBatchStateChangeEventHandlerConstruct exten
// Create a rule for this state machine
const rule = new events.Rule(this, 'rule', {
eventBus: defaultEventBus,
ruleName: `${props.stateMachinePrefix}-wrsc-rule`,
ruleName: `${props.stateMachinePrefix}-batch-state-change-rule`,
eventPattern: {
source: [props.triggerLaunchSource],
detailType: [props.detailType],
source: [this.globals.batchEventSource],
detailType: [this.globals.batchEventDetailType],
detail: {
jobDefinition: [{ 'equals-ignore-case': props.batchJobDefinitionObj.jobDefinitionArn }],
// Has a trailing ':1' at the end of the job definition object
jobDefinition: [
{ prefix: { 'equals-ignore-case': props.batchJobDefinitionObj.jobDefinitionArn } },
],
parameters: {
orcabus: [{ exists: true }],
},
},
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,19 @@ def handler(event, context):
)


if __name__ == "__main__":
import json
print(
json.dumps(
handler(
{
"gds_folder_path": "/Runs/231109_A01052_0171_BHLJW7DSX7_r.NULhvzxcSEWmqZw8QljXfQ",
"samplesheet_name": "SampleSheet.V2.1711336300.924772.csv",
"gds_volume_name": "bssh.acddbfda498038ed99fa94fe79523959",
"instrument_run_id": "231109_A01052_0171_BHLJW7DSX7"
},
None
),
indent=2
)
)
# if __name__ == "__main__":
# import json
# print(
# json.dumps(
# handler(
# {
# "gds_folder_path": "/Runs/231109_A01052_0171_BHLJW7DSX7_r.NULhvzxcSEWmqZw8QljXfQ",
# "samplesheet_name": "SampleSheet.V2.1711336300.924772.csv",
# "gds_volume_name": "bssh.acddbfda498038ed99fa94fe79523959",
# "instrument_run_id": "231109_A01052_0171_BHLJW7DSX7"
# },
# None
# ),
# indent=2
# )
# )
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ export class OncoanalyserNfPipelineManagerStack extends cdk.Stack {
Build lambdas
*/
const setBatchParametersLambdaObj = new PythonFunction(this, 'get_batch_parameters', {
entry: path.join(__dirname, '../lambdas/get_outputs_py'),
entry: path.join(__dirname, '../lambdas/generate_batch_parameters_py'),
runtime: lambda.Runtime.PYTHON_3_12,
architecture: lambda.Architecture.ARM_64,
index: 'get_outputs.py',
index: 'generate_batch_parameters.py',
handler: 'handler',
memorySize: 1024,
});
Expand Down
Loading

0 comments on commit 07a114c

Please sign in to comment.