From b5bc093e5cd853b5c22cddfb2016234f94094794 Mon Sep 17 00:00:00 2001 From: MokaGuys Date: Wed, 18 Oct 2023 15:52:55 +0100 Subject: [PATCH] changes to split up TSO runs, use latest TSO app, latest duty_csv app --- automate_demultiplex_config.py | 22 +++-- upload_and_setoff_workflows.py | 150 +++++++++++++++++++++------------ 2 files changed, 108 insertions(+), 64 deletions(-) diff --git a/automate_demultiplex_config.py b/automate_demultiplex_config.py index 24602d93..8bcf26b1 100644 --- a/automate_demultiplex_config.py +++ b/automate_demultiplex_config.py @@ -56,6 +56,12 @@ "999999_A01229_0182_AHM2TSO500", ] +# TSO500 batch size (for splitting samplesheet) +if testing: + batch_size = 2 +else: + batch_size = 16 + # path to log file which records the output of the upload agent upload_and_setoff_workflow_logfile = ( "{document_root}/automate_demultiplexing_logfiles/upload_agent_script_logfiles/" @@ -134,7 +140,7 @@ # MokaSNP ID mokasnp_pipeline_ID = "5091" # TSO500 pipeline ID -TSO_pipeline_ID = "5237" +TSO_pipeline_ID = "5288" #TSO v1.6 # -- Moka WES test status-- # Test Status = NextSEQ sequencing @@ -170,8 +176,8 @@ congenica_app_path = "Apps/congenica_upload_v1.3.2" congenica_SFTP_upload_app = "applet-GFfJpj80jy1x1Bz1P1Bk3vQf" -# TSO500 app TODO update to new version of app v1.6.0 -tso500_app = "applet-GPgkz0j0jy1Yf4XxkXjVgKfv" # Apps/TSO500_v1.5.1 +# TSO500 app +tso500_app = "applet-GZgv0Jj0jy1Yfbx3QvqyKjzp" # Apps/TSO500_v1.6.0 tso500_app_name = "TSO500_v1.6.0" tso500_docker_image = ( "project-ByfFPz00jy1fk6PjpZ95F27J:file-Fz9Zyx00b5j8xKVkKv4fZ6JB" @@ -1256,7 +1262,7 @@ }, "Pan4969": { # TSO500 no UTRs. TERT promoter "TSO500": True, - "sambamba_bedfile": "Pan5130dataSambamba.bed", + "sambamba_bedfile": "Pan5205dataSambamba.bed", "clinical_coverage_depth": 100, "multiqc_coverage_level": 100, "coverage_min_basecall_qual": 25, @@ -1265,7 +1271,7 @@ "Pan5085": { # TSO500 High throughput Synnovis. no UTRs. TERT promoter "TSO500": True, "TSO500_high_throughput": True, - "sambamba_bedfile": "Pan5130dataSambamba.bed", + "sambamba_bedfile": "Pan5205dataSambamba.bed", "clinical_coverage_depth": 100, "multiqc_coverage_level": 100, "coverage_min_basecall_qual": 25, @@ -1274,7 +1280,7 @@ "Pan5112": { # TSO500 High throughput BSPS. no UTRs. TERT promoter "TSO500": True, "TSO500_high_throughput": True, - "sambamba_bedfile": "Pan5130dataSambamba.bed", + "sambamba_bedfile": "Pan5205dataSambamba.bed", "clinical_coverage_depth": 100, "multiqc_coverage_level": 100, "coverage_min_basecall_qual": 25, @@ -1284,7 +1290,7 @@ "Pan5114": { # TSO500 High throughput Control. no UTRs. TERT promoter "TSO500": True, "TSO500_high_throughput": True, - "sambamba_bedfile": "Pan5130dataSambamba.bed", + "sambamba_bedfile": "Pan5205dataSambamba.bed", "clinical_coverage_depth": 100, "multiqc_coverage_level": 100, "coverage_min_basecall_qual": 25, @@ -1765,7 +1771,7 @@ } duty_csv_id = ( - "project-ByfFPz00jy1fk6PjpZ95F27J:applet-GQg9J280jy1Zf79KGx9gk5K3" + "project-ByfFPz00jy1fk6PjpZ95F27J:applet-GZYx3Kj0kKj3YBV7qgK6VjXQ" ) duty_csv_inputs = { # tso_pannumbers should not include the dry lab pan number diff --git a/upload_and_setoff_workflows.py b/upload_and_setoff_workflows.py index 5840e4bb..a458a896 100644 --- a/upload_and_setoff_workflows.py +++ b/upload_and_setoff_workflows.py @@ -114,6 +114,11 @@ def __init__(self, runfolder): + self.runfolder_name + "_congenica_upload_commands.sh" ) + self.TSO500_post_run_command_script = ( + config.DNA_Nexus_workflow_logfolder + + self.runfolder_name + + "_TSO_post_run_commands.sh" + ) #TODO copy lines above to create separate dx run commands output script for TSO (to be run by duty binfx) self.nexus_project_name = "" self.nexus_path = "" @@ -340,7 +345,7 @@ def quarterback(self): if TSO500_sample_list: self.list_of_processed_samples, self.fastq_string = ( TSO500_sample_list, - self.runfolder_obj.runfolder_samplesheet_path, #TODO this sets the fastq_string to be the samplesheet path + self.runfolder_obj.runfolder_samplesheet_path, #this sets the fastq_string to be the samplesheet path ) else: @@ -376,7 +381,7 @@ def quarterback(self): # pass path to function which checks files were uploaded without error if TSO500_sample_list: # split TSO samplesheet to multiple sheets with <=16 samples/sheet - self.TSO500_samplesheets_list = self.split_tso500_sampleheet(): + self.TSO500_samplesheets_list = self.split_TSO500_sampleheet() backup_attempt_count = 1 while backup_attempt_count < 5: self.loggers.script.info( @@ -393,8 +398,6 @@ def quarterback(self): backup_attempt_count += 1 #upload fastqs. if TSO500 run, this uploads the samplesheet to the project root - #TODO make this an else for the above if TSO500_sample_list. then split and upload TSO samplesheets separately. - # TODO check whether upload_fastqs() output or related variables are used elsewhere self.look_for_upload_errors(self.upload_fastqs()) # upload cluster density files and check upload was successful. @@ -634,7 +637,7 @@ def check_for_TSO500(self): open(self.loggers.upload_agent.filepath, "w").close() return sample_list - def split_tso500_sampleheet(self): + def split_TSO500_sampleheet(self): """ take TSO500 samplesheet and split in to parts with <=16 samples/sheet write samplesheets to runfolder @@ -642,7 +645,8 @@ def split_tso500_sampleheet(self): they'll be uploaded to DNAnexus, can access from there for dx run cmds) """ # samplesheet in the runfolder - samplesheet_file = self.runfolder_samplesheet_name + samplesheet_file = os.path.join(self.runfolder_obj.runfolderpath, self.runfolder_obj.runfolder_samplesheet_name) + # Read all lines from the sample sheet with open(samplesheet_file) as samplesheet: all_lines = samplesheet.readlines() @@ -652,8 +656,8 @@ def split_tso500_sampleheet(self): # sample lines start with "TSO". This excludes empty lines below the samples list, i.e. lines containing ",,,,,,," samples = [sample for sample in all_lines[25:] if sample.startswith("TSO")] - # Split samples into batches of 16 - batches = [samples[i:i + 16] for i in range(0, len(samples), 16)] + # Split samples into batches (size specified in config) + batches = [samples[i:i + config.batch_size] for i in range(0, len(samples), config.batch_size)] # Write batches to separate files named "PartXofY", and add samplesheet to list samplesheet_list = [] @@ -661,11 +665,11 @@ def split_tso500_sampleheet(self): samplesheet_base_name = samplesheet_file.split(".csv")[0] for samplesheet_count, batch in enumerate(batches, start=1): #capture samplesheet file path to write samplesheet paths to the runfolder - samplesheet_filename = "%sPart%sof%s.csv" % (samplesheet_base_name,samplesheet_count,number_of_batches) + samplesheet_filepath = "%sPart%sof%s.csv" % (samplesheet_base_name,samplesheet_count,number_of_batches) # capture samplesheet name to write to list- use runfolder name samplesheet_name = "%s_SampleSheetPart%sof%s.csv" % (self.runfolder_obj.runfolder_name,samplesheet_count,number_of_batches) samplesheet_list.append(samplesheet_name) - with open(samplesheet_filename, "a") as new_samplesheet: + with open(samplesheet_filepath, "a") as new_samplesheet: new_samplesheet.writelines(samplesheet_header) new_samplesheet.writelines(batch) @@ -1608,51 +1612,23 @@ def start_building_dx_run_cmds(self): for cmd in self.determine_exome_depth_requirements(pannnumber_list): commands_list.append(cmd) + # write TSO commands if a TSO run. if TSO500: - commands_list.append("#The TSOapp is set off first. This utilises the --wait flag, so the bash script waits until this job finishes before running the coverage, hap.py and fastqc commands using the samplesheet to determine expected files and their locations ") - commands_list.append("#All jobs apart from control samples are added to the depends on list used to delay multiqc") - # build command for the TSO500 app and set off fastqc commands - # TODO add for loop here to loop through samplesheets and write command for each + commands_list.append("#The TSOapp is set off once for each samplesheet made") + commands_list.append("#Other jobs must be set off manually by running the file once the pipeline has finished") + # build commands for the TSO500 app and set off fastqc commands (need a command per samplesheet) for samplesheet in self.TSO500_samplesheets_list: commands_list.append(self.create_tso500_command(samplesheet)) - # TODO modify this to handle creating separate file for TSO commands - commands_list.append(self.add_to_depends_list("TSO500", 'depends_list')) + self.build_TSO500_post_run_commands() + + # TSO500 multiqc commands are written to a separate file with a function called above + if not TSO500: + commands_list.append(self.create_multiqc_command()) + commands_list.append(self.add_to_depends_list("MultiQC", 'depends_list')) + commands_list.append(self.create_upload_multiqc_command(TSO500)) + commands_list.append(self.add_to_depends_list("UploadMultiQC", 'depends_list')) - # For TSO samples, the fastqs are created within DNAnexus and the - # commands are generated using sample names parsed from the - # samplesheet. If for whatever reason those fastqs are not created - # by the DNAnexus app, the downstream job will not set off and - # therefore will produce no job ID to provide to the depends_list, - # which will create an error/ slack alert. To solve this problem, - # the job ID is only added to the depends list if it exits - for sample in self.list_of_processed_samples: - pannumber = re.search(r"Pan\d+", sample).group() - commands_list.append( - self.create_fastqc_command(sample) - ) - # Only add to depends_list if job ID from previous command - # is not empty - commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list')) - - commands_list.append(self.create_sambamba_cmd(sample, pannumber)) - # Exclude negative controls from the depends list as the NTC - # coverage calculation can often fail. We want the coverage - # report for the NTC sample to help assess contamination. - # Only add to depends_list if job ID from previous command - # is not empty - commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list')) - - if "HD200" in sample: - commands_list.append(self.create_sompy_cmd(sample, pannumber)) - # Only add to depends_list if job ID from previous command - # is not empty - commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list("sompy", 'depends_list')) - - commands_list.append(self.create_multiqc_command()) - commands_list.append(self.add_to_depends_list("MultiQC", 'depends_list')) - commands_list.append(self.create_upload_multiqc_command(TSO500)) - commands_list.append(self.add_to_depends_list("UploadMultiQC", 'depends_list')) # setoff the below commands later as they are not depended upon by # MultiQC but are required for duty_csv if rpkm_list: @@ -1664,10 +1640,73 @@ def start_building_dx_run_cmds(self): commands_list.append(self.add_to_depends_list("rpkm", 'depends_list')) commands_list.append(self.add_to_depends_list("depends", 'depends_list_recombined')) - commands_list.append(self.create_duty_csv_command()) + if not TSO500: + commands_list.append(self.create_duty_csv_command()) return commands_list + def build_TSO500_post_run_commands(self): + """ + Function to build TSO500 commands to run after pipeline, i.e. + Fastqc, sambamba, sompy, multiqc, upload multiqc and duty_csv + Commands must be written to file _TSO_post_run_commands.sh + which can be run manually once pipeline done. + For TSO samples, the fastqs are created within DNAnexus and the + commands are generated using sample names parsed from the + samplesheet. If for whatever reason those fastqs are not created + by the DNAnexus app, the downstream job will not set off and + therefore will produce no job ID to provide to the depends_list, + which will create an error/ slack alert. To solve this problem, + the job ID is only added to the depends list if it exits + """ + # Update script log file to say what is being done. + self.loggers.script.info("Building dx run commands for TSO500 post pipeline processing") + + # list to hold all commands. + TSO500 = True + TSOcommands_list = [] + TSOcommands_list.append(self.source_command) + TSOcommands_list.append(self.empty_depends) + TSOcommands_list.append(self.empty_gatk_depends) + + for sample in self.list_of_processed_samples: + pannumber = re.search(r"Pan\d+", sample).group() + TSOcommands_list.append( + self.create_fastqc_command(sample) + ) + # Only add to depends_list if job ID from previous command + # is not empty + TSOcommands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list')) + + TSOcommands_list.append(self.create_sambamba_cmd(sample, pannumber)) + # Exclude negative controls from the depends list as the NTC + # coverage calculation can often fail. We want the coverage + # report for the NTC sample to help assess contamination. + # Only add to depends_list if job ID from previous command + # is not empty + TSOcommands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list')) + + if "HD200" in sample: + TSOcommands_list.append(self.create_sompy_cmd(sample, pannumber)) + # Only add to depends_list if job ID from previous command + # is not empty + TSOcommands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list("sompy", 'depends_list')) + + TSOcommands_list.append(self.create_upload_multiqc_command(TSO500)) + TSOcommands_list.append(self.add_to_depends_list("UploadMultiQC", 'depends_list')) + + TSOcommands_list.append(self.create_duty_csv_command()) + + with open( + self.runfolder_obj.TSO500_post_run_command_script, "w" + ) as TSO500_commands: + # remove any None values from the command_list + TSO500_commands.writelines( + [line + "\n" for line in filter(None, TSOcommands_list)] + ) + + return TSOcommands_list + def determine_exome_depth_requirements(self,pannnumber_list): """ This function takes a list of all pan numbers found on this run. @@ -1903,7 +1942,6 @@ def create_tso500_command(self,samplesheet): ## docker image (from config) ## runfolder_tar and samplesheet paths (from runfolder_obj class) ## analysis options eg --isNovaSeq flag - # TODO modify for new way of setting off app. WAIT removed dx_command_list = [ self.tso500_dx_command, # ends with --name so supply the runfolder name to name the job self.runfolder_obj.runfolder_name, @@ -1912,13 +1950,13 @@ def create_tso500_command(self,samplesheet): config.TSO500_samplesheet_stage, self.runfolder_obj.nexus_project_id + ":" - + self.#TODO not sure if this will work...find runfolder name in DNAnexus project + + self.runfolder_subdir + "/" - + samplesheet + + samplesheet, config.TSO500_project_name_stage, self.runfolder_obj.nexus_project_name, - config.TSO500_runfolder_name_stage, #TODO take this out again? - self.#find runfolder name in DNAnexus project + config.TSO500_runfolder_name_stage, + self.runfolder_subdir, config.TSO500_analysis_options_stage, TSO500_analysis_options, instance_type,