This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Add diamond extra args flag #192

Open · wants to merge 10 commits into base: rlim-add-minimap2-to-wdl
16 changes: 11 additions & 5 deletions short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py
@@ -50,6 +50,7 @@ def diamond_blastx(
out: str,
queries: Iterable[str],
chunk=False,
diamond_args="",
join_chunks=0,
):
cmd = [
@@ -64,6 +65,7 @@
database,
"--out",
out,
f"--{diamond_args}",
]
for query in queries:
cmd += ["--query", query]
@@ -132,7 +134,7 @@ def make_db(reference_fasta: str, output_dir: str, chunks: int):
print(f"COMPLETED CHUNK {i}")


def blastx_chunk(db_chunk: str, output_dir: str, *query: str):
def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str):
try:
os.mkdir(output_dir)
except OSError as e:
@@ -150,6 +152,7 @@ def blastx_chunk(db_chunk: str, output_dir: str, *query: str):
database=abspath(db_chunk),
out="out.tsv",
chunk=True,
diamond_args=diamond_args,
queries=(abspath(q) for q in query),
)
ref_block_name = f"ref_block_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}"
@@ -173,7 +176,7 @@ def mock_reference_fasta(chunks: int, chunk_size: int):
i += 1


def blastx_join(chunk_dir: str, out: str, *query: str):
def blastx_join(chunk_dir: str, out: str, diamond_args: str, *query: str):
with TemporaryDirectory() as tmp_dir:
make_par_dir(tmp_dir, "par-tmp")
with open(join(tmp_dir, "par-tmp", f"join_todo_{zero_pad(0, 6)}"), "w") as f:
@@ -194,6 +197,7 @@ def blastx_join(chunk_dir: str, out: str, *query: str):
database=db.name,
out=out,
join_chunks=chunks,
diamond_args=diamond_args,
queries=(abspath(q) for q in query),
)

@@ -216,11 +220,13 @@ def blastx_join(chunk_dir: str, out: str, *query: str):
blastx_chunk_parser = subparsers.add_parser("blastx-chunk")
blastx_chunk_parser.add_argument("--db", required=True)
blastx_chunk_parser.add_argument("--out-dir", required=True)
blastx_chunk_parser.add_argument("--diamond-args", required=False)
blastx_chunk_parser.add_argument("--query", required=True, action="append")

blastx_chunks_parser = subparsers.add_parser("blastx-chunks")
blastx_chunks_parser.add_argument("--db-dir", required=True)
blastx_chunks_parser.add_argument("--out-dir", required=True)
blastx_chunks_parser.add_argument("--diamond-args", required=False)
blastx_chunks_parser.add_argument("--query", required=True, action="append")

blastx_join_parser = subparsers.add_parser("blastx-join")
@@ -232,16 +238,16 @@ def blastx_join(chunk_dir: str, out: str, *query: str):
if args.command == "make-db":
make_db(args.__getattribute__("in"), args.db, args.chunks)
elif args.command == "blastx-chunk":
blastx_chunk(args.db, args.out_dir, *args.query)
blastx_chunk(args.db, args.out_dir, args.diamond_args, *args.query)
elif args.command == "blastx-chunks":

def _blastx_chunk(db):
print(f"STARTING: {db}")
res = blastx_chunk(join(args.db_dir, db), args.out_dir, *args.query)
res = blastx_chunk(join(args.db_dir, db), args.out_dir, args.diamond_args, *args.query)
print(f"FINISHING: {db}")
return res

with Pool(48) as p:
p.map(_blastx_chunk, os.listdir(args.db_dir))
elif args.command == "blastx-join":
blastx_join(args.chunk_dir, args.out, *args.query)
blastx_join(args.chunk_dir, args.out, args.diamond_args, *args.query)
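
For reference, a minimal sketch of how the new diamond_args value reaches the DIAMOND command line in diamond_blastx (not part of this diff; the value shown is the WDL default added below): the string is prefixed with "--" and appended as a single flag.

# Sketch only: illustrates the f"--{diamond_args}" expansion above (value hypothetical).
diamond_args = "sensitive"
cmd = ["diamond", "blastx", "--db", "reference", "--out", "out.tsv", f"--{diamond_args}"]
print(cmd)  # ['diamond', 'blastx', '--db', 'reference', '--out', 'out.tsv', '--sensitive']
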
12 changes: 8 additions & 4 deletions short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py
@@ -1,5 +1,6 @@
import os
import shutil
import shlex
import sys
import errno
from argparse import ArgumentParser
@@ -40,8 +41,9 @@ def minimap2_alignment(cwd, par_tmpdir, cpus, database, out, queries):
raise Exception(f"Command failed: {' '.join(cmd)}")


def minimap2_merge_cmd(cwd, par_tmpdir, chunks, queries):
cmd = ["minimap2", "-cx", "sr", "--split-merge", "-o", f"{par_tmpdir}/out.paf"]
def minimap2_merge_cmd(cwd, par_tmpdir, chunks, minimap2_args, queries):
print("Starting minimap2 merge", file=sys.stderr)
cmd = ["minimap2", "--split-merge", "-o", f"{par_tmpdir}/out.paf"]
for query in queries:
cmd += [query]

@@ -51,6 +53,8 @@ def minimap2_merge_cmd(cwd, par_tmpdir, chunks, queries):
for chunk in chunks:
cmd += [f"{chunk}"]

cmd.extend(shlex.split(minimap2_args))

res = run(cmd, cwd=cwd, stdout=PIPE, stderr=PIPE)
if res.returncode != 0:
for line in res.stderr.decode().split("\n"):
@@ -120,7 +124,7 @@ def minimap2_chunk(db_chunk: str, output_dir: str, *query: str):
)


def minimap2_merge(chunk_dir, out, *query):
def minimap2_merge(chunk_dir, out, minimap2_args, *query):
chunk_dir = abspath(chunk_dir)
with TemporaryDirectory() as tmp_dir:
make_par_dir(tmp_dir, "par-tmp")
@@ -134,7 +138,7 @@ def minimap2_merge(chunk_dir, out, *query):
for f in os.listdir(chunk_dir):
shutil.copy(join(chunk_dir, f), join(tmp_dir, "par-tmp", f))
chunks.append(join(tmp_dir, "par-tmp", f))
minimap2_merge_cmd(tmp_dir, "par-tmp", chunks, query_tmp)
minimap2_merge_cmd(tmp_dir, "par-tmp", chunks, minimap2_args, query_tmp)
shutil.copy(join(tmp_dir, "par-tmp", "out.paf"), out)


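A minimal sketch of the shlex.split call added to minimap2_merge_cmd (not part of this diff): it tokenizes the single minimap2_args string into separate argv entries before extending the command.

# Sketch only: shows how shlex.split breaks up the flags string (example value taken from the WDL defaults).
import shlex
minimap2_args = "-cx sr --secondary=yes"
print(shlex.split(minimap2_args))  # ['-cx', 'sr', '--secondary=yes']
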
16 changes: 11 additions & 5 deletions short-read-mngs/idseq_utils/idseq_utils/run_diamond.py
@@ -5,7 +5,7 @@
import logging
from urllib.parse import urlparse
from multiprocessing import Pool

import sys
from idseq_utils.diamond_scatter import blastx_join
from idseq_utils.batch_run_helpers import _run_batch_job, _db_chunks

@@ -15,7 +15,7 @@


def _run_chunk(
chunk_id: int, input_dir: str, chunk_dir: str, db_chunk: str, *queries: str
chunk_id: int, input_dir: str, chunk_dir: str, db_chunk: str, diamond_args: str, *queries: str
):
deployment_environment = os.environ["DEPLOYMENT_ENVIRONMENT"]
priority_name = os.environ.get("PRIORITY_NAME", "normal")
@@ -28,6 +28,7 @@ def _run_chunk(
else:
project_id, sample_id = "0", "0"

print(db_chunk, diamond_args, file=sys.stderr)
job_name = (f"idseq-{deployment_environment}-{alignment_algorithm}-"
f"project-{project_id}-sample-{sample_id}-part-{chunk_id}")
job_queue = f"idseq-{deployment_environment}-{alignment_algorithm}-{provisioning_model}-{priority_name}"
@@ -42,8 +43,13 @@
"name": "OUTPUT_DIR",
"value": chunk_dir,
},
{
"name": "EXTRA_ARGS",
"value": diamond_args,
}
]

print(environment, file=sys.stderr)
for i, query in enumerate(queries):
environment.append(
{
@@ -64,16 +70,16 @@


def run_diamond(
input_dir: str, chunk_dir: str, db_path: str, result_path: str, *queries: str
input_dir: str, chunk_dir: str, db_path: str, result_path: str, diamond_args: str, *queries: str
):
parsed_url = urlparse(db_path, allow_fragments=False)
bucket = parsed_url.netloc
prefix = parsed_url.path.lstrip("/")
chunks = (
[chunk_id, input_dir, chunk_dir, f"s3://{bucket}/{db_chunk}", *queries]
[chunk_id, input_dir, chunk_dir, f"s3://{bucket}/{db_chunk}", diamond_args, *queries]
for chunk_id, db_chunk in enumerate(_db_chunks(bucket, prefix))
)
with Pool(MAX_CHUNKS_IN_FLIGHT) as p:
p.starmap(_run_chunk, chunks)
run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True)
blastx_join("chunks", result_path, *queries)
blastx_join("chunks", result_path, diamond_args, *queries)
2 changes: 1 addition & 1 deletion short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py
@@ -82,4 +82,4 @@ def run_minimap2(
with Pool(MAX_CHUNKS_IN_FLIGHT) as p:
p.starmap(_run_chunk, chunks)
run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True)
minimap2_merge("chunks", result_path, *queries)
minimap2_merge("chunks", result_path, minimap2_args, *queries)
5 changes: 3 additions & 2 deletions short-read-mngs/non_host_alignment.wdl
@@ -211,7 +211,7 @@ task RunAlignment_diamond_out {
set -euxo pipefail
if [[ "~{run_locally}" == true ]]; then
diamond makedb --in "~{local_diamond_index}" -d reference
diamond blastx -d reference -q "~{sep=' ' fastas}" -o "~{prefix}.m8"
diamond blastx -d reference -q "~{sep=' ' fastas}" -o "~{prefix}.m8" "~{diamond_args}"
else
export DEPLOYMENT_ENVIRONMENT=dev
python3 <<CODE
@@ -226,6 +226,7 @@ task RunAlignment_diamond_out {
chunk_dir,
"~{db_path}",
"~{prefix}.m8",
"~{diamond_args}",
*fastas
)
CODE
@@ -413,7 +414,7 @@ workflow idseq_non_host_alignment {
String minimap2_db = "s3://idseq-public-references/minimap2-test/2021-01-22/nt_k12_w8_20/"
String diamond_db = "s3://idseq-public-references/diamond-test/2021-01-22/"
String minimap2_args = "-cx sr --secondary=yes"
String diamond_args = ""
String diamond_args = "sensitive"
String minimap2_prefix = "gsnap"
String diamond_prefix = "rapsearch2"

1 change: 1 addition & 0 deletions short-read-mngs/test/local_test_viral_minimap2.yml
@@ -23,6 +23,7 @@ non_host_alignment.accession2taxid_db: s3://idseq-public-references/mini-databas
non_host_alignment.alignment_scalability: true
non_host_alignment.minimap2_local_db_path: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt
non_host_alignment.diamond_local_db_path: s3://idseq-public-references/test/viral-alignment-indexes/viral_nr
non_host_alignment.diamond_args: "--fast"
postprocess.nt_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt
postprocess.nt_loc_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt_loc.db
postprocess.nr_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nr