Skip to content

Commit

Permalink
Merge pull request #11 from apriltuesday/EVA-3330
Browse files Browse the repository at this point in the history
EVA-3330: Migrate Nextflow for running on SLURM
  • Loading branch information
apriltuesday authored Jun 17, 2024
2 parents ff8384c + 71a12a4 commit e0e355a
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ jobs:
- name: Test nextflow workflow
run: |
# Run nextflow tests
export NXF_DEFAULT_DSL=1
export NXF_DEFAULT_DSL=2
tests/nextflow-tests/run_tests.sh
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Example usage:
# Run everything
add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5

# Run remapping and clustering only, resume and run on a specific instance
add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5 --tasks remap_cluster --instance 3 --resume
# Run remapping and clustering only, resume
add_target_assembly.py --taxonomy 9031 --target_assembly GCA_016699485.1 --release_version 5 --tasks remap_cluster --resume
```

### Custom assembly generation
Expand Down
5 changes: 1 addition & 4 deletions bin/add_target_assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from argparse import ArgumentParser, ArgumentError
from argparse import ArgumentParser

from ebi_eva_common_pyutils.logger import logging_config

Expand All @@ -32,8 +32,6 @@ def main():
help='Task or set of tasks to perform (defaults to all)')
argparse.add_argument('--release_version', required=True, type=int,
help='Release version this assembly will be processed for')
argparse.add_argument('--instance', help="Accessioning instance id for clustering", required=False, default=6,
type=int, choices=range(1, 13))
argparse.add_argument('--resume', help='If a process has been run already this will resume it.',
action='store_true', default=False)
args = argparse.parse_args()
Expand All @@ -45,7 +43,6 @@ def main():

job.run_all(
tasks=args.tasks,
instance=args.instance,
source_of_assembly=args.source_of_assembly,
resume=args.resume
)
Expand Down
4 changes: 2 additions & 2 deletions bin/genome_target_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

import requests
from ebi_eva_common_pyutils.logger import logging_config
from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle, insert_new_assembly_and_taxonomy
from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query
from ebi_eva_common_pyutils.taxonomy.taxonomy import get_scientific_name_from_ensembl
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle, insert_new_assembly_and_taxonomy
from ebi_eva_internal_pyutils.pg_utils import get_all_results_for_query

logger = logging_config.get_logger(__name__)
logging_config.add_stdout_handler()
Expand Down
30 changes: 15 additions & 15 deletions eva_assembly_ingestion/assembly_ingestion_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,23 @@
from ebi_eva_common_pyutils.command_utils import run_command_with_output
from ebi_eva_common_pyutils.common_utils import pretty_print
from ebi_eva_common_pyutils.config import cfg
from ebi_eva_common_pyutils.config_utils import get_contig_alias_db_creds_for_profile
from ebi_eva_common_pyutils.contig_alias.contig_alias import ContigAliasClient
from ebi_eva_common_pyutils.logger import AppLogger
from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle, insert_new_assembly_and_taxonomy, \
add_to_supported_assemblies
from ebi_eva_common_pyutils.pg_utils import execute_query, get_all_results_for_query
from ebi_eva_common_pyutils.spring_properties import SpringPropertiesGenerator
from ebi_eva_common_pyutils.taxonomy.taxonomy import get_scientific_name_from_taxonomy
from ebi_eva_internal_pyutils.config_utils import get_contig_alias_db_creds_for_profile
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle, insert_new_assembly_and_taxonomy, \
add_to_supported_assemblies
from ebi_eva_internal_pyutils.pg_utils import execute_query, get_all_results_for_query
from ebi_eva_internal_pyutils.spring_properties import SpringPropertiesGenerator
from psycopg2.extras import execute_values

from eva_assembly_ingestion.config import get_nextflow_config_flag
from eva_assembly_ingestion.parse_counts import count_variants_extracted, count_variants_remapped, \
count_variants_ingested

SUPPORTED_ASSEMBLY_TRACKER_TABLE = "evapro.supported_assembly_tracker"


class AssemblyIngestionJob(AppLogger):
all_tasks = ['load_tracker', 'remap_cluster', 'update_dbs']
tracking_table = 'eva_progress_tracker.remapping_tracker'
Expand Down Expand Up @@ -67,11 +69,11 @@ def taxonomies(self):
taxonomy_list = [self.source_taxonomy]
return taxonomy_list

def run_all(self, tasks, instance, source_of_assembly, resume):
def run_all(self, tasks, source_of_assembly, resume):
if 'load_tracker' in tasks:
self.load_tracker()
if 'remap_cluster' in tasks:
self.run_remapping_and_clustering(instance, resume)
self.run_remapping_and_clustering(resume)
if 'update_dbs' in tasks:
self.update_dbs(source_of_assembly)

Expand Down Expand Up @@ -151,14 +153,14 @@ def get_source_assemblies_and_num_studies_dbsnp(self):
)
return get_all_results_for_query(pg_conn, query)

def run_remapping_and_clustering(self, instance, resume):
def run_remapping_and_clustering(self, resume):
"""Run remapping and clustering for all source assemblies in the tracker marked as not Complete, resuming
the nextflow process if specified. (Note that this will also resume or rerun anything marked as Failed.)"""
source_assemblies_and_taxonomies = self.get_incomplete_assemblies_and_taxonomies()
for source_assembly, taxonomy_list in source_assemblies_and_taxonomies:
self.info(f'Running remapping and clustering for the following assemblies: {source_assembly} '
f'for taxonomy {", ".join([str(t) for t in taxonomy_list])}')
self.process_one_assembly(source_assembly, taxonomy_list, instance, resume)
self.process_one_assembly(source_assembly, taxonomy_list, resume)

def get_incomplete_assemblies_and_taxonomies(self):
incomplete_assemblies = []
Expand All @@ -170,7 +172,7 @@ def get_incomplete_assemblies_and_taxonomies(self):
incomplete_assemblies.append((source_assembly, taxonomies))
return incomplete_assemblies

def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume):
def process_one_assembly(self, source_assembly, taxonomy_list, resume):
self.set_status_start(source_assembly, taxonomy_list)
base_directory = cfg['remapping']['base_directory']
nextflow_pipeline = os.path.join(os.path.dirname(__file__), 'nextflow', 'remap_cluster.nf')
Expand All @@ -188,7 +190,6 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
)
clustering_template_file = self.create_clustering_properties(
output_file_path=os.path.join(assembly_directory, 'clustering_template.properties'),
instance=instance,
source_assembly=source_assembly
)

Expand All @@ -207,7 +208,6 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
'extraction_properties': extraction_properties_file,
'ingestion_properties': ingestion_properties_file,
'clustering_properties': clustering_template_file,
'clustering_instance': instance,
'remapping_config': cfg.config_file,
'remapping_required': remapping_required
}
Expand All @@ -221,7 +221,8 @@ def process_one_assembly(self, source_assembly, taxonomy_list, instance, resume
'-log', remapping_log,
'run', nextflow_pipeline,
'-params-file', remap_cluster_config_file,
'-work-dir', work_dir
'-work-dir', work_dir,
get_nextflow_config_flag()
]
if resume:
command.append('-resume')
Expand Down Expand Up @@ -261,9 +262,8 @@ def create_ingestion_properties(self, output_file_path, source_assembly):
open_file.write(properties)
return output_file_path

def create_clustering_properties(self, output_file_path, instance, source_assembly):
def create_clustering_properties(self, output_file_path, source_assembly):
properties = self.properties_generator.get_clustering_properties(
instance=instance,
source_assembly=source_assembly,
target_assembly=self.target_assembly,
rs_report_path=f'{source_assembly}_to_{self.target_assembly}_rs_report.txt'
Expand Down
12 changes: 12 additions & 0 deletions eva_assembly_ingestion/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ def load_config(*args):
os.getenv('ASSEMBLYCONFIG'),
os.path.expanduser('~/.assembly_config.yml'),
)


def get_nextflow_config_flag():
    """
    Return the commandline flag for Nextflow to use the config provided in environment variable ASSEMBLY_NEXTFLOW_CONFIG.
    If not provided, return an empty string, which allows Nextflow to use the default precedence as described here:
    https://www.nextflow.io/docs/latest/config.html
    """
    config_path = os.getenv('ASSEMBLY_NEXTFLOW_CONFIG')
    # A missing or empty variable yields no flag, deferring to Nextflow's defaults.
    return f'-c {config_path}' if config_path else ''
4 changes: 2 additions & 2 deletions eva_assembly_ingestion/custom_assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from cached_property import cached_property
from ebi_eva_common_pyutils.config import cfg
from ebi_eva_common_pyutils.logger import AppLogger
from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_common_pyutils.pg_utils import get_all_results_for_query
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_internal_pyutils.pg_utils import get_all_results_for_query
from retry import retry


Expand Down
53 changes: 29 additions & 24 deletions eva_assembly_ingestion/nextflow/remap_cluster.nf
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@ def helpMessage() {
--extraction_properties path to extraction properties file
--ingestion_properties path to ingestion properties file
--clustering_properties path to clustering properties file
--clustering_instance instance id to use for clustering
--output_dir path to the directory where the output file should be copied.
--remapping_config path to the remapping configuration file
--remapping_required flag that sets the remapping as required if true otherwise the remapping is skipped and only the clustering can be run
--memory memory in GB to use for memory-hungry processes (e.g. Java), default 8GB
"""
}

params.source_assembly_accession = null
params.target_assembly_accession = null
params.species_name = null
params.memory = 8
// help
params.help = null

Expand All @@ -52,6 +49,8 @@ empty_ch = params.remapping_required ? Channel.empty() : Channel.of(params.genom


process retrieve_source_genome {
label 'short_time', 'med_mem'

when:
source_assembly_accession != params.target_assembly_accession

Expand All @@ -72,6 +71,7 @@ process retrieve_source_genome {


process retrieve_target_genome {
label 'short_time', 'med_mem'

input:
val target_assembly_accession
Expand All @@ -89,6 +89,7 @@ process retrieve_target_genome {
}

process update_source_genome {
label 'short_time', 'med_mem'

input:
val(source_assembly_accession)
Expand All @@ -106,6 +107,7 @@ process update_source_genome {
}

process update_target_genome {
label 'short_time', 'med_mem'

input:
path target_fasta
Expand All @@ -126,8 +128,7 @@ process update_target_genome {
* Extract the submitted variants to remap from the accessioning warehouse and store them in a VCF file.
*/
process extract_vcf_from_mongo {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
path source_fasta
Expand All @@ -142,7 +143,7 @@ process extract_vcf_from_mongo {
publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

"""
java -Xmx8G -jar $params.jar.vcf_extractor \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.vcf_extractor \
--spring.config.location=file:${params.extraction_properties} \
--parameters.fasta=${source_fasta} \
--parameters.assemblyReportUrl=file:${source_report} \
Expand All @@ -156,7 +157,7 @@ process extract_vcf_from_mongo {
* Variant remapping pipeline
*/
process remap_variants {
memory "${params.memory}GB"
label 'long_time', 'med_mem'

input:
each path(source_vcf)
Expand All @@ -181,12 +182,22 @@ process remap_variants {
done
PATH=`pwd`/bin:\$PATH
source $params.executable.python_activate
# Set nextflow config if needed - has to be passed via commandline arg rather than env var
if [[ -z "\${ASSEMBLY_NEXTFLOW_CONFIG:-}" ]]
then
nextflow_config_flag=""
else
nextflow_config_flag="-c \${ASSEMBLY_NEXTFLOW_CONFIG}"
fi
# Nextflow needs the full path to the input parameters hence the pwd
$params.executable.nextflow run $params.nextflow.remapping -resume \
--oldgenome `pwd`/${source_fasta} \
--newgenome `pwd`/${target_fasta} \
--vcffile `pwd`/${source_vcf} \
--outfile `pwd`/${basename_source_vcf}_remapped.vcf
--outfile `pwd`/${basename_source_vcf}_remapped.vcf \
\${nextflow_config_flag}
"""
}

Expand All @@ -195,8 +206,7 @@ process remap_variants {
* Ingest the remapped submitted variants from a VCF file into the accessioning warehouse.
*/
process ingest_vcf_into_mongo {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
each path(remapped_vcf)
Expand All @@ -217,7 +227,7 @@ process ingest_vcf_into_mongo {
loadTo=DBSNP
fi
java -Xmx8G -jar $params.jar.vcf_ingestion \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.vcf_ingestion \
--spring.config.location=file:${params.ingestion_properties} \
--parameters.vcf=${remapped_vcf} \
--parameters.assemblyReportUrl=file:${target_report} \
Expand All @@ -227,31 +237,28 @@ process ingest_vcf_into_mongo {
}

process process_remapped_variants {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
path ingestion_log
val source_to_target

output:
path "${source_to_target}_process_remapped.log", emit: process_remapped_log_filename
// TODO this also generates a rs report, for "newly remapped" rs - should we QC this separately?
path "${source_to_target}_rs_report.txt", optional: true, emit: rs_report_filename

publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

"""
java -Xmx8G -jar $params.jar.clustering \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
--spring.config.location=file:${params.clustering_properties} \
--spring.batch.job.names=PROCESS_REMAPPED_VARIANTS_WITH_RS_JOB \
> ${source_to_target}_process_remapped.log
"""
}

process cluster_unclustered_variants {
memory "${params.memory}GB"
clusterOptions "-g /accession/instance-${params.clustering_instance}"
label 'long_time', 'med_mem'

input:
path process_remapped_log
Expand All @@ -264,7 +271,7 @@ process cluster_unclustered_variants {
publishDir "$params.output_dir/logs", overwrite: true, mode: "copy"

"""
java -Xmx8G -jar $params.jar.clustering \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
--spring.config.location=file:${params.clustering_properties} \
--spring.batch.job.names=CLUSTER_UNCLUSTERED_VARIANTS_JOB \
> ${source_to_target}_clustering.log
Expand All @@ -275,8 +282,7 @@ process cluster_unclustered_variants {
* Run clustering QC job
*/
process qc_clustering {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
path rs_report
Expand All @@ -288,7 +294,7 @@ process qc_clustering {
publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

"""
java -Xmx8G -jar $params.jar.clustering \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
--spring.config.location=file:${params.clustering_properties} \
--spring.batch.job.names=NEW_CLUSTERED_VARIANTS_QC_JOB \
> ${source_to_target}_clustering_qc.log
Expand All @@ -300,8 +306,7 @@ process qc_clustering {
* Run Back propagation of new clustered RS
*/
process backpropagate_clusters {
memory "${params.memory}GB"
clusterOptions "-g /accession"
label 'long_time', 'med_mem'

input:
path "clustering_qc.log"
Expand All @@ -312,7 +317,7 @@ process backpropagate_clusters {
publishDir "$params.output_dir/logs", overwrite: true, mode: "copy", pattern: "*.log*"

"""
java -Xmx8G -jar $params.jar.clustering \
java -Xmx${task.memory.toGiga()-1}G -jar $params.jar.clustering \
--spring.config.location=file:${params.clustering_properties} \
--parameters.remappedFrom=${params.source_assembly_accession} \
--spring.batch.job.names=BACK_PROPAGATE_SPLIT_OR_MERGED_RS_JOB \
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ebi-eva-common-pyutils==0.5.5
ebi-eva-common-pyutils[eva-internal]==0.6.7
pyyaml
requests
retry
Loading

0 comments on commit e0e355a

Please sign in to comment.