Skip to content

Commit

Permalink
Merge pull request #545 from Sage-Bionetworks/gen-1027-store-error-re…
Browse files Browse the repository at this point in the history
…port

[GEN-1027] Store failed annotations error report
  • Loading branch information
rxu17 authored Jan 24, 2024
2 parents 446de5c + e62fd79 commit 4f9be49
Show file tree
Hide file tree
Showing 4 changed files with 382 additions and 72 deletions.
212 changes: 175 additions & 37 deletions genie/process_mutation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Process mutation files
TODO deprecate this module and spread functions around"""
from collections import namedtuple
import logging
import os
import shutil
Expand All @@ -10,7 +11,7 @@
import pandas as pd
from synapseclient import Synapse

from . import load, process_functions
from . import extract, load, process_functions

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -198,12 +199,14 @@ def process_mutation_workflow(
logger.info("No mutation data")
return None
# Certificate to use GENIE Genome Nexus

syn.get(
"syn22053204",
ifcollision="overwrite.local",
downloadLocation=genie_config["genie_annotation_pkg"],
# version=1, # TODO: This should pull from a config file in the future
)

# Genome Nexus Jar file
syn.get(
"syn22084320",
Expand All @@ -212,11 +215,12 @@ def process_mutation_workflow(
# version=13, # TODO: This should pull from a config file in the future
)

annotated_maf_path = annotate_mutation(
annotation_paths = create_annotation_paths(center=center, workdir=workdir)
annotate_mutation(
annotation_paths=annotation_paths,
center=center,
mutation_files=valid_mutation_files,
genie_annotation_pkg=genie_config["genie_annotation_pkg"],
workdir=workdir,
)

maf_tableid = genie_config["vcf2maf"]
Expand All @@ -226,18 +230,165 @@ def process_mutation_workflow(
syn=syn,
center=center,
maf_tableid=maf_tableid,
annotated_maf_path=annotated_maf_path,
annotation_paths=annotation_paths,
flatfiles_synid=flatfiles_synid,
workdir=workdir,
)

return annotated_maf_path
full_error_report = concat_annotation_error_reports(
center=center,
input_dir=annotation_paths.error_dir,
)
check_annotation_error_reports(
syn=syn,
maf_table_synid=maf_tableid,
full_error_report=full_error_report,
center=center,
)
store_annotation_error_reports(
full_error_report=full_error_report,
full_error_report_path=annotation_paths.full_error_report_path,
syn=syn,
errors_folder_synid=genie_config["center_config"][center]["errorsSynId"],
)
return annotation_paths.merged_maf_path


def create_annotation_paths(center: str, workdir: str) -> namedtuple:
"""Creates the filepaths required in the annotation process
Args:
center (str): name of the center
workdir (str): work directory to create paths in
Returns:
namedtuple: tuple with all the paths
"""
input_files_dir = tempfile.mkdtemp(dir=workdir)
output_files_dir = tempfile.mkdtemp(dir=workdir)
Filepaths = namedtuple(
"Filepaths",
[
"input_files_dir",
"output_files_dir",
"error_dir",
"merged_maf_path",
"full_maf_path",
"narrow_maf_path",
"full_error_report_path",
],
)
annotation_paths = Filepaths(
input_files_dir=input_files_dir,
output_files_dir=output_files_dir,
error_dir=os.path.join(output_files_dir, f"{center}_error_reports"),
merged_maf_path=os.path.join(
output_files_dir, f"data_mutations_extended_{center}.txt"
),
full_maf_path=os.path.join(
workdir, center, "staging", f"data_mutations_extended_{center}.txt"
),
narrow_maf_path=os.path.join(
workdir,
center,
"staging",
f"data_mutations_extended_{center}_MAF_narrow.txt",
),
full_error_report_path=os.path.join(
workdir,
center,
"staging",
f"failed_annotations_error_report.txt",
),
)
return annotation_paths


def concat_annotation_error_reports(
center: str,
input_dir: str,
) -> pd.DataFrame:
"""Concatenates the annotation error reports
Args:
center (str): name of center associated with error report
input_dir (str): directory where error reports are
Returns:
pd.DataFrame: full annotation error report
"""
error_files = os.listdir(input_dir)
chunk_size = 10000
error_reports = []

# Read and concatenate TSV files in chunks
for file in error_files:
for chunk in pd.read_csv(
os.path.join(input_dir, file), sep="\t", chunksize=chunk_size
):
error_reports.append(chunk)
full_error_report = pd.concat(error_reports)
full_error_report["Center"] = center
return full_error_report


def check_annotation_error_reports(
syn: Synapse, maf_table_synid: str, full_error_report: pd.DataFrame, center: str
) -> None:
"""A simple QC check to make sure our genome nexus error report
failed annotations matches our final processed maf table's failed
annotations
Args:
syn (Synapse): synapse client
maf_table_synid (str): synapse_id of the narrow maf table
full_error_report (pd.DataFrame): the failed annotations error report
center (str): the center this is for
"""
maf_table_df = extract.get_syntabledf(
syn=syn,
query_string=(
f"SELECT * FROM {maf_table_synid} "
f"WHERE Center = '{center}' AND "
"Annotation_Status = 'FAILED'"
),
)
if len(maf_table_df) != len(full_error_report):
logger.warning(
"Genome nexus's failed annotations error report rows doesn't match"
f"maf table's failed annotations for {center}"
)


def store_annotation_error_reports(
full_error_report: pd.DataFrame,
full_error_report_path: str,
syn: Synapse,
errors_folder_synid: str,
) -> None:
"""Stores the annotation error reports to synapse
Args:
full_error_report (pd.DataFrame): full error report to store
full_error_report_path (str) where to store the flat file of the full error report
syn (synapseclient.Synapse): synapse client object
errors_folder_synid (str): synapse id of error report folder
to store reports in
"""
full_error_report.to_csv(full_error_report_path, sep="\t", index=False)
load.store_file(
syn=syn,
filepath=full_error_report_path,
parentid=errors_folder_synid,
)


# TODO: add to transform
def annotate_mutation(
center: str, mutation_files: list, genie_annotation_pkg: str, workdir: str
) -> str:
annotation_paths: namedtuple,
mutation_files: list,
genie_annotation_pkg: str,
center: str,
) -> None:
"""Process vcf/maf files
Args:
Expand All @@ -248,32 +399,23 @@ def annotate_mutation(
Returns:
Path to final maf
"""
input_files_dir = tempfile.mkdtemp(dir=workdir)
output_files_dir = tempfile.mkdtemp(dir=workdir)
error_dir = os.path.join(output_files_dir, f"{center}_error_reports")

for mutation_file in mutation_files:
move_mutation(mutation_file, input_files_dir)
move_mutation(mutation_file, annotation_paths.input_files_dir)

merged_maf_path = os.path.join(
output_files_dir, f"data_mutations_extended_{center}.txt"
)
annotater_cmd = [
"bash",
os.path.join(genie_annotation_pkg, "annotation_suite_wrapper.sh"),
f"-i={input_files_dir}",
f"-o={output_files_dir}",
f"-e={error_dir}",
f"-m={merged_maf_path}",
f"-i={annotation_paths.input_files_dir}",
f"-o={annotation_paths.output_files_dir}",
f"-e={annotation_paths.error_dir}",
f"-m={annotation_paths.merged_maf_path}",
f"-c={center}",
"-s=WXS",
f"-p={genie_annotation_pkg}",
]

subprocess.check_call(annotater_cmd)

return merged_maf_path


# TODO: add to transform
def append_or_createdf(dataframe: pd.DataFrame, filepath: str):
Expand Down Expand Up @@ -319,17 +461,16 @@ def split_and_store_maf(
syn: Synapse,
center: str,
maf_tableid: str,
annotated_maf_path: str,
annotation_paths: namedtuple,
flatfiles_synid: str,
workdir: str,
):
"""Separates annotated maf file into narrow and full maf and stores them
Args:
syn: Synapse connection
center: Center
maf_tableid: Mutation table synapse id
annotated_maf_path: Annotated maf
annotation_paths: filepaths in the annotation process
flatfiles_synid: GENIE flat files folder
"""
Expand All @@ -338,22 +479,19 @@ def split_and_store_maf(
for col in syn.getTableColumns(maf_tableid)
if col["name"] != "inBED"
]
full_maf_path = os.path.join(
workdir, center, "staging", f"data_mutations_extended_{center}.txt"
)
narrow_maf_path = os.path.join(
workdir, center, "staging", f"data_mutations_extended_{center}_MAF_narrow.txt"
)
maf_chunks = pd.read_csv(
annotated_maf_path, sep="\t", chunksize=100000, comment="#"
annotation_paths.merged_maf_path, sep="\t", chunksize=100000, comment="#"
)

for maf_chunk in maf_chunks:
maf_chunk = format_maf(maf_chunk, center)
append_or_createdf(maf_chunk, full_maf_path)
append_or_createdf(maf_chunk, annotation_paths.full_maf_path)
narrow_maf_chunk = maf_chunk[narrow_maf_cols]
append_or_createdf(narrow_maf_chunk, narrow_maf_path)
append_or_createdf(narrow_maf_chunk, annotation_paths.narrow_maf_path)

load.store_table(syn=syn, filepath=narrow_maf_path, tableid=maf_tableid)
load.store_table(
syn=syn, filepath=annotation_paths.narrow_maf_path, tableid=maf_tableid
)
# Store MAF flat file into synapse
load.store_file(syn=syn, filepath=full_maf_path, parentid=flatfiles_synid)
load.store_file(
syn=syn, filepath=annotation_paths.full_maf_path, parentid=flatfiles_synid
)
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,23 @@ def genie_config():
"center": "SAGE",
"inputSynId": "syn11601335",
"stagingSynId": "syn11601337",
"errorsSynId": "syn53239079",
"release": True,
"mutationInCisFilter": "ON",
},
"TEST": {
"center": "TEST",
"inputSynId": "syn11601340",
"stagingSynId": "syn11601342",
"errorsSynId": "syn53239081",
"release": True,
"mutationInCisFilter": "ON",
},
"GOLD": {
"center": "GOLD",
"inputSynId": "syn52950860",
"stagingSynId": "syn52950861",
"errorsSynId": "syn53239077",
"release": True,
"mutationInCisFilter": "ON",
},
Expand Down
Loading

0 comments on commit 4f9be49

Please sign in to comment.