Use Temporary Directory for MaveDB Score and Metadata Files
Adds a decorator for mapping routines that creates a temporary directory into which score set metadata and score files are downloaded. The directory path is then passed to the mapping routine so the mapper can reuse these temporary files. Once the wrapped function exits, the temporary directory is purged.
bencap committed Nov 13, 2024
1 parent 2813d84 commit 21c75ab
Showing 4 changed files with 49 additions and 27 deletions.
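
The decorator this commit introduces (`with_mavedb_score_set`, defined in `src/dcd_mapping/mavedb_data.py` below) follows a simple pattern: create a temporary directory, stage the files a routine needs inside it, hand the directory path to the routine, and let the context manager delete everything when the routine returns. A minimal, self-contained sketch of that pattern using only the standard library — not the project's implementation, which appears in the diff further down:

```python
import asyncio
import functools
import tempfile
from collections.abc import Callable
from pathlib import Path


def with_temp_store(fn: Callable) -> Callable:
    """Toy decorator: stage inputs in a temporary directory and inject it as ``store_path``."""

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        with tempfile.TemporaryDirectory() as temp_dir:
            store = Path(temp_dir)
            # Stage inputs into the temporary store (stubbed here; the real
            # decorator downloads score set metadata and scores from MaveDB).
            (store / "scores.csv").write_text("hgvs_pro,score\n")
            kwargs["store_path"] = store
            result = await fn(*args, **kwargs)
        # The directory and its contents are removed once the ``with`` block exits.
        return result

    return wrapper


@with_temp_store
async def toy_mapping_routine(urn: str, store_path: Path | None = None) -> str:
    # The wrapped routine reads from the injected directory instead of re-downloading.
    return (store_path / "scores.csv").read_text()


print(asyncio.run(toy_mapping_routine("urn:mavedb:00000XXX-X-X")))
```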
12 changes: 8 additions & 4 deletions src/api/routers/map.py
@@ -1,4 +1,6 @@
""""Provide mapping router"""
from pathlib import Path

from cool_seq_tool.schemas import AnnotationLayer
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
@@ -17,6 +19,7 @@
get_raw_scoreset_metadata,
get_scoreset_metadata,
get_scoreset_records,
with_mavedb_score_set,
)
from dcd_mapping.resource_utils import ResourceAcquisitionError
from dcd_mapping.schemas import ScoreAnnotation, ScoresetMapping, VrsVersion
@@ -29,7 +32,8 @@


@router.post(path="/map/{urn}", status_code=200, response_model=ScoresetMapping)
async def map_scoreset(urn: str) -> ScoresetMapping:
@with_mavedb_score_set
async def map_scoreset(urn: str, store_path: Path | None = None) -> ScoresetMapping:
"""Perform end-to-end mapping for a scoreset.
:param urn: identifier for a scoreset.
@@ -38,8 +42,8 @@ async def map_scoreset(urn: str) -> ScoresetMapping:
:param silent: if True, suppress console information output
"""
try:
metadata = get_scoreset_metadata(urn)
records = get_scoreset_records(urn, True)
metadata = get_scoreset_metadata(urn, store_path)
records = get_scoreset_records(urn, True, store_path)
except ScoresetNotSupportedError as e:
return ScoresetMapping(
metadata=None,
@@ -132,7 +136,7 @@ async def map_scoreset(urn: str) -> ScoresetMapping:
for layer in preferred_layers:
reference_sequences[layer][
"computed_reference_sequence"
] = _get_computed_reference_sequence(urn, layer, transcript)
] = _get_computed_reference_sequence(metadata, layer, transcript)
reference_sequences[layer][
"mapped_reference_sequence"
] = _get_mapped_reference_sequence(layer, transcript, alignment_result)
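
The route itself is unchanged (`POST /map/{urn}`); `store_path` defaults to `None` and is filled in by the `with_mavedb_score_set` decorator rather than by the caller. A sketch of invoking the endpoint with `requests`, assuming the API is served locally — the base URL and URN below are placeholders, not values from this commit:

```python
import requests

BASE_URL = "http://localhost:8000"  # placeholder: wherever the dcd-mapping API is running
urn = "urn:mavedb:00000XXX-X-X"     # placeholder: a real MaveDB score set URN

# Mapping can take a while for large score sets, so allow a generous timeout.
response = requests.post(f"{BASE_URL}/map/{urn}", timeout=600)
response.raise_for_status()

mapping = response.json()  # ScoresetMapping serialized to JSON
print(sorted(mapping))     # top-level fields, e.g. metadata and mapped_scores
```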
25 changes: 6 additions & 19 deletions src/dcd_mapping/annotate.py
@@ -29,7 +29,6 @@
get_seqrepo,
get_vrs_id_from_identifier,
)
from dcd_mapping.mavedb_data import get_raw_scoreset_metadata, get_scoreset_metadata
from dcd_mapping.resource_utils import LOCAL_STORE_PATH
from dcd_mapping.schemas import (
AlignmentResult,
@@ -409,7 +408,7 @@ def annotate(


def _get_computed_reference_sequence(
ss: str,
metadata: ScoresetMetadata,
layer: AnnotationLayer,
tx_output: TxSelectResult | None = None,
) -> ComputedReferenceSequence:
@@ -429,7 +428,6 @@ def _get_computed_reference_sequence(
sequence_type=TargetSequenceType.PROTEIN,
sequence_id=seq_id,
)
metadata = get_scoreset_metadata(ss)
seq_id = f"ga4gh:SQ.{sha512t24u(metadata.target_sequence.encode('ascii'))}"
return ComputedReferenceSequence(
sequence=metadata.target_sequence,
@@ -516,7 +514,7 @@ def write_scoreset_mapping_to_json(


def save_mapped_output_json(
urn: str,
metadata: ScoresetMetadata,
mappings: list[ScoreAnnotationWithLayer],
align_result: AlignmentResult,
tx_output: TxSelectResult | None,
@@ -533,10 +531,9 @@
<dcd_mapping_data_dir>/urn:mavedb:00000XXX-X-X_mapping_<ISO8601 datetime>.json
:return: output location
"""
metadata = get_raw_scoreset_metadata(urn)
if preferred_layer_only:
preferred_layers = {
_set_scoreset_layer(urn, mappings),
_set_scoreset_layer(metadata.urn, mappings),
}
else:
preferred_layers = {mapping.annotation_layer for mapping in mappings}
@@ -549,20 +546,10 @@
for layer in preferred_layers:
reference_sequences[layer][
"computed_reference_sequence"
] = _get_computed_reference_sequence(urn, layer, tx_output)
] = _get_computed_reference_sequence(metadata, layer, tx_output)
reference_sequences[layer][
"mapped_reference_sequence"
] = _get_mapped_reference_sequence(layer, tx_output, align_result)
# except Exception as e:
# _logger.warning(
# str(e)
# )
# output = ScoresetMapping(
# metadata=metadata,
# error_message = str(e).strip("'")
# )

# return write_scoreset_mapping_to_json

mapped_scores: list[ScoreAnnotation] = []
for m in mappings:
@@ -573,7 +560,7 @@ def save_mapped_output_json(
mapped_scores.append(ScoreAnnotation(**m.model_dump()))

output = ScoresetMapping(
metadata=metadata,
metadata=metadata.model_dump(),
computed_protein_reference_sequence=reference_sequences[
AnnotationLayer.PROTEIN
]["computed_reference_sequence"],
@@ -589,4 +576,4 @@
mapped_scores=mapped_scores,
)

return write_scoreset_mapping_to_json(urn, output, output_path)
return write_scoreset_mapping_to_json(metadata.urn, output, output_path)
9 changes: 6 additions & 3 deletions src/dcd_mapping/main.py
@@ -24,6 +24,7 @@
ScoresetNotSupportedError,
get_scoreset_metadata,
get_scoreset_records,
with_mavedb_score_set,
)
from dcd_mapping.resource_utils import ResourceAcquisitionError
from dcd_mapping.schemas import (
@@ -264,7 +265,7 @@ async def map_scoreset(
return
try:
final_output = save_mapped_output_json(
metadata.urn,
metadata,
vrs_results,
alignment_result,
transcript,
@@ -287,12 +288,14 @@ async def map_scoreset_urn(
_emit_info(f"Annotated scores saved to: {final_output}.", silent)


@with_mavedb_score_set
async def map_scoreset_urn(
urn: str,
output_path: Path | None = None,
vrs_version: VrsVersion = VrsVersion.V_2,
prefer_genomic: bool = False,
silent: bool = True,
store_path: Path | None = None,
) -> None:
"""Perform end-to-end mapping for a scoreset.
@@ -302,8 +305,8 @@ async def map_scoreset_urn(
:param silent: if True, suppress console information output
"""
try:
metadata = get_scoreset_metadata(urn)
records = get_scoreset_records(urn, silent)
metadata = get_scoreset_metadata(urn, store_path)
records = get_scoreset_records(urn, silent, store_path)
except ScoresetNotSupportedError as e:
_emit_info(f"Score set not supported: {e}", silent, logging.ERROR)
final_output = write_scoreset_mapping_to_json(
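
Because `map_scoreset_urn` is now wrapped by `with_mavedb_score_set`, callers should not supply `store_path` themselves — the decorator stages the score set files in a temporary directory and injects the path. A sketch of calling it directly, assuming the package is importable as `dcd_mapping` and that MaveDB is reachable (the URN is a placeholder):

```python
import asyncio

from dcd_mapping.main import map_scoreset_urn

# store_path is deliberately omitted: the decorator supplies it for the duration
# of the call and cleans the temporary directory up afterwards.
asyncio.run(map_scoreset_urn("urn:mavedb:00000XXX-X-X", silent=False))
```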
30 changes: 29 additions & 1 deletion src/dcd_mapping/mavedb_data.py
@@ -2,11 +2,14 @@
Much of this can/should be replaced by the ``mavetools`` library? (and/or ``wags-tails``.)
"""

import csv
import json
import logging
import tempfile
import zipfile
from collections.abc import Callable
from functools import wraps
from pathlib import Path
from typing import Any

@@ -20,7 +23,7 @@
authentication_header,
http_download,
)
from dcd_mapping.schemas import ScoreRow, ScoresetMetadata, UniProtRef
from dcd_mapping.schemas import ScoreRow, ScoresetMapping, ScoresetMetadata, UniProtRef

__all__ = [
"get_scoreset_urns",
@@ -135,6 +138,7 @@ def get_raw_scoreset_metadata(
"""
if not dcd_mapping_dir:
dcd_mapping_dir = LOCAL_STORE_PATH

metadata_file = dcd_mapping_dir / f"{scoreset_urn}_metadata.json"
if not metadata_file.exists():
url = f"{MAVEDB_BASE_URL}/api/v1/score-sets/{scoreset_urn}"
@@ -265,3 +269,27 @@ def get_scoreset_records(
raise ResourceAcquisitionError(msg) from e

return _load_scoreset_records(scores_csv)


def with_mavedb_score_set(fn: Callable) -> Callable:
@wraps(fn)
async def wrapper(*args, **kwargs) -> ScoresetMapping: # noqa: ANN002
urn = args[0] if args else kwargs["urn"]
silent = kwargs.get("silent", False)

with tempfile.TemporaryDirectory(
prefix=f"{LOCAL_STORE_PATH.as_posix()}/"
) as temp_dir:
# Set up metadata and scores for the current run. Now they will be accessible by these functions
# without the need to download the data again.
temp_dir_as_path = Path(temp_dir)
get_scoreset_metadata(urn, temp_dir_as_path)
get_scoreset_records(urn, silent, temp_dir_as_path)

# Pass the storage path of the temp directory to the wrapped function as a kwarg.
kwargs["store_path"] = temp_dir_as_path
v: ScoresetMapping = await fn(*args, **kwargs)

return v

return wrapper
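
The wrapper reads the URN from the first positional argument (or a `urn` keyword), honors an optional `silent` keyword, and injects the temporary directory as a `store_path` keyword, so any wrapped routine must accept that keyword argument. Because the directory is created with a `prefix` under `LOCAL_STORE_PATH`, the staged files sit alongside the permanent cache and are deleted with the directory when the wrapped call returns. A sketch of applying the decorator to a hypothetical new routine (illustrative only; running it requires network access to MaveDB):

```python
from pathlib import Path

from dcd_mapping.mavedb_data import (
    get_scoreset_metadata,
    get_scoreset_records,
    with_mavedb_score_set,
)


@with_mavedb_score_set
async def summarize_scoreset(
    urn: str,
    silent: bool = True,
    store_path: Path | None = None,
) -> str:
    """Hypothetical routine: reuse the files already staged in ``store_path``."""
    metadata = get_scoreset_metadata(urn, store_path)  # reads the staged metadata JSON
    records = get_scoreset_records(urn, silent, store_path)  # reads the staged scores CSV
    return f"{metadata.urn}: {len(records)} score records"
```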
