Skip to content

Commit

Permalink
changes to split up TSO runs, use latest TSO app, latest duty_csv app
Browse files Browse the repository at this point in the history
  • Loading branch information
mokaguys committed Oct 18, 2023
1 parent 3772ed0 commit b5bc093
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 64 deletions.
22 changes: 14 additions & 8 deletions automate_demultiplex_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
"999999_A01229_0182_AHM2TSO500",
]

# TSO500 batch size (for splitting samplesheet)
if testing:
batch_size = 2
else:
batch_size = 16

# path to log file which records the output of the upload agent
upload_and_setoff_workflow_logfile = (
"{document_root}/automate_demultiplexing_logfiles/upload_agent_script_logfiles/"
Expand Down Expand Up @@ -134,7 +140,7 @@
# MokaSNP ID
mokasnp_pipeline_ID = "5091"
# TSO500 pipeline ID
TSO_pipeline_ID = "5237"
TSO_pipeline_ID = "5288" #TSO v1.6

# -- Moka WES test status--
# Test Status = NextSEQ sequencing
Expand Down Expand Up @@ -170,8 +176,8 @@
congenica_app_path = "Apps/congenica_upload_v1.3.2"
congenica_SFTP_upload_app = "applet-GFfJpj80jy1x1Bz1P1Bk3vQf"

# TSO500 app TODO update to new version of app v1.6.0
tso500_app = "applet-GPgkz0j0jy1Yf4XxkXjVgKfv" # Apps/TSO500_v1.5.1
# TSO500 app
tso500_app = "applet-GZgv0Jj0jy1Yfbx3QvqyKjzp" # Apps/TSO500_v1.6.0
tso500_app_name = "TSO500_v1.6.0"
tso500_docker_image = (
"project-ByfFPz00jy1fk6PjpZ95F27J:file-Fz9Zyx00b5j8xKVkKv4fZ6JB"
Expand Down Expand Up @@ -1256,7 +1262,7 @@
},
"Pan4969": { # TSO500 no UTRs. TERT promoter
"TSO500": True,
"sambamba_bedfile": "Pan5130dataSambamba.bed",
"sambamba_bedfile": "Pan5205dataSambamba.bed",
"clinical_coverage_depth": 100,
"multiqc_coverage_level": 100,
"coverage_min_basecall_qual": 25,
Expand All @@ -1265,7 +1271,7 @@
"Pan5085": { # TSO500 High throughput Synnovis. no UTRs. TERT promoter
"TSO500": True,
"TSO500_high_throughput": True,
"sambamba_bedfile": "Pan5130dataSambamba.bed",
"sambamba_bedfile": "Pan5205dataSambamba.bed",
"clinical_coverage_depth": 100,
"multiqc_coverage_level": 100,
"coverage_min_basecall_qual": 25,
Expand All @@ -1274,7 +1280,7 @@
"Pan5112": { # TSO500 High throughput BSPS. no UTRs. TERT promoter
"TSO500": True,
"TSO500_high_throughput": True,
"sambamba_bedfile": "Pan5130dataSambamba.bed",
"sambamba_bedfile": "Pan5205dataSambamba.bed",
"clinical_coverage_depth": 100,
"multiqc_coverage_level": 100,
"coverage_min_basecall_qual": 25,
Expand All @@ -1284,7 +1290,7 @@
"Pan5114": { # TSO500 High throughput Control. no UTRs. TERT promoter
"TSO500": True,
"TSO500_high_throughput": True,
"sambamba_bedfile": "Pan5130dataSambamba.bed",
"sambamba_bedfile": "Pan5205dataSambamba.bed",
"clinical_coverage_depth": 100,
"multiqc_coverage_level": 100,
"coverage_min_basecall_qual": 25,
Expand Down Expand Up @@ -1765,7 +1771,7 @@
}

duty_csv_id = (
"project-ByfFPz00jy1fk6PjpZ95F27J:applet-GQg9J280jy1Zf79KGx9gk5K3"
"project-ByfFPz00jy1fk6PjpZ95F27J:applet-GZYx3Kj0kKj3YBV7qgK6VjXQ"
)
duty_csv_inputs = {
# tso_pannumbers should not include the dry lab pan number
Expand Down
150 changes: 94 additions & 56 deletions upload_and_setoff_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ def __init__(self, runfolder):
+ self.runfolder_name
+ "_congenica_upload_commands.sh"
)
self.TSO500_post_run_command_script = (
config.DNA_Nexus_workflow_logfolder
+ self.runfolder_name
+ "_TSO_post_run_commands.sh"
)
#TODO copy lines above to create separate dx run commands output script for TSO (to be run by duty binfx)
self.nexus_project_name = ""
self.nexus_path = ""
Expand Down Expand Up @@ -340,7 +345,7 @@ def quarterback(self):
if TSO500_sample_list:
self.list_of_processed_samples, self.fastq_string = (
TSO500_sample_list,
self.runfolder_obj.runfolder_samplesheet_path, #TODO this sets the fastq_string to be the samplesheet path
self.runfolder_obj.runfolder_samplesheet_path, #this sets the fastq_string to be the samplesheet path
)

else:
Expand Down Expand Up @@ -376,7 +381,7 @@ def quarterback(self):
# pass path to function which checks files were uploaded without error
if TSO500_sample_list:
# split TSO samplesheet into multiple sheets with <= config.batch_size samples/sheet
self.TSO500_samplesheets_list = self.split_tso500_sampleheet():
self.TSO500_samplesheets_list = self.split_TSO500_sampleheet()
backup_attempt_count = 1
while backup_attempt_count < 5:
self.loggers.script.info(
Expand All @@ -393,8 +398,6 @@ def quarterback(self):
backup_attempt_count += 1

#upload fastqs. if TSO500 run, this uploads the samplesheet to the project root
#TODO make this an else for the above if TSO500_sample_list. then split and upload TSO samplesheets separately.
# TODO check whether upload_fastqs() output or related variables are used elsewhere
self.look_for_upload_errors(self.upload_fastqs())

# upload cluster density files and check upload was successful.
Expand Down Expand Up @@ -634,15 +637,16 @@ def check_for_TSO500(self):
open(self.loggers.upload_agent.filepath, "w").close()
return sample_list

def split_tso500_sampleheet(self):
def split_TSO500_sampleheet(self):
"""
take TSO500 samplesheet and split it into parts with <= batch_size (config) samples/sheet
write samplesheets to runfolder
return a list of samplesheet names (they are saved in the runfolder, so they'll be
uploaded to DNAnexus and can be accessed from there for the dx run commands)
"""
# samplesheet in the runfolder
samplesheet_file = self.runfolder_samplesheet_name
samplesheet_file = os.path.join(self.runfolder_obj.runfolderpath, self.runfolder_obj.runfolder_samplesheet_name)

# Read all lines from the sample sheet
with open(samplesheet_file) as samplesheet:
all_lines = samplesheet.readlines()
Expand All @@ -652,20 +656,20 @@ def split_tso500_sampleheet(self):
# sample lines start with "TSO". This excludes empty lines below the samples list, i.e. lines containing ",,,,,,,"
samples = [sample for sample in all_lines[25:] if sample.startswith("TSO")]

# Split samples into batches of 16
batches = [samples[i:i + 16] for i in range(0, len(samples), 16)]
# Split samples into batches (size specified in config)
batches = [samples[i:i + config.batch_size] for i in range(0, len(samples), config.batch_size)]

# Write batches to separate files named "PartXofY", and add samplesheet to list
samplesheet_list = []
number_of_batches = len(batches)
samplesheet_base_name = samplesheet_file.split(".csv")[0]
for samplesheet_count, batch in enumerate(batches, start=1):
#capture samplesheet file path to write samplesheet paths to the runfolder
samplesheet_filename = "%sPart%sof%s.csv" % (samplesheet_base_name,samplesheet_count,number_of_batches)
samplesheet_filepath = "%sPart%sof%s.csv" % (samplesheet_base_name,samplesheet_count,number_of_batches)
# capture samplesheet name to write to list- use runfolder name
samplesheet_name = "%s_SampleSheetPart%sof%s.csv" % (self.runfolder_obj.runfolder_name,samplesheet_count,number_of_batches)
samplesheet_list.append(samplesheet_name)
with open(samplesheet_filename, "a") as new_samplesheet:
with open(samplesheet_filepath, "a") as new_samplesheet:
new_samplesheet.writelines(samplesheet_header)
new_samplesheet.writelines(batch)

Expand Down Expand Up @@ -1608,51 +1612,23 @@ def start_building_dx_run_cmds(self):
for cmd in self.determine_exome_depth_requirements(pannnumber_list):
commands_list.append(cmd)

# write TSO commands if a TSO run.
if TSO500:
commands_list.append("#The TSOapp is set off first. This utilises the --wait flag, so the bash script waits until this job finishes before running the coverage, hap.py and fastqc commands using the samplesheet to determine expected files and their locations ")
commands_list.append("#All jobs apart from control samples are added to the depends on list used to delay multiqc")
# build command for the TSO500 app and set off fastqc commands
# TODO add for loop here to loop through samplesheets and write command for each
commands_list.append("#The TSOapp is set off once for each samplesheet made")
commands_list.append("#Other jobs must be set off manually by running the file once the pipeline has finished")
# build commands for the TSO500 app and set off fastqc commands (need a command per samplesheet)
for samplesheet in self.TSO500_samplesheets_list:
commands_list.append(self.create_tso500_command(samplesheet))

# TODO modify this to handle creating separate file for TSO commands
commands_list.append(self.add_to_depends_list("TSO500", 'depends_list'))
self.build_TSO500_post_run_commands()

# TSO500 multiqc commands are written to a separate file with a function called above
if not TSO500:
commands_list.append(self.create_multiqc_command())
commands_list.append(self.add_to_depends_list("MultiQC", 'depends_list'))
commands_list.append(self.create_upload_multiqc_command(TSO500))
commands_list.append(self.add_to_depends_list("UploadMultiQC", 'depends_list'))

# For TSO samples, the fastqs are created within DNAnexus and the
# commands are generated using sample names parsed from the
# samplesheet. If for whatever reason those fastqs are not created
# by the DNAnexus app, the downstream job will not set off and
# therefore will produce no job ID to provide to the depends_list,
# which will create an error/ slack alert. To solve this problem,
# the job ID is only added to the depends list if it exists
for sample in self.list_of_processed_samples:
pannumber = re.search(r"Pan\d+", sample).group()
commands_list.append(
self.create_fastqc_command(sample)
)
# Only add to depends_list if job ID from previous command
# is not empty
commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list'))

commands_list.append(self.create_sambamba_cmd(sample, pannumber))
# Exclude negative controls from the depends list as the NTC
# coverage calculation can often fail. We want the coverage
# report for the NTC sample to help assess contamination.
# Only add to depends_list if job ID from previous command
# is not empty
commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list(sample, 'depends_list'))

if "HD200" in sample:
commands_list.append(self.create_sompy_cmd(sample, pannumber))
# Only add to depends_list if job ID from previous command
# is not empty
commands_list.append(self.if_jobid_exists_depends % self.add_to_depends_list("sompy", 'depends_list'))

commands_list.append(self.create_multiqc_command())
commands_list.append(self.add_to_depends_list("MultiQC", 'depends_list'))
commands_list.append(self.create_upload_multiqc_command(TSO500))
commands_list.append(self.add_to_depends_list("UploadMultiQC", 'depends_list'))
# setoff the below commands later as they are not depended upon by
# MultiQC but are required for duty_csv
if rpkm_list:
Expand All @@ -1664,10 +1640,73 @@ def start_building_dx_run_cmds(self):
commands_list.append(self.add_to_depends_list("rpkm", 'depends_list'))
commands_list.append(self.add_to_depends_list("depends", 'depends_list_recombined'))

commands_list.append(self.create_duty_csv_command())
if not TSO500:
commands_list.append(self.create_duty_csv_command())

return commands_list

def build_TSO500_post_run_commands(self):
    """
    Build the dx run commands to be executed manually after the TSO500
    pipeline has finished: fastqc, sambamba, sompy (HD200 control samples
    only), upload-multiqc and duty_csv.

    The commands are written to the runfolder's
    _TSO_post_run_commands.sh script (path held on self.runfolder_obj)
    so duty bioinformatics can run them once the pipeline is done.

    For TSO samples the fastqs are created within DNAnexus and the
    commands are generated from sample names parsed out of the
    samplesheet. If those fastqs were never created by the DNAnexus app,
    a downstream job will not start and will therefore yield no job ID
    for the depends list, which would trigger an error/slack alert. To
    avoid this, each job ID is only appended to the depends list if it
    exists.

    NOTE(review): despite this description, no create_multiqc_command()
    call is present here (only upload multiqc) - confirm whether MultiQC
    is produced elsewhere for TSO500 runs.

    Returns:
        list: every command written to the script (None entries are
        filtered out before writing, but retained in the returned list).
    """
    # Record in the script log what stage we are at.
    self.loggers.script.info(
        "Building dx run commands for TSO500 post pipeline processing"
    )

    # Script preamble: source environment and initialise the (empty)
    # depends lists used by the dx run commands below.
    post_run_cmds = [
        self.source_command,
        self.empty_depends,
        self.empty_gatk_depends,
    ]

    for sample_name in self.list_of_processed_samples:
        pan_number = re.search(r"Pan\d+", sample_name).group()

        # Per-sample QC: fastqc, then guard the depends-list append so a
        # missing job ID (fastqs never created) doesn't raise an alert.
        post_run_cmds.append(self.create_fastqc_command(sample_name))
        post_run_cmds.append(
            self.if_jobid_exists_depends
            % self.add_to_depends_list(sample_name, 'depends_list')
        )

        # Coverage via sambamba. Negative controls are excluded from the
        # depends list (inside add_to_depends_list) because the NTC
        # coverage calculation can often fail, yet we still want the NTC
        # coverage report to help assess contamination. The append is
        # guarded for the same missing-job-ID reason as above.
        post_run_cmds.append(self.create_sambamba_cmd(sample_name, pan_number))
        post_run_cmds.append(
            self.if_jobid_exists_depends
            % self.add_to_depends_list(sample_name, 'depends_list')
        )

        # HD200 control samples additionally get a sompy variant-calling
        # assessment, with the same guarded depends-list append.
        if "HD200" in sample_name:
            post_run_cmds.append(self.create_sompy_cmd(sample_name, pan_number))
            post_run_cmds.append(
                self.if_jobid_exists_depends
                % self.add_to_depends_list("sompy", 'depends_list')
            )

    # Run-level commands: upload multiqc (True flags this as a TSO500
    # run) followed by the duty_csv job.
    post_run_cmds.append(self.create_upload_multiqc_command(True))
    post_run_cmds.append(
        self.add_to_depends_list("UploadMultiQC", 'depends_list')
    )
    post_run_cmds.append(self.create_duty_csv_command())

    # Write the commands to the post-run script, dropping any None
    # values and terminating each command with a newline.
    with open(
        self.runfolder_obj.TSO500_post_run_command_script, "w"
    ) as script_handle:
        script_handle.writelines(
            "%s\n" % cmd for cmd in post_run_cmds if cmd
        )

    return post_run_cmds

def determine_exome_depth_requirements(self,pannnumber_list):
"""
This function takes a list of all pan numbers found on this run.
Expand Down Expand Up @@ -1903,7 +1942,6 @@ def create_tso500_command(self,samplesheet):
## docker image (from config)
## runfolder_tar and samplesheet paths (from runfolder_obj class)
## analysis options eg --isNovaSeq flag
# TODO modify for new way of setting off app. WAIT removed
dx_command_list = [
self.tso500_dx_command, # ends with --name so supply the runfolder name to name the job
self.runfolder_obj.runfolder_name,
Expand All @@ -1912,13 +1950,13 @@ def create_tso500_command(self,samplesheet):
config.TSO500_samplesheet_stage,
self.runfolder_obj.nexus_project_id
+ ":"
+ self.#TODO not sure if this will work...find runfolder name in DNAnexus project
+ self.runfolder_subdir
+ "/"
+ samplesheet
+ samplesheet,
config.TSO500_project_name_stage,
self.runfolder_obj.nexus_project_name,
config.TSO500_runfolder_name_stage, #TODO take this out again?
self.#find runfolder name in DNAnexus project
config.TSO500_runfolder_name_stage,
self.runfolder_subdir,
config.TSO500_analysis_options_stage,
TSO500_analysis_options,
instance_type,
Expand Down

0 comments on commit b5bc093

Please sign in to comment.