From b9c4a3ea039bdb7a27e736d06552e83844de12c8 Mon Sep 17 00:00:00 2001 From: James Fellows Yates Date: Thu, 20 Jul 2023 14:43:37 +0200 Subject: [PATCH 1/6] Remove meta shallow clones() and replace with subMap deep clones to preserve metamaps across channels --- CHANGELOG.md | 1 + subworkflows/local/profiling.nf | 22 ++++++++++++++-------- subworkflows/local/shortread_fastp.nf | 3 +-- workflows/taxprofiler.nf | 3 +-- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 216b8605..0d6f6273 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#304](https://github.com/nf-core/taxprofiler/pull/304) Correct mistake in kaiju2table documentation, only single rank can be supplied (♥ to @artur-matysik for reporting, fix by @jfy133) - [#307](https://github.com/nf-core/taxprofiler/pull/307) Fix databases being sometimes associated with the wrong tool (e.g. Kaiju) (fix by @jfy133) - [#313](https://github.com/nf-core/taxprofiler/pull/304) Fix pipeline not providing error when database sheet does not have a header (♥ to @noah472 for reporting, fix by @jfy133) +- Improved meta map stability for more robust pipeline resuming ### `Dependencies` diff --git a/subworkflows/local/profiling.nf b/subworkflows/local/profiling.nf index de11bf47..7b53281b 100644 --- a/subworkflows/local/profiling.nf +++ b/subworkflows/local/profiling.nf @@ -75,16 +75,21 @@ workflow PROFILING { // as we don't run run on a per-sample basis due to huge datbaases // so all samples are in one run and so sample-specific metadata // unnecessary. Set as database name to prevent `null` job ID and prefix. - def temp_meta = [ id: meta.db_name ] + def temp_meta = [ id: db_meta.db_name ] // Extend database parameters to specify whether to save alignments or not - def new_db_meta = db_meta.clone() def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : "" + + def db_meta_keys = db_meta.keySet() + def new_db_meta = db_meta.subMap(db_meta_keys) new_db_meta.db_params = db_meta.db_params + sam_format // Combine reduced sample metadata with updated database parameters metadata, // make sure id is db_name for publishing purposes. def new_meta = temp_meta + new_db_meta + + println(new_db_meta) + new_meta.id = new_meta.db_name [ new_meta, reads, db ] @@ -106,9 +111,8 @@ workflow PROFILING { // re-extract meta from file names, use filename without rma to // ensure we keep paired-end information in downstream filenames // when no pair-merging - def meta_new = meta.clone() - meta_new['db_name'] = meta.id - meta_new['id'] = rma.baseName + def meta_new = meta + [db_name: meta.id, id: rma.baseName] + [ meta_new, rma ] } @@ -127,9 +131,10 @@ workflow PROFILING { ch_input_for_kraken2 = ch_input_for_profiling.kraken2 .map { meta, reads, db_meta, db -> - def db_meta_new = db_meta.clone() + def db_meta_keys = db_meta.keySet() + def db_meta_new = db_meta.subMap(db_meta_keys) - // Only take second element if one exists + // Only take first element if one exists def parsed_params = db_meta_new['db_params'].split(";") if ( parsed_params.size() == 2 ) { db_meta_new['db_params'] = parsed_params[0] @@ -186,7 +191,8 @@ workflow PROFILING { .map { key, meta, reads, db_meta, db -> - def db_meta_new = db_meta.clone() + def db_meta_keys = db_meta.keySet() + def db_meta_new = db_meta.subMap(db_meta_keys) // Have to pick second element if using bracken, as first element // contains kraken parameters diff --git a/subworkflows/local/shortread_fastp.nf b/subworkflows/local/shortread_fastp.nf index cac5a27a..d92208a5 100644 --- a/subworkflows/local/shortread_fastp.nf +++ b/subworkflows/local/shortread_fastp.nf @@ -28,8 +28,7 @@ workflow SHORTREAD_FASTP { ch_fastp_reads_prepped_pe = FASTP_PAIRED.out.reads_merged .map { meta, reads -> - def meta_new = meta.clone() - meta_new['single_end'] = true + def meta_new = meta + [single_end: true] [ meta_new, [ reads ].flatten() ] } diff --git a/workflows/taxprofiler.nf b/workflows/taxprofiler.nf index f964e740..59ef61f9 100644 --- a/workflows/taxprofiler.nf +++ b/workflows/taxprofiler.nf @@ -209,8 +209,7 @@ workflow TAXPROFILER { .mix( ch_longreads_hostremoved ) .map { meta, reads -> - def meta_new = meta.clone() - meta_new.remove('run_accession') + def meta_new = meta - meta.subMap('run_accession') [ meta_new, reads ] } .groupTuple() From 8f88f4f1b12491a4c6231903abc97c47864a96c9 Mon Sep 17 00:00:00 2001 From: James Fellows Yates Date: Thu, 20 Jul 2023 14:45:53 +0200 Subject: [PATCH 2/6] Update CHAGNELGO --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d6f6273..d0762e64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,7 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [#304](https://github.com/nf-core/taxprofiler/pull/304) Correct mistake in kaiju2table documentation, only single rank can be supplied (♥ to @artur-matysik for reporting, fix by @jfy133) - [#307](https://github.com/nf-core/taxprofiler/pull/307) Fix databases being sometimes associated with the wrong tool (e.g. Kaiju) (fix by @jfy133) - [#313](https://github.com/nf-core/taxprofiler/pull/304) Fix pipeline not providing error when database sheet does not have a header (♥ to @noah472 for reporting, fix by @jfy133) -- Improved meta map stability for more robust pipeline resuming +- Improved meta map stability for more robust pipeline resuming (fix by @jfy133) ### `Dependencies` From bb68cc60c03e7d182d7b1d6de02e32d7e3779a64 Mon Sep 17 00:00:00 2001 From: "James A. Fellows Yates" Date: Mon, 24 Jul 2023 13:31:25 +0200 Subject: [PATCH 3/6] Update subworkflows/local/profiling.nf --- subworkflows/local/profiling.nf | 1 - 1 file changed, 1 deletion(-) diff --git a/subworkflows/local/profiling.nf b/subworkflows/local/profiling.nf index 7b53281b..5982359d 100644 --- a/subworkflows/local/profiling.nf +++ b/subworkflows/local/profiling.nf @@ -88,7 +88,6 @@ workflow PROFILING { // make sure id is db_name for publishing purposes. def new_meta = temp_meta + new_db_meta - println(new_db_meta) new_meta.id = new_meta.db_name From e4041a9f3057a628d8aaef2bd8ce8e9e1e028876 Mon Sep 17 00:00:00 2001 From: James Fellows Yates Date: Tue, 25 Jul 2023 10:28:01 +0200 Subject: [PATCH 4/6] Reduce number of unnecessary subMaps --- subworkflows/local/profiling.nf | 39 ++++++++++++++------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/subworkflows/local/profiling.nf b/subworkflows/local/profiling.nf index 5982359d..067d0ecd 100644 --- a/subworkflows/local/profiling.nf +++ b/subworkflows/local/profiling.nf @@ -64,7 +64,6 @@ workflow PROFILING { if ( params.run_malt ) { - // MALT: We groupTuple to have all samples in one channel for MALT as database // loading takes a long time, so we only want to run it once per database ch_input_for_malt = ch_input_for_profiling.malt @@ -76,20 +75,11 @@ workflow PROFILING { // so all samples are in one run and so sample-specific metadata // unnecessary. Set as database name to prevent `null` job ID and prefix. def temp_meta = [ id: db_meta.db_name ] + def new_meta = db_meta + temp_meta // Extend database parameters to specify whether to save alignments or not def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : "" - - def db_meta_keys = db_meta.keySet() - def new_db_meta = db_meta.subMap(db_meta_keys) - new_db_meta.db_params = db_meta.db_params + sam_format - - // Combine reduced sample metadata with updated database parameters metadata, - // make sure id is db_name for publishing purposes. - def new_meta = temp_meta + new_db_meta - - - new_meta.id = new_meta.db_name + new_meta.db_params = db_meta.db_params + sam_format [ new_meta, reads, db ] @@ -130,17 +120,15 @@ workflow PROFILING { ch_input_for_kraken2 = ch_input_for_profiling.kraken2 .map { meta, reads, db_meta, db -> - def db_meta_keys = db_meta.keySet() - def db_meta_new = db_meta.subMap(db_meta_keys) // Only take first element if one exists - def parsed_params = db_meta_new['db_params'].split(";") + def parsed_params = db_meta['db_params'].split(";") if ( parsed_params.size() == 2 ) { - db_meta_new['db_params'] = parsed_params[0] + db_meta_new = db_meta + [db_params: parsed_params[0]] } else if ( parsed_params.size() == 0 ) { - db_meta_new['db_params'] = "" + db_meta_new = db_meta + [db_params: ""] } else { - db_meta_new['db_params'] = parsed_params[0] + db_meta_new = db_meta + [db_params: parsed_params[0]] } [ meta, reads, db_meta_new, db ] @@ -151,7 +139,7 @@ workflow PROFILING { db: it[3] } - KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads, ch_input_for_kraken2.db, params.kraken2_save_reads, params.kraken2_save_readclassifications ) + KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads.dump(tag: "kraken2_reads"), ch_input_for_kraken2.db, params.kraken2_save_reads, params.kraken2_save_readclassifications ) ch_multiqc_files = ch_multiqc_files.mix( KRAKEN2_KRAKEN2.out.report ) ch_versions = ch_versions.mix( KRAKEN2_KRAKEN2.out.versions.first() ) ch_raw_classifications = ch_raw_classifications.mix( KRAKEN2_KRAKEN2.out.classified_reads_assignment ) @@ -190,6 +178,10 @@ workflow PROFILING { .map { key, meta, reads, db_meta, db -> + + // // Have to make a completely fresh copy here as otherwise + // // was getting db_param loss due to upstream meta parsing at + // // kraken2 input channel manipulation step def db_meta_keys = db_meta.keySet() def db_meta_new = db_meta.subMap(db_meta_keys) @@ -198,11 +190,12 @@ workflow PROFILING { if ( db_meta.tool == 'bracken' ) { // Only take second element if one exists - def parsed_params = db_meta_new['db_params'].split(";") + def parsed_params = db_meta['db_params'].split(";") + if ( parsed_params.size() == 2 ) { - db_meta_new['db_params'] = parsed_params[1] + db_meta_new = db_meta + [ db_params: parsed_params[1] ] } else { - db_meta_new['db_params'] = "" + db_meta_new = db_meta + [ db_params: "" ] } } else { @@ -216,7 +209,7 @@ workflow PROFILING { db: db } - BRACKEN_BRACKEN(ch_input_for_bracken.report, ch_input_for_bracken.db) + BRACKEN_BRACKEN(ch_input_for_bracken.report.dump(tag: "bracken_report"), ch_input_for_bracken.db) ch_versions = ch_versions.mix(BRACKEN_BRACKEN.out.versions.first()) ch_raw_profiles = ch_raw_profiles.mix(BRACKEN_BRACKEN.out.reports) From 75ba693d8323253f33c8b624e555f8c735b3ffef Mon Sep 17 00:00:00 2001 From: James Fellows Yates Date: Tue, 25 Jul 2023 10:29:10 +0200 Subject: [PATCH 5/6] Remove debugging dumps --- subworkflows/local/profiling.nf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/subworkflows/local/profiling.nf b/subworkflows/local/profiling.nf index 067d0ecd..bb0c8f9a 100644 --- a/subworkflows/local/profiling.nf +++ b/subworkflows/local/profiling.nf @@ -139,7 +139,7 @@ workflow PROFILING { db: it[3] } - KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads.dump(tag: "kraken2_reads"), ch_input_for_kraken2.db, params.kraken2_save_reads, params.kraken2_save_readclassifications ) + KRAKEN2_KRAKEN2 ( ch_input_for_kraken2.reads, ch_input_for_kraken2.db, params.kraken2_save_reads, params.kraken2_save_readclassifications ) ch_multiqc_files = ch_multiqc_files.mix( KRAKEN2_KRAKEN2.out.report ) ch_versions = ch_versions.mix( KRAKEN2_KRAKEN2.out.versions.first() ) ch_raw_classifications = ch_raw_classifications.mix( KRAKEN2_KRAKEN2.out.classified_reads_assignment ) @@ -209,7 +209,7 @@ workflow PROFILING { db: db } - BRACKEN_BRACKEN(ch_input_for_bracken.report.dump(tag: "bracken_report"), ch_input_for_bracken.db) + BRACKEN_BRACKEN(ch_input_for_bracken.report, ch_input_for_bracken.db) ch_versions = ch_versions.mix(BRACKEN_BRACKEN.out.versions.first()) ch_raw_profiles = ch_raw_profiles.mix(BRACKEN_BRACKEN.out.reports) From 91fc6f74dcafbb00da6c2218b9b862daa15ecdae Mon Sep 17 00:00:00 2001 From: James Fellows Yates Date: Tue, 25 Jul 2023 11:45:15 +0200 Subject: [PATCH 6/6] After comments from MAxime --- subworkflows/local/profiling.nf | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/subworkflows/local/profiling.nf b/subworkflows/local/profiling.nf index bb0c8f9a..9e0f0165 100644 --- a/subworkflows/local/profiling.nf +++ b/subworkflows/local/profiling.nf @@ -74,8 +74,7 @@ workflow PROFILING { // as we don't run run on a per-sample basis due to huge datbaases // so all samples are in one run and so sample-specific metadata // unnecessary. Set as database name to prevent `null` job ID and prefix. - def temp_meta = [ id: db_meta.db_name ] - def new_meta = db_meta + temp_meta + def new_meta = db_meta + [ id: db_meta.db_name ] // Extend database parameters to specify whether to save alignments or not def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : ""