
Commit

Fix linting issues
nvictus committed Jul 23, 2023
1 parent 2c6bac2 commit 9439aaa
Showing 2 changed files with 97 additions and 104 deletions.
192 changes: 91 additions & 101 deletions src/dask_ngs/_index.py
@@ -1,9 +1,10 @@
-import os
+from __future__ import annotations
+
+from pathlib import Path
 
 import numpy as np
 import pandas as pd
 
-
 BAI_MIN_SHIFT = 14
 BAI_DEPTH = 5
 COMPRESSED_POSITION_SHIFT = 16
@@ -16,90 +17,95 @@ def read_bai(path: str):
     https://samtools.github.io/hts-specs/SAMv1.pdf
     """
     int_kwargs = {'byteorder': 'little', 'signed': False}
-    f = open(path, "rb")
-    # read the 4-byte magic number
-    magic = f.read(4)
-
-    # read the number of reference sequences
-    n_ref = int.from_bytes(f.read(4), **int_kwargs)
-
-    # read the reference sequence indices
-    references = []
-    for i in range(n_ref):
-        ref = {'ref_id': i}
-
-        # The "Bin Index"
-        chunks = []
-        n_bin = int.from_bytes(f.read(4), **int_kwargs)
-        for _ in range(n_bin):
-            # bin number
-            bin_id = int.from_bytes(f.read(4), **int_kwargs)
-
-            if bin_id == 37450:
-                # This is an entry that describes the "pseudo-bin" for the
-                # reference, using the same byte layout as normal bins but
-                # interpreted differently.
-                # The ref beg/ref end fields locate the first and last reads on
-                # this reference sequence, whether they are mapped or placed
-                # unmapped. Thus they are equal to the minimum chunk beg and
-                # maximum chunk end respectively.
-                n_chunk = int.from_bytes(f.read(4), **int_kwargs)  # always 2
-                # Not really a chunk
-                vpos = int.from_bytes(f.read(8), **int_kwargs)
-                ref["ref_beg.cpos"] = vpos >> COMPRESSED_POSITION_SHIFT
-                ref["ref_beg.upos"] = vpos & UNCOMPRESSED_POSITION_MASK
-                vpos = int.from_bytes(f.read(8), **int_kwargs)
-                ref["ref_end.cpos"] = vpos >> COMPRESSED_POSITION_SHIFT
-                ref["ref_end.upos"] = vpos & UNCOMPRESSED_POSITION_MASK
-                # Not really a chunk
-                ref["n_mapped"] = int.from_bytes(f.read(8), **int_kwargs)
-                ref["n_unmapped"] = int.from_bytes(f.read(8), **int_kwargs)
-                continue
-
-            n_chunk = int.from_bytes(f.read(4), **int_kwargs)
-            for _ in range(n_chunk):
-                vpos = int.from_bytes(f.read(8), **int_kwargs)
-                chunk_beg_cpos = vpos >> COMPRESSED_POSITION_SHIFT
-                chunk_beg_upos = vpos & UNCOMPRESSED_POSITION_MASK
-                vpos = int.from_bytes(f.read(8), **int_kwargs)
-                chunk_end_cpos = vpos >> COMPRESSED_POSITION_SHIFT
-                chunk_end_upos = vpos & UNCOMPRESSED_POSITION_MASK
-
-                chunks.append(
-                    (bin_id, chunk_beg_cpos, chunk_beg_upos, chunk_end_cpos, chunk_end_upos))
-
-        ref["bins"] = chunks
-
-        # The "Linear Index"
-        ref["ioffsets"] = []
-        n_intv = int.from_bytes(f.read(4), **int_kwargs)
-        for _ in range(n_intv):
-            vpos = int.from_bytes(f.read(8), **int_kwargs)
-            ioffset_cpos = vpos >> COMPRESSED_POSITION_SHIFT
-            ioffset_upos = vpos & UNCOMPRESSED_POSITION_MASK
-            ref["ioffsets"].append((ioffset_cpos, ioffset_upos))
-
-        references.append(ref)
-
-    # Check for the optional n_no_coor at the end of the file
-    try:
-        n_no_coor = int.from_bytes(f.read(8), **int_kwargs)
-    except:
-        n_no_coor = None
-
-    for ref in references:
-        if not 'bins' in ref:
-            continue
-
-        ref["bins"] = pd.DataFrame(
-            ref["bins"],
-            columns=["bin_id", "chunk_beg.cpos", "chunk_beg.upos",
-                     "chunk_end.cpos", "chunk_end.upos"]
-        )
-        ref["ioffsets"] = pd.DataFrame(
-            ref["ioffsets"],
-            columns=["ioffset.cpos", "ioffset.upos"]
-        )
+    with Path(path).open("rb") as f:
+        # read the 4-byte magic number
+        f.read(4)
+
+        # read the number of reference sequences
+        n_ref = int.from_bytes(f.read(4), **int_kwargs)
+
+        # read the reference sequence indices
+        references = []
+        for i in range(n_ref):
+            ref = {'ref_id': i}
+
+            # The "Bin Index"
+            chunks = []
+            n_bin = int.from_bytes(f.read(4), **int_kwargs)
+            for _ in range(n_bin):
+                # bin number
+                bin_id = int.from_bytes(f.read(4), **int_kwargs)
+
+                if bin_id == 37450:
+                    # This is an entry that describes the "pseudo-bin" for the
+                    # reference, using the same byte layout as normal bins but
+                    # interpreted differently.
+                    # The ref beg/ref end fields locate the first and last reads on
+                    # this reference sequence, whether they are mapped or placed
+                    # unmapped. Thus they are equal to the minimum chunk beg and
+                    # maximum chunk end respectively.
+                    n_chunk = int.from_bytes(f.read(4), **int_kwargs)  # always 2
+                    # Not really a chunk
+                    vpos = int.from_bytes(f.read(8), **int_kwargs)
+                    ref["ref_beg.cpos"] = vpos >> COMPRESSED_POSITION_SHIFT
+                    ref["ref_beg.upos"] = vpos & UNCOMPRESSED_POSITION_MASK
+                    vpos = int.from_bytes(f.read(8), **int_kwargs)
+                    ref["ref_end.cpos"] = vpos >> COMPRESSED_POSITION_SHIFT
+                    ref["ref_end.upos"] = vpos & UNCOMPRESSED_POSITION_MASK
+                    # Not really a chunk
+                    ref["n_mapped"] = int.from_bytes(f.read(8), **int_kwargs)
+                    ref["n_unmapped"] = int.from_bytes(f.read(8), **int_kwargs)
+                    continue
+
+                n_chunk = int.from_bytes(f.read(4), **int_kwargs)
+                for _ in range(n_chunk):
+                    vpos = int.from_bytes(f.read(8), **int_kwargs)
+                    chunk_beg_cpos = vpos >> COMPRESSED_POSITION_SHIFT
+                    chunk_beg_upos = vpos & UNCOMPRESSED_POSITION_MASK
+                    vpos = int.from_bytes(f.read(8), **int_kwargs)
+                    chunk_end_cpos = vpos >> COMPRESSED_POSITION_SHIFT
+                    chunk_end_upos = vpos & UNCOMPRESSED_POSITION_MASK
+
+                    chunks.append((
+                        bin_id,
+                        chunk_beg_cpos,
+                        chunk_beg_upos,
+                        chunk_end_cpos,
+                        chunk_end_upos
+                    ))
+
+            ref["bins"] = chunks
+
+            # The "Linear Index"
+            ref["ioffsets"] = []
+            n_intv = int.from_bytes(f.read(4), **int_kwargs)
+            for _ in range(n_intv):
+                vpos = int.from_bytes(f.read(8), **int_kwargs)
+                ioffset_cpos = vpos >> COMPRESSED_POSITION_SHIFT
+                ioffset_upos = vpos & UNCOMPRESSED_POSITION_MASK
+                ref["ioffsets"].append((ioffset_cpos, ioffset_upos))
+
+            references.append(ref)
+
+        # Check for the optional n_no_coor at the end of the file
+        try:
+            n_no_coor = int.from_bytes(f.read(8), **int_kwargs)
+        except Exception:
+            n_no_coor = None
+
+        for ref in references:
+            if 'bins' not in ref:
+                continue
+
+            ref["bins"] = pd.DataFrame(
+                ref["bins"],
+                columns=["bin_id", "chunk_beg.cpos", "chunk_beg.upos",
+                         "chunk_end.cpos", "chunk_end.upos"]
+            )
+            ref["ioffsets"] = pd.DataFrame(
+                ref["ioffsets"],
+                columns=["ioffset.cpos", "ioffset.upos"]
+            )
 
     return references, n_no_coor
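
Every 8-byte value read in the hunk above is a BGZF virtual file offset: the high bits address the start of a compressed block in the BAM file and the low 16 bits give the position within that block once decompressed, which is why each offset is split with `>> COMPRESSED_POSITION_SHIFT` and `& UNCOMPRESSED_POSITION_MASK`. A minimal illustrative sketch of that decoding; the mask constant is defined outside the lines shown here, its value of 0xFFFF is assumed from the SAM/BAM spec, and `split_vpos` is a hypothetical helper rather than part of the module:

    COMPRESSED_POSITION_SHIFT = 16
    UNCOMPRESSED_POSITION_MASK = 0xFFFF  # assumed value: the low 16 bits of a BGZF virtual offset

    def split_vpos(vpos: int) -> tuple[int, int]:
        """Split a BGZF virtual offset into (compressed, uncompressed) positions."""
        cpos = vpos >> COMPRESSED_POSITION_SHIFT   # byte offset of the BGZF block in the compressed file
        upos = vpos & UNCOMPRESSED_POSITION_MASK   # offset within the decompressed block
        return cpos, upos

    # e.g. a virtual offset packing block offset 1 and in-block offset 100:
    assert split_vpos((1 << 16) | 100) == (1, 100)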

@@ -193,19 +199,3 @@ def consolidate_chunks(offsets_uniq: pd.DataFrame) -> pd.DataFrame:
         "ioffset.upos": "first",
         "size": "last"
     })
-
-
-if __name__ == '__main__':
-
-    os.chdir('./src/dask_ngs')
-
-    # read from the index file to get the data structure
-    bai, n_no_coor = read_bai('example.bam.bai')
-    # select the data that defines the byte range offsets in the file
-    offsets = bai[0]["ioffsets"]
-    # note that the chunksize in this example is 1MB, not 100MB as is recommended for dask
-    offsets_uniq = map_offsets_to_chunks(offsets, 1_000_000)
-
-    offset_groups = consolidate_chunks(offsets_uniq)
-
-    offset_groups
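
The `__main__` demo removed above is still the clearest summary of how the pieces fit together: parse the index, take the linear-index offsets for one reference, bucket them into target-sized chunks, and merge rows that share a chunk. A standalone sketch of that same workflow, assuming an `example.bam.bai` index in the working directory as the deleted block did, and keeping its 1 MB chunk size even though ~100 MB is the usual dask recommendation:

    from dask_ngs._index import consolidate_chunks, map_offsets_to_chunks, read_bai

    # Parse the BAI index; n_no_coor (count of reads with no coordinates) may be None.
    bai, n_no_coor = read_bai("example.bam.bai")

    # Linear-index offsets for the first reference sequence.
    offsets = bai[0]["ioffsets"]

    # Assign each offset to a ~1 MB chunk, then merge rows within each chunk.
    offsets_uniq = map_offsets_to_chunks(offsets, 1_000_000)
    offset_groups = consolidate_chunks(offsets_uniq)
    print(offset_groups)
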
9 changes: 6 additions & 3 deletions tests/test_chunk_indexes.py
@@ -1,6 +1,9 @@
-import pytest
+from __future__ import annotations
+
+from pathlib import Path
+
+import pytest
 
 from dask_ngs import _index
 

@@ -16,7 +19,7 @@ def offsets(example_bai):
     return bai[0]["ioffsets"]
 
 
-@pytest.mark.parametrize('offsets, chunksize',
+@pytest.mark.parametrize(("offsets", "chunksize"),
                          [('offsets', 500_000), ('offsets', 1_000_000)], indirect=['offsets'])
 def test_map_offsets_to_chunks(offsets, chunksize):
     offsets_uniq = _index.map_offsets_to_chunks(offsets, chunksize)
@@ -37,7 +40,7 @@ def test_map_offsets_to_chunks(offsets, chunksize):
     return offsets_uniq
 
 
-@pytest.mark.parametrize('offsets, chunksize',
+@pytest.mark.parametrize(("offsets", "chunksize"),
                          [('offsets', 500_000), ('offsets', 1_000_000)], indirect=['offsets'])
 def test_consolidate_chunks(offsets, chunksize):
     offsets_uniq = test_map_offsets_to_chunks(offsets, chunksize)
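In both test hunks the lint fix only swaps the parametrize argnames from a single comma-separated string to a tuple (the flake8-pytest-style convention, rule PT006 in ruff); the `indirect=['offsets']` routing through the fixture is unchanged. For reference, a self-contained sketch of that indirect-parametrization pattern, using illustrative names that are not from this repo:

    import pytest

    @pytest.fixture
    def offsets(request):
        # With indirect=["offsets"], each parametrized value arrives here as
        # request.param; the test receives this fixture's return value instead.
        data = {"small": list(range(10)), "large": list(range(10_000))}
        return data[request.param]

    @pytest.mark.parametrize(
        ("offsets", "chunksize"),
        [("small", 500_000), ("large", 1_000_000)],
        indirect=["offsets"],
    )
    def test_chunk_sizes(offsets, chunksize):
        assert len(offsets) > 0
        assert chunksize >= 500_000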
