diff --git a/isoslam/isoslam.py b/isoslam/isoslam.py index 843cac7..372cf71 100644 --- a/isoslam/isoslam.py +++ b/isoslam/isoslam.py @@ -5,6 +5,7 @@ from typing import Any from loguru import logger +from pysam import AlignedSegment from isoslam import io @@ -53,8 +54,8 @@ def extract_strand_transcript(gtf_file: str | Path) -> tuple[defaultdict[Any, An Returns ------- tuple[dict[str, tuple[str]], dict[str, tuple[str]]] - Two dictionaries are returned, one of the ``strand`` the other of the ``transcript_id`` both using the ``gene_id`` as - the key. + Two dictionaries are returned, one of the ``strand`` the other of the ``transcript_id`` both using the + ``gene_id`` as the key. """ strand = defaultdict(str) transcript = defaultdict(list) @@ -65,3 +66,83 @@ def extract_strand_transcript(gtf_file: str | Path) -> tuple[defaultdict[Any, An transcript[entry.gene_id].append(entry.transcript_id) logger.info(f"Extracted features from : {gtf_file}") return (strand, transcript) + + +def extract_features_from_read(read: AlignedSegment) -> dict[str, int | str | None | tuple[int, int]]: + """ + Extract start, end and length from an aligned segment read. + + Parameters + ---------- + read : AlignedSegment + An aligned segment read from ``pysam``. + + Returns + ------- + dict[str, Any] + Dictionary of ``start``, ``end`` and ``length`` of the segment. + """ + block_start, block_end = zip(*read.get_blocks()) + try: + status = read.get_tag("XS") + except KeyError: + status = None + try: + transcript = read.get_tag("XT") + except KeyError: + transcript = None + return { + "start": read.reference_start, + "end": read.reference_end, + "length": read.reference_length, + "status": status, + "transcript": transcript, + "block_start": block_start, + "block_end": block_end, + } + + +def extract_features_from_pair(pair: list[AlignedSegment]) -> dict[str, dict[str, Any]]: + """ + Extract features from a pair of reads. + + Parameters + ---------- + pair : list[AlignedSegment] + A list of two aligned segments from ``pysam``. + + Returns + ------- + dic[str, dict[str, Any]] + Returns a nested dictionaries of the ``start``, ``end`` and ``length`` of each read. + """ + return { + "read1": extract_features_from_read(pair[0]), + "read2": extract_features_from_read(pair[1]), + } + + +# def extract_utron(read: AlignedSegment, transcript_to_gene: Any, tag: str = "XS") -> list | None: +# """ +# Extract and sum the utrons based on tag. + +# ACTION : This function needs better documentation, my guess is that its extracting the transcripts to genes and +# then getting some related information (what I'm not sure) from the .bed file and adding these up. + +# Parameters +# ---------- +# read : AlignedSegement +# An aligned segment read. +# transcript_to_gene : TextIO +# Transcript to gene from a ``.bed`` file. +# tag : str +# Type of tag to extract. + +# Returns +# ------- +# list | None +# List of the length of assigned regions. +# """ +# if read.get_tag(tag) == "Assigned": +# return sum(bed_file[transcript] for transcript in tx2gene[read.get_tag("XT")]) +# return None diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..9975a78 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,45 @@ +"""Fixtures for pytest.""" + +from pathlib import Path + +import pytest +from pysam import AlignedSegment, AlignmentFile + +from isoslam import io + +BASE_DIR = Path.cwd() +TEST_DIR = BASE_DIR / "tests" +RESOURCES = TEST_DIR / "resources" +GTF_DIR = RESOURCES / "gtf" +BED_DIR = RESOURCES / "bed" +VCF_DIR = RESOURCES / "vcf" +BAM_DIR = RESOURCES / "bam" + + +# pylint: disable=redefined-outer-name + + +@pytest.fixture() +def bam_file() -> AlignmentFile: + """Load a bam file.""" + return io.load_file(BAM_DIR / "d0_no4sU_filtered_remapped_sorted.bam") + + +@pytest.fixture() +def aligned_segment_28584(bam_file: AlignmentFile) -> AlignedSegment: + """Extract an individual AlignedSegment from a ``.bam`` file.""" + return next(bam_file.fetch(contig="chr9", start=28592, end=28593)) + + +@pytest.fixture() +def aligned_segment_17416(bam_file: AlignmentFile) -> AlignedSegment: + """Extract an individual AlignedSegment from a ``.bam`` file.""" + # @ns-rse : I have no idea why the generator doesn't return AlignedSegments that match the start/end here + return next(bam_file.fetch(contig="chr9", start=17804, end=18126)) + + +@pytest.fixture() +def aligned_segment_18029(bam_file: AlignmentFile) -> AlignedSegment: + """Extract an individual AlignedSegment from a ``.bam`` file.""" + # @ns-rse : I have no idea why the generator doesn't return AlignedSegments that match the start/end here + return next(bam_file.fetch(contig="chr9", start=18156, end=24870)) diff --git a/tests/test_isoslam.py b/tests/test_isoslam.py index 985c56e..3d2bbc3 100644 --- a/tests/test_isoslam.py +++ b/tests/test_isoslam.py @@ -10,6 +10,9 @@ BASE_DIR = Path.cwd() RESOURCES = BASE_DIR / "tests" / "resources" +# pylint: disable=too-many-arguments +# pylint: disable=too-many-positional-arguments + @pytest.mark.parametrize( ("bed_file", "expected_transcript"), @@ -60,3 +63,113 @@ def test_extract_strand_transcript( strand, transcript = isoslam.extract_strand_transcript(gtf_file) assert strand == expected_strand assert transcript == expected_transcript + + +@pytest.mark.parametrize( + ("aligned_segment", "start", "end", "length", "status", "transcript", "block_start", "block_end"), + [ + pytest.param( # type: ignore[misc] + "aligned_segment_28584", + 28584, + 28733, + 149, + None, + None, + (28584, 28704), + (28704, 28733), + id="28584 - Assignment and Transcript are None", + ), + pytest.param( + "aligned_segment_17416", + 17416, + 17805, + 389, + "Assigned", + "Something", + (17416, 17718), + (17479, 17805), + id="17416 - Assignment and Transcript are None", + marks=pytest.mark.xfail(reasion="Not getting Assigned or Transcript tags for some reason?"), + ), + pytest.param( + "aligned_segment_18029", + 18029, + 18385, + 356, + "Assigned", + "something", + (18029, 18380), + (18174, 18385), + id="17416 - Assignment and Transcript are None", + marks=pytest.mark.xfail(reasion="Not getting Assigned tag for some reason?"), + ), + ], +) +def test_extract_features_from_read_param( + aligned_segment: str, + start: int, + end: int, + length: int, + status: str, + transcript: str, + block_start: tuple[int, int], + block_end: tuple[int, int], + request: pytest.FixtureRequest, +) -> None: + """Test extract of features from an aligned segment read.""" + segment = isoslam.extract_features_from_read(request.getfixturevalue(aligned_segment)) + assert isinstance(segment, dict) + assert segment["start"] == start + assert segment["end"] == end + assert segment["length"] == length + assert segment["status"] == status + assert segment["transcript"] == transcript + assert segment["block_start"] == block_start + assert segment["block_end"] == block_end + + +@pytest.mark.parametrize( + ("aligned_segment1", "aligned_segment2", "expected"), + [ + pytest.param( # type: ignore[misc] + "aligned_segment_28584", + "aligned_segment_17416", + { + "read1": { + "start": 28584, + "end": 28733, + "length": 149, + "status": None, + "transcript": None, + "block_start": (28584, 28704), + "block_end": (28704, 28733), + }, + "read2": { + "start": 17416, + "end": 17805, + "length": 389, + "status": None, + "transcript": None, + "block_start": (17416, 17718), + "block_end": (17479, 17805), + }, + }, + id="28584 and 17416 - Assignment and Transcript are None", + ), + ], +) +def test_extract_features_from_pair( + aligned_segment1: str, + aligned_segment2: str, + expected: dict[str, dict[str, int | None | str | tuple[int, int]]], + request: pytest.FixtureRequest, +) -> None: + """Test extract of features from a list of pairs.""" + read_pair = isoslam.extract_features_from_pair( + [ + request.getfixturevalue(aligned_segment1), + request.getfixturevalue(aligned_segment2), + ] + ) + assert isinstance(read_pair, dict) + assert read_pair == expected