diff --git a/src/dask_ngs/_index.py b/src/dask_ngs/_index.py
index a411d19..b9e374e 100644
--- a/src/dask_ngs/_index.py
+++ b/src/dask_ngs/_index.py
@@ -1,9 +1,10 @@
-import os
+from __future__ import annotations
+
+from pathlib import Path
 
 import numpy as np
 import pandas as pd
 
-
 BAI_MIN_SHIFT = 14
 BAI_DEPTH = 5
 COMPRESSED_POSITION_SHIFT = 16
@@ -16,90 +17,95 @@ def read_bai(path: str):
     https://samtools.github.io/hts-specs/SAMv1.pdf
     """
     int_kwargs = {'byteorder': 'little', 'signed': False}
-    f = open(path, "rb")
-    # read the 4-byte magic number
-    magic = f.read(4)
-
-    # read the number of reference sequences
-    n_ref = int.from_bytes(f.read(4), **int_kwargs)
-
-    # read the reference sequence indices
-    references = []
-    for i in range(n_ref):
-        ref = {'ref_id': i}
-
-        # The "Bin Index"
-        chunks = []
-        n_bin = int.from_bytes(f.read(4), **int_kwargs)
-        for _ in range(n_bin):
-            # bin number
-            bin_id = int.from_bytes(f.read(4), **int_kwargs)
-
-            if bin_id == 37450:
-                # This is an entry that describes the "pseudo-bin" for the
-                # reference, using the same byte layout as normal bins but
-                # interpreted differently.
-                # The ref beg/ref end fields locate the first and last reads on
-                # this reference sequence, whether they are mapped or placed
-                # unmapped. Thus they are equal to the minimum chunk beg and
-                # maximum chunk end respectively.
-                n_chunk = int.from_bytes(f.read(4), **int_kwargs)  # always 2
-                # Not really a chunk
-                vpos = int.from_bytes(f.read(8), **int_kwargs)
-                ref["ref_beg.cpos"] = vpos >> COMPRESSED_POSITION_SHIFT
-                ref["ref_beg.upos"] = vpos & UNCOMPRESSED_POSITION_MASK
+    with Path(path).open("rb") as f:
+        # read the 4-byte magic number
+        f.read(4)
+
+        # read the number of reference sequences
+        n_ref = int.from_bytes(f.read(4), **int_kwargs)
+
+        # read the reference sequence indices
+        references = []
+        for i in range(n_ref):
+            ref = {'ref_id': i}
+
+            # The "Bin Index"
+            chunks = []
+            n_bin = int.from_bytes(f.read(4), **int_kwargs)
+            for _ in range(n_bin):
+                # bin number
+                bin_id = int.from_bytes(f.read(4), **int_kwargs)
+
+                if bin_id == 37450:
+                    # This is an entry that describes the "pseudo-bin" for the
+                    # reference, using the same byte layout as normal bins but
+                    # interpreted differently.
+                    # The ref beg/ref end fields locate the first and last reads on
+                    # this reference sequence, whether they are mapped or placed
+                    # unmapped. Thus they are equal to the minimum chunk beg and
+                    # maximum chunk end respectively.
+                    n_chunk = int.from_bytes(f.read(4), **int_kwargs)  # always 2
+                    # Not really a chunk
+                    vpos = int.from_bytes(f.read(8), **int_kwargs)
+                    ref["ref_beg.cpos"] = vpos >> COMPRESSED_POSITION_SHIFT
+                    ref["ref_beg.upos"] = vpos & UNCOMPRESSED_POSITION_MASK
+                    vpos = int.from_bytes(f.read(8), **int_kwargs)
+                    ref["ref_end.cpos"] = vpos >> COMPRESSED_POSITION_SHIFT
+                    ref["ref_end.upos"] = vpos & UNCOMPRESSED_POSITION_MASK
+                    # Not really a chunk
+                    ref["n_mapped"] = int.from_bytes(f.read(8), **int_kwargs)
+                    ref["n_unmapped"] = int.from_bytes(f.read(8), **int_kwargs)
+                    continue
+
+                n_chunk = int.from_bytes(f.read(4), **int_kwargs)
+                for _ in range(n_chunk):
+                    vpos = int.from_bytes(f.read(8), **int_kwargs)
+                    chunk_beg_cpos = vpos >> COMPRESSED_POSITION_SHIFT
+                    chunk_beg_upos = vpos & UNCOMPRESSED_POSITION_MASK
+                    vpos = int.from_bytes(f.read(8), **int_kwargs)
+                    chunk_end_cpos = vpos >> COMPRESSED_POSITION_SHIFT
+                    chunk_end_upos = vpos & UNCOMPRESSED_POSITION_MASK
+
+                    chunks.append((
+                        bin_id,
+                        chunk_beg_cpos,
+                        chunk_beg_upos,
+                        chunk_end_cpos,
+                        chunk_end_upos
+                    ))
+
+            ref["bins"] = chunks
+
+            # The "Linear Index"
+            ref["ioffsets"] = []
+            n_intv = int.from_bytes(f.read(4), **int_kwargs)
+            for _ in range(n_intv):
                 vpos = int.from_bytes(f.read(8), **int_kwargs)
-                ref["ref_end.cpos"] = vpos >> COMPRESSED_POSITION_SHIFT
-                ref["ref_end.upos"] = vpos & UNCOMPRESSED_POSITION_MASK
-                # Not really a chunk
-                ref["n_mapped"] = int.from_bytes(f.read(8), **int_kwargs)
-                ref["n_unmapped"] = int.from_bytes(f.read(8), **int_kwargs)
-                continue
-
-            n_chunk = int.from_bytes(f.read(4), **int_kwargs)
-            for _ in range(n_chunk):
-                vpos = int.from_bytes(f.read(8), **int_kwargs)
-                chunk_beg_cpos = vpos >> COMPRESSED_POSITION_SHIFT
-                chunk_beg_upos = vpos & UNCOMPRESSED_POSITION_MASK
-                vpos = int.from_bytes(f.read(8), **int_kwargs)
-                chunk_end_cpos = vpos >> COMPRESSED_POSITION_SHIFT
-                chunk_end_upos = vpos & UNCOMPRESSED_POSITION_MASK
-
-                chunks.append(
-                    (bin_id, chunk_beg_cpos, chunk_beg_upos, chunk_end_cpos, chunk_end_upos))
-
-        ref["bins"] = chunks
-
-        # The "Linear Index"
-        ref["ioffsets"] = []
-        n_intv = int.from_bytes(f.read(4), **int_kwargs)
-        for _ in range(n_intv):
-            vpos = int.from_bytes(f.read(8), **int_kwargs)
-            ioffset_cpos = vpos >> COMPRESSED_POSITION_SHIFT
-            ioffset_upos = vpos & UNCOMPRESSED_POSITION_MASK
-            ref["ioffsets"].append((ioffset_cpos, ioffset_upos))
-
-        references.append(ref)
-
-    # Check for the optional n_no_coor at the end of the file
-    try:
-        n_no_coor = int.from_bytes(f.read(8), **int_kwargs)
-    except:
-        n_no_coor = None
-
-    for ref in references:
-        if not 'bins' in ref:
-            continue
-
-        ref["bins"] = pd.DataFrame(
-            ref["bins"],
-            columns=["bin_id", "chunk_beg.cpos", "chunk_beg.upos",
-                     "chunk_end.cpos", "chunk_end.upos"]
-        )
-        ref["ioffsets"] = pd.DataFrame(
-            ref["ioffsets"],
-            columns=["ioffset.cpos", "ioffset.upos"]
-        )
+                ioffset_cpos = vpos >> COMPRESSED_POSITION_SHIFT
+                ioffset_upos = vpos & UNCOMPRESSED_POSITION_MASK
+                ref["ioffsets"].append((ioffset_cpos, ioffset_upos))
+
+            references.append(ref)
+
+        # Check for the optional n_no_coor at the end of the file
+        try:
+            n_no_coor = int.from_bytes(f.read(8), **int_kwargs)
+        except Exception:
+            n_no_coor = None
+
+        for ref in references:
+            if 'bins' not in ref:
+                continue
+
+            ref["bins"] = pd.DataFrame(
+                ref["bins"],
+                columns=["bin_id", "chunk_beg.cpos", "chunk_beg.upos",
+                         "chunk_end.cpos", "chunk_end.upos"]
+            )
+            ref["ioffsets"] = pd.DataFrame(
+                ref["ioffsets"],
+                columns=["ioffset.cpos", "ioffset.upos"]
+            )
 
     return references, n_no_coor
 
@@ -193,19 +199,3 @@ def consolidate_chunks(offsets_uniq: pd.DataFrame) -> pd.DataFrame:
         "ioffset.upos": "first",
         "size": "last"
     })
-
-
-if __name__ == '__main__':
-
-    os.chdir('./src/dask_ngs')
-
-    # read from the index file to get the data structure
-    bai, n_no_coor = read_bai('example.bam.bai')
-    # select the data that defines the byte range offsets in the file
-    offsets = bai[0]["ioffsets"]
-    # note that the chunksize in this example is 1MB, not 100MB as is recommended for dask
-    offsets_uniq = map_offsets_to_chunks(offsets, 1_000_000)
-
-    offset_groups = consolidate_chunks(offsets_uniq)
-
-    offset_groups
diff --git a/tests/test_chunk_indexes.py b/tests/test_chunk_indexes.py
index 97ac75f..4c80414 100644
--- a/tests/test_chunk_indexes.py
+++ b/tests/test_chunk_indexes.py
@@ -1,6 +1,9 @@
 from __future__ import annotations
-import pytest
+
 from pathlib import Path
+
+import pytest
+
 from dask_ngs import _index
 
 
@@ -16,7 +19,7 @@ def offsets(example_bai):
     return bai[0]["ioffsets"]
 
 
-@pytest.mark.parametrize('offsets, chunksize',
+@pytest.mark.parametrize(("offsets", "chunksize"),
                          [('offsets', 500_000), ('offsets', 1_000_000)], indirect=['offsets'])
 def test_map_offsets_to_chunks(offsets, chunksize):
     offsets_uniq = _index.map_offsets_to_chunks(offsets, chunksize)
@@ -37,7 +40,7 @@ def test_map_offsets_to_chunks(offsets, chunksize):
     return offsets_uniq
 
 
-@pytest.mark.parametrize('offsets, chunksize',
+@pytest.mark.parametrize(("offsets", "chunksize"),
                          [('offsets', 500_000), ('offsets', 1_000_000)], indirect=['offsets'])
 def test_consolidate_chunks(offsets, chunksize):
     offsets_uniq = test_map_offsets_to_chunks(offsets, chunksize)
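
Note for reviewers: the last hunk in `_index.py` drops the ad-hoc `__main__` demo. For anyone who used it as a reference, a rough standalone equivalent is sketched below. It is a sketch, not part of the change: it assumes `example.bam.bai` (the sample index the old demo opened after `os.chdir('./src/dask_ngs')`) sits in the current working directory, so point it at whatever BAI file you have.

# Sketch of the workflow the deleted __main__ block demonstrated.
# Assumes example.bam.bai is in the current directory.
from dask_ngs._index import consolidate_chunks, map_offsets_to_chunks, read_bai

# parse the BAI index into per-reference bin and linear-index tables
bai, n_no_coor = read_bai("example.bam.bai")

# the linear index of the first reference: byte-range offsets into the BAM
offsets = bai[0]["ioffsets"]

# group offsets into ~1 MB chunks, as in the old demo (the removed comment
# noted ~100 MB is the usual recommendation for dask)
offsets_uniq = map_offsets_to_chunks(offsets, 1_000_000)

# merge consecutive rows that fall into the same chunk
offset_groups = consolidate_chunks(offsets_uniq)
print(offset_groups)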