Commit

style: Apply black

nvictus committed Jul 23, 2023
1 parent 9439aaa commit f15d3ad
Showing 2 changed files with 71 additions and 60 deletions.
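
The commit applies the black code formatter across the package. As a rough illustration of the kind of rewriting black performs (a sketch, not part of this commit; the snippet and the use of black's default settings are assumptions), the quote and hex-literal changes visible in the diff below can be reproduced with black's Python API:

import black

# Hypothetical pre-commit style: single quotes, lowercase hex digits.
src = "int_kwargs = {'byteorder': 'little', 'signed': False}\nMASK = 0xffff\n"

# format_str reformats a source string; Mode() corresponds to running
# `black` with no project-specific options.
print(black.format_str(src, mode=black.Mode()))
# int_kwargs = {"byteorder": "little", "signed": False}
# MASK = 0xFFFF
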
93 changes: 49 additions & 44 deletions src/dask_ngs/_index.py
@@ -8,15 +8,15 @@
BAI_MIN_SHIFT = 14
BAI_DEPTH = 5
COMPRESSED_POSITION_SHIFT = 16
UNCOMPRESSED_POSITION_MASK = 0xffff
UNCOMPRESSED_POSITION_MASK = 0xFFFF
BLOCKSIZE = 65536


def read_bai(path: str):
"""
https://samtools.github.io/hts-specs/SAMv1.pdf
"""
int_kwargs = {'byteorder': 'little', 'signed': False}
int_kwargs = {"byteorder": "little", "signed": False}
with Path(path).open("rb") as f:
# read the 4-byte magic number
f.read(4)
@@ -27,7 +27,7 @@ def read_bai(path: str):
# read the reference sequence indices
references = []
for i in range(n_ref):
ref = {'ref_id': i}
ref = {"ref_id": i}

# The "Bin Index"
chunks = []
@@ -66,13 +66,15 @@ def read_bai(path: str):
chunk_end_cpos = vpos >> COMPRESSED_POSITION_SHIFT
chunk_end_upos = vpos & UNCOMPRESSED_POSITION_MASK

chunks.append((
bin_id,
chunk_beg_cpos,
chunk_beg_upos,
chunk_end_cpos,
chunk_end_upos
))
chunks.append(
(
bin_id,
chunk_beg_cpos,
chunk_beg_upos,
chunk_end_cpos,
chunk_end_upos,
)
)

ref["bins"] = chunks
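
The virtual file offsets decoded in this hunk pack two coordinates into one integer, as described in the SAM/BAM spec linked in the docstring: the upper bits give the start of a BGZF block in the compressed file, and the low 16 bits give a byte position inside that block once decompressed. A tiny worked example with made-up values:

COMPRESSED_POSITION_SHIFT = 16
UNCOMPRESSED_POSITION_MASK = 0xFFFF

vpos = (123_456 << COMPRESSED_POSITION_SHIFT) | 789
assert vpos >> COMPRESSED_POSITION_SHIFT == 123_456  # compressed offset of the BGZF block
assert vpos & UNCOMPRESSED_POSITION_MASK == 789      # byte offset within the decompressed block
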

@@ -94,17 +94,21 @@ def read_bai(path: str):
n_no_coor = None

for ref in references:
if 'bins' not in ref:
if "bins" not in ref:
continue

ref["bins"] = pd.DataFrame(
ref["bins"],
columns=["bin_id", "chunk_beg.cpos", "chunk_beg.upos",
"chunk_end.cpos", "chunk_end.upos"]
columns=[
"bin_id",
"chunk_beg.cpos",
"chunk_beg.upos",
"chunk_end.cpos",
"chunk_end.upos",
],
)
ref["ioffsets"] = pd.DataFrame(
ref["ioffsets"],
columns=["ioffset.cpos", "ioffset.upos"]
ref["ioffsets"], columns=["ioffset.cpos", "ioffset.upos"]
)

return references, n_no_coor
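
A minimal usage sketch for read_bai (not part of the diff; the index path is hypothetical): it returns one dict per reference sequence plus the count of unplaced reads, with the bin and linear-index records already wrapped in DataFrames using the column names shown above.

from dask_ngs._index import read_bai

references, n_no_coor = read_bai("example.bam.bai")  # hypothetical path

ref0 = references[0]  # assuming the first reference has indexed alignments
print(ref0["bins"].columns.tolist())
# ['bin_id', 'chunk_beg.cpos', 'chunk_beg.upos', 'chunk_end.cpos', 'chunk_end.upos']
print(ref0["ioffsets"].columns.tolist())
# ['ioffset.cpos', 'ioffset.upos']
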
@@ -146,21 +152,20 @@ def _cumsum_assign_chunks(arr: np.array, thresh: int) -> tuple[np.array, np.array]:

def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.DataFrame:
"""Given a dataframe of offset positions, calculate the difference
between each byte offset.
Group those differences into chunks of size `chunksize_bytes`.
Returns:
A Pandas dataframe with additional columns:
chunk_id : int
The chunk index that row was assigned
size : int
The cumulative size of that chunk
between each byte offset.
Group those differences into chunks of size `chunksize_bytes`.
Returns:
A Pandas dataframe with additional columns:
chunk_id : int
The chunk index that row was assigned
size : int
The cumulative size of that chunk
"""

# calculate the difference in byte positions from the prior row
# i.e. current row - previous
offsets["ioffset.cpos.diff"] = offsets['ioffset.cpos'].diff().fillna(
0).astype(int)
offsets["ioffset.cpos.diff"] = offsets["ioffset.cpos"].diff().fillna(0).astype(int)

# group the offsets so
# this produces a dataframe that looks like this:
@@ -170,13 +175,15 @@ def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.DataFrame:
# 456717 | 19251 | 299074
# this represents how far to read each compressed array
# e.g. 38660 + 118983 = 157643
offsets_uniq = offsets.groupby("ioffset.cpos").agg({
"ioffset.upos": "first",
"ioffset.cpos.diff": "first"
}).reset_index()
offsets_uniq = (
offsets.groupby("ioffset.cpos")
.agg({"ioffset.upos": "first", "ioffset.cpos.diff": "first"})
.reset_index()
)

cumsums, chunk_ids = _cumsum_assign_chunks(
offsets_uniq["ioffset.cpos.diff"].to_numpy(), chunksize_bytes)
offsets_uniq["ioffset.cpos.diff"].to_numpy(), chunksize_bytes
)
offsets_uniq["chunk_id"] = chunk_ids
offsets_uniq["size"] = cumsums
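
A small usage sketch for map_offsets_to_chunks (toy data, not part of the diff): the compressed positions echo the values in the comment above, while the uncompressed positions are arbitrary. The added columns behave as the tests below verify: ioffset.cpos.diff is the gap to the previous row, size is a running total that is reset so it never exceeds chunksize_bytes, and chunk_id labels each group.

import pandas as pd
from dask_ngs._index import map_offsets_to_chunks

toy = pd.DataFrame({
    "ioffset.cpos": [0, 38_660, 157_643, 456_717],  # compressed byte offsets
    "ioffset.upos": [0, 100, 200, 300],             # arbitrary in-block offsets
})
chunked = map_offsets_to_chunks(toy, chunksize_bytes=200_000)
print(chunked[["ioffset.cpos", "ioffset.cpos.diff", "chunk_id", "size"]])
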

@@ -185,17 +192,15 @@ def consolidate_chunks(offsets_uniq: pd.DataFrame) -> pd.DataFrame:

def consolidate_chunks(offsets_uniq: pd.DataFrame) -> pd.DataFrame:
"""Group the data by `chunk_id`,
keeping the first compressed byte value (`ioffset.cpos`)
and the first uncompressed byte value of that stream (`ioffset.upos`).
Take the last `size` value which tells you how many compressed bytes to read.
Returns:
A Pandas dataframe grouped by `chunk_id`
Now you can decompress the data starting from `ioffset.cpos` and read `size` bytes.
`ioffsets.upos` tells you which byte to read first from the uncompressed data.
keeping the first compressed byte value (`ioffset.cpos`)
and the first uncompressed byte value of that stream (`ioffset.upos`).
Take the last `size` value which tells you how many compressed bytes to read.
Returns:
A Pandas dataframe grouped by `chunk_id`
Now you can decompress the data starting from `ioffset.cpos` and read `size` bytes.
`ioffsets.upos` tells you which byte to read first from the uncompressed data.
"""
return offsets_uniq.groupby("chunk_id").agg({
"ioffset.cpos": "first",
"ioffset.upos": "first",
"size": "last"
})
return offsets_uniq.groupby("chunk_id").agg(
{"ioffset.cpos": "first", "ioffset.upos": "first", "size": "last"}
)
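
Continuing the sketch above (same assumptions), consolidate_chunks turns the per-offset rows into one row per read task, following the recipe in the docstring: seek to ioffset.cpos in the compressed file, read size bytes, decompress, then start consuming at byte ioffset.upos of the uncompressed stream.

from dask_ngs._index import consolidate_chunks

groups = consolidate_chunks(chunked)  # `chunked` comes from the previous sketch
for chunk_id, row in groups.iterrows():
    start = int(row["ioffset.cpos"])  # where to seek in the compressed file
    nbytes = int(row["size"])         # how many compressed bytes to read
    skip = int(row["ioffset.upos"])   # first byte to use after decompression
    # e.g. hand (start, nbytes, skip) to a dask task that reads and
    # decompresses this slice of the BAM file
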
38 changes: 22 additions & 16 deletions tests/test_chunk_indexes.py
@@ -19,40 +19,46 @@ def offsets(example_bai):
return bai[0]["ioffsets"]


@pytest.mark.parametrize(("offsets", "chunksize"),
[('offsets', 500_000), ('offsets', 1_000_000)], indirect=['offsets'])
@pytest.mark.parametrize(
("offsets", "chunksize"),
[("offsets", 500_000), ("offsets", 1_000_000)],
indirect=["offsets"],
)
def test_map_offsets_to_chunks(offsets, chunksize):
offsets_uniq = _index.map_offsets_to_chunks(offsets, chunksize)

cumsum = 0
for i in range(len(offsets_uniq)-1):
for i in range(len(offsets_uniq) - 1):
prev = offsets_uniq.iloc[i]
next = offsets_uniq.iloc[i+1]
next = offsets_uniq.iloc[i + 1]
# validate differences
assert next['ioffset.cpos'] - prev['ioffset.cpos'] \
== next['ioffset.cpos.diff']
cumsum += next['ioffset.cpos.diff']
assert next["ioffset.cpos"] - prev["ioffset.cpos"] == next["ioffset.cpos.diff"]
cumsum += next["ioffset.cpos.diff"]
if cumsum > chunksize:
cumsum = 0
# validate chunk sizes
assert next['size'] == cumsum
assert next['size'] <= chunksize
assert next["size"] == cumsum
assert next["size"] <= chunksize
return offsets_uniq


@pytest.mark.parametrize(("offsets", "chunksize"),
[('offsets', 500_000), ('offsets', 1_000_000)], indirect=['offsets'])
@pytest.mark.parametrize(
("offsets", "chunksize"),
[("offsets", 500_000), ("offsets", 1_000_000)],
indirect=["offsets"],
)
def test_consolidate_chunks(offsets, chunksize):
offsets_uniq = test_map_offsets_to_chunks(offsets, chunksize)
offset_groups = _index.consolidate_chunks(offsets_uniq)

last_cpos = offsets_uniq.groupby('chunk_id').agg({
"ioffset.cpos": "last",
})
last_cpos = offsets_uniq.groupby("chunk_id").agg(
{
"ioffset.cpos": "last",
}
)

# validate that the final edge of the chunk `last_cpos`
# matches the start of the chunk (`ioffset.cpos`) + its `size`
for i in range(len(offset_groups)):
g = offset_groups.iloc[i]
assert g['ioffset.cpos'] + g['size'] \
== last_cpos.iloc[i]['ioffset.cpos']
assert g["ioffset.cpos"] + g["size"] == last_cpos.iloc[i]["ioffset.cpos"]
