diff --git a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py index 55677869..f37bfc16 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py @@ -50,6 +50,7 @@ def diamond_blastx( out: str, queries: Iterable[str], chunk=False, + diamond_args="", join_chunks=0, ): cmd = [ @@ -64,6 +65,7 @@ def diamond_blastx( database, "--out", out, + f"--{diamond_args}", ] for query in queries: cmd += ["--query", query] @@ -132,7 +134,7 @@ def make_db(reference_fasta: str, output_dir: str, chunks: int): print(f"COMPLETED CHUNK {i}") -def blastx_chunk(db_chunk: str, output_dir: str, *query: str): +def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str): try: os.mkdir(output_dir) except OSError as e: @@ -150,6 +152,7 @@ def blastx_chunk(db_chunk: str, output_dir: str, *query: str): database=abspath(db_chunk), out="out.tsv", chunk=True, + diamond_args=diamond_args, queries=(abspath(q) for q in query), ) ref_block_name = f"ref_block_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}" @@ -173,7 +176,7 @@ def mock_reference_fasta(chunks: int, chunk_size: int): i += 1 -def blastx_join(chunk_dir: str, out: str, *query: str): +def blastx_join(chunk_dir: str, out: str, diamond_args: str, *query: str): with TemporaryDirectory() as tmp_dir: make_par_dir(tmp_dir, "par-tmp") with open(join(tmp_dir, "par-tmp", f"join_todo_{zero_pad(0, 6)}"), "w") as f: @@ -194,6 +197,7 @@ def blastx_join(chunk_dir: str, out: str, *query: str): database=db.name, out=out, join_chunks=chunks, + diamond_args=diamond_args, queries=(abspath(q) for q in query), ) @@ -216,11 +220,13 @@ def blastx_join(chunk_dir: str, out: str, *query: str): blastx_chunk_parser = subparsers.add_parser("blastx-chunk") blastx_chunk_parser.add_argument("--db", required=True) blastx_chunk_parser.add_argument("--out-dir", required=True) + blastx_chunk_parser.add_argument("--diamond-args", required=False) blastx_chunk_parser.add_argument("--query", required=True, action="append") blastx_chunks_parser = subparsers.add_parser("blastx-chunks") blastx_chunks_parser.add_argument("--db-dir", required=True) blastx_chunks_parser.add_argument("--out-dir", required=True) + blastx_chunks_parser.add_argument("--diamond-args", required=False) blastx_chunks_parser.add_argument("--query", required=True, action="append") blastx_join_parser = subparsers.add_parser("blastx-join") @@ -232,16 +238,16 @@ def blastx_join(chunk_dir: str, out: str, *query: str): if args.command == "make-db": make_db(args.__getattribute__("in"), args.db, args.chunks) elif args.command == "blastx-chunk": - blastx_chunk(args.db, args.out_dir, *args.query) + blastx_chunk(args.db, args.out_dir, args.diamond_args, *args.query) elif args.command == "blastx-chunks": def _blastx_chunk(db): print(f"STARTING: {db}") - res = blastx_chunk(join(args.db_dir, db), args.out_dir, *args.query) + res = blastx_chunk(join(args.db_dir, db), args.out_dir, args.diamond_args, *args.query) print(f"FINISHING: {db}") return res with Pool(48) as p: p.map(_blastx_chunk, os.listdir(args.db_dir)) elif args.command == "blastx-join": - blastx_join(args.chunk_dir, args.out, *args.query) + blastx_join(args.chunk_dir, args.out, args.diamond_args, *args.query) diff --git a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py index 46ec694f..47060191 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py @@ -1,5 +1,6 @@ import os import shutil +import shlex import sys import errno from argparse import ArgumentParser @@ -40,8 +41,9 @@ def minimap2_alignment(cwd, par_tmpdir, cpus, database, out, queries): raise Exception(f"Command failed: {' '.join(cmd)}") -def minimap2_merge_cmd(cwd, par_tmpdir, chunks, queries): - cmd = ["minimap2", "-cx", "sr", "--split-merge", "-o", f"{par_tmpdir}/out.paf"] +def minimap2_merge_cmd(cwd, par_tmpdir, chunks, minimap2_args, queries): + print("Starting minimap2 merge", file=sys.stderr) + cmd = ["minimap2", "--split-merge", "-o", f"{par_tmpdir}/out.paf"] for query in queries: cmd += [query] @@ -51,6 +53,8 @@ def minimap2_merge_cmd(cwd, par_tmpdir, chunks, queries): for chunk in chunks: cmd += [f"{chunk}"] + cmd.extend(shlex.split(minimap2_args)) + res = run(cmd, cwd=cwd, stdout=PIPE, stderr=PIPE) if res.returncode != 0: for line in res.stderr.decode().split("\n"): @@ -120,7 +124,7 @@ def minimap2_chunk(db_chunk: str, output_dir: str, *query: str): ) -def minimap2_merge(chunk_dir, out, *query): +def minimap2_merge(chunk_dir, out, minimap2_args, *query): chunk_dir = abspath(chunk_dir) with TemporaryDirectory() as tmp_dir: make_par_dir(tmp_dir, "par-tmp") @@ -134,7 +138,7 @@ def minimap2_merge(chunk_dir, out, *query): for f in os.listdir(chunk_dir): shutil.copy(join(chunk_dir, f), join(tmp_dir, "par-tmp", f)) chunks.append(join(tmp_dir, "par-tmp", f)) - minimap2_merge_cmd(tmp_dir, "par-tmp", chunks, query_tmp) + minimap2_merge_cmd(tmp_dir, "par-tmp", chunks, minimap2_args, query_tmp) shutil.copy(join(tmp_dir, "par-tmp", "out.paf"), out) diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py index 0d844a6c..ee911f16 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py +++ b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py @@ -5,7 +5,7 @@ import logging from urllib.parse import urlparse from multiprocessing import Pool - +import sys from idseq_utils.diamond_scatter import blastx_join from idseq_utils.batch_run_helpers import _run_batch_job, _db_chunks @@ -15,7 +15,7 @@ def _run_chunk( - chunk_id: int, input_dir: str, chunk_dir: str, db_chunk: str, *queries: str + chunk_id: int, input_dir: str, chunk_dir: str, db_chunk: str, diamond_args: str, *queries: str ): deployment_environment = os.environ["DEPLOYMENT_ENVIRONMENT"] priority_name = os.environ.get("PRIORITY_NAME", "normal") @@ -28,6 +28,7 @@ def _run_chunk( else: project_id, sample_id = "0", "0" + print(db_chunk, diamond_args, file=sys.stderr) job_name = (f"idseq-{deployment_environment}-{alignment_algorithm}-" f"project-{project_id}-sample-{sample_id}-part-{chunk_id}") job_queue = f"idseq-{deployment_environment}-{alignment_algorithm}-{provisioning_model}-{priority_name}" @@ -42,8 +43,13 @@ def _run_chunk( "name": "OUTPUT_DIR", "value": chunk_dir, }, + { + "name": "EXTRA_ARGS", + "value": diamond_args, + } ] + print(environment, file=sys.stderr) for i, query in enumerate(queries): environment.append( { @@ -64,16 +70,16 @@ def _run_chunk( def run_diamond( - input_dir: str, chunk_dir: str, db_path: str, result_path: str, *queries: str + input_dir: str, chunk_dir: str, db_path: str, result_path: str, diamond_args: str, *queries: str ): parsed_url = urlparse(db_path, allow_fragments=False) bucket = parsed_url.netloc prefix = parsed_url.path.lstrip("/") chunks = ( - [chunk_id, input_dir, chunk_dir, f"s3://{bucket}/{db_chunk}", *queries] + [chunk_id, input_dir, chunk_dir, f"s3://{bucket}/{db_chunk}", diamond_args, *queries] for chunk_id, db_chunk in enumerate(_db_chunks(bucket, prefix)) ) with Pool(MAX_CHUNKS_IN_FLIGHT) as p: p.starmap(_run_chunk, chunks) run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True) - blastx_join("chunks", result_path, *queries) + blastx_join("chunks", result_path, diamond_args, *queries) diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py b/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py index 8a5d4c6c..9a13912e 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py +++ b/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py @@ -82,4 +82,4 @@ def run_minimap2( with Pool(MAX_CHUNKS_IN_FLIGHT) as p: p.starmap(_run_chunk, chunks) run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True) - minimap2_merge("chunks", result_path, *queries) + minimap2_merge("chunks", result_path, minimap2_args, *queries) diff --git a/short-read-mngs/non_host_alignment.wdl b/short-read-mngs/non_host_alignment.wdl index be20daad..dfef18dd 100644 --- a/short-read-mngs/non_host_alignment.wdl +++ b/short-read-mngs/non_host_alignment.wdl @@ -211,7 +211,7 @@ task RunAlignment_diamond_out { set -euxo pipefail if [[ "~{run_locally}" == true ]]; then diamond makedb --in "~{local_diamond_index}" -d reference - diamond blastx -d reference -q "~{sep=' ' fastas}" -o "~{prefix}.m8" + diamond blastx -d reference -q "~{sep=' ' fastas}" -o "~{prefix}.m8" "~{diamond_args}" else export DEPLOYMENT_ENVIRONMENT=dev python3 <