Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove meta clone everywhere for improved resume stability #332

Merged
merged 9 commits into from
Jul 27, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#313](https://github.com/nf-core/taxprofiler/pull/313) Fix pipeline not providing error when database sheet does not have a header (♥ to @noah472 for reporting, fix by @jfy133)
- [#330](https://github.com/nf-core/taxprofiler/pull/330) Added better tagging to allow disambiguation of Kraken2 steps of Kraken2 vs Bracken (♥ to @MajoroMask for requesting, added by @jfy133)
- [#334](https://github.com/nf-core/taxprofiler/pull/334) Increase the memory of the FALCO process to 4GB (fix by @LilyAnderssonLee)
- Improved meta map stability for more robust pipeline resuming (fix by @jfy133)

### `Dependencies`

Expand Down
42 changes: 20 additions & 22 deletions subworkflows/local/profiling.nf
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ workflow PROFILING {

if ( params.run_malt ) {


// MALT: We groupTuple to have all samples in one channel for MALT as database
// loading takes a long time, so we only want to run it once per database
ch_input_for_malt = ch_input_for_profiling.malt
Expand All @@ -75,17 +74,12 @@ workflow PROFILING {
// as we don't run on a per-sample basis due to huge databases
// so all samples are in one run and so sample-specific metadata
// unnecessary. Set as database name to prevent `null` job ID and prefix.
def temp_meta = [ id: meta.db_name ]
def temp_meta = [ id: db_meta.db_name ]
def new_meta = db_meta + temp_meta
jfy133 marked this conversation as resolved.
Show resolved Hide resolved

// Extend database parameters to specify whether to save alignments or not
def new_db_meta = db_meta.clone()
def sam_format = params.malt_save_reads ? ' --alignments ./ -za false' : ""
new_db_meta.db_params = db_meta.db_params + sam_format

// Combine reduced sample metadata with updated database parameters metadata,
// make sure id is db_name for publishing purposes.
def new_meta = temp_meta + new_db_meta
new_meta.id = new_meta.db_name
new_meta.db_params = db_meta.db_params + sam_format

[ new_meta, reads, db ]

Expand All @@ -106,9 +100,8 @@ workflow PROFILING {
// re-extract meta from file names, use filename without rma to
// ensure we keep paired-end information in downstream filenames
// when no pair-merging
def meta_new = meta.clone()
meta_new['db_name'] = meta.id
meta_new['id'] = rma.baseName
def meta_new = meta + [db_name: meta.id, id: rma.baseName]

[ meta_new, rma ]
jfy133 marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -127,16 +120,15 @@ workflow PROFILING {
ch_input_for_kraken2 = ch_input_for_profiling.kraken2
.map {
meta, reads, db_meta, db ->
def db_meta_new = db_meta.clone()

// Only take second element if one exists
def parsed_params = db_meta_new['db_params'].split(";")
// Only take first element if one exists
def parsed_params = db_meta['db_params'].split(";")
if ( parsed_params.size() == 2 ) {
db_meta_new['db_params'] = parsed_params[0]
db_meta_new = db_meta + [db_params: parsed_params[0]]
} else if ( parsed_params.size() == 0 ) {
db_meta_new['db_params'] = ""
db_meta_new = db_meta + [db_params: ""]
} else {
db_meta_new['db_params'] = parsed_params[0]
db_meta_new = db_meta + [db_params: parsed_params[0]]
}

[ meta, reads, db_meta_new, db ]
Expand Down Expand Up @@ -186,18 +178,24 @@ workflow PROFILING {
.map {

key, meta, reads, db_meta, db ->
def db_meta_new = db_meta.clone()

// // Have to make a completely fresh copy here as otherwise
// // was getting db_param loss due to upstream meta parsing at
// // kraken2 input channel manipulation step
def db_meta_keys = db_meta.keySet()
def db_meta_new = db_meta.subMap(db_meta_keys)

// Have to pick second element if using bracken, as first element
// contains kraken parameters
if ( db_meta.tool == 'bracken' ) {

// Only take second element if one exists
def parsed_params = db_meta_new['db_params'].split(";")
def parsed_params = db_meta['db_params'].split(";")

if ( parsed_params.size() == 2 ) {
db_meta_new['db_params'] = parsed_params[1]
db_meta_new = db_meta + [ db_params: parsed_params[1] ]
} else {
db_meta_new['db_params'] = ""
db_meta_new = db_meta + [ db_params: "" ]
}

} else {
Expand Down
3 changes: 1 addition & 2 deletions subworkflows/local/shortread_fastp.nf
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ workflow SHORTREAD_FASTP {
ch_fastp_reads_prepped_pe = FASTP_PAIRED.out.reads_merged
.map {
meta, reads ->
def meta_new = meta.clone()
meta_new['single_end'] = true
def meta_new = meta + [single_end: true]
[ meta_new, [ reads ].flatten() ]
}

Expand Down
3 changes: 1 addition & 2 deletions workflows/taxprofiler.nf
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ workflow TAXPROFILER {
.mix( ch_longreads_hostremoved )
.map {
meta, reads ->
def meta_new = meta.clone()
meta_new.remove('run_accession')
def meta_new = meta - meta.subMap('run_accession')
[ meta_new, reads ]
}
.groupTuple()
Expand Down
Loading