style: Apply black
nvictus committed Jul 23, 2023
1 parent 9439aaa commit f4aa180
Showing 1 changed file with 49 additions and 44 deletions.
93 changes: 49 additions & 44 deletions src/dask_ngs/_index.py
@@ -8,15 +8,15 @@
BAI_MIN_SHIFT = 14
BAI_DEPTH = 5
COMPRESSED_POSITION_SHIFT = 16
UNCOMPRESSED_POSITION_MASK = 0xffff
UNCOMPRESSED_POSITION_MASK = 0xFFFF
BLOCKSIZE = 65536


def read_bai(path: str):
"""
https://samtools.github.io/hts-specs/SAMv1.pdf
"""
int_kwargs = {'byteorder': 'little', 'signed': False}
int_kwargs = {"byteorder": "little", "signed": False}
with Path(path).open("rb") as f:
# read the 4-byte magic number
f.read(4)
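
The collapsed lines below this hunk consume the integer fields that follow the magic number. Per the hts-specs document cited in the docstring, those fields are 4-byte little-endian integers, which is what `int_kwargs` is set up for. A minimal sketch of that pattern, not the commit's exact code, with a placeholder path:

    # Sketch only: reading a little-endian count field with int_kwargs.
    from pathlib import Path

    int_kwargs = {"byteorder": "little", "signed": False}
    with Path("example.bam.bai").open("rb") as f:        # placeholder path
        magic = f.read(4)                                # b"BAI\x01" per the SAM spec
        n_ref = int.from_bytes(f.read(4), **int_kwargs)  # number of reference sequences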
@@ -27,7 +27,7 @@ def read_bai(path: str):
# read the reference sequence indices
references = []
for i in range(n_ref):
ref = {'ref_id': i}
ref = {"ref_id": i}

# The "Bin Index"
chunks = []
@@ -66,13 +66,15 @@ def read_bai(path: str):
chunk_end_cpos = vpos >> COMPRESSED_POSITION_SHIFT
chunk_end_upos = vpos & UNCOMPRESSED_POSITION_MASK

chunks.append((
bin_id,
chunk_beg_cpos,
chunk_beg_upos,
chunk_end_cpos,
chunk_end_upos
))
chunks.append(
(
bin_id,
chunk_beg_cpos,
chunk_beg_upos,
chunk_end_cpos,
chunk_end_upos,
)
)

ref["bins"] = chunks

@@ -94,17 +96,21 @@ def read_bai(path: str):
n_no_coor = None

for ref in references:
if 'bins' not in ref:
if "bins" not in ref:
continue

ref["bins"] = pd.DataFrame(
ref["bins"],
columns=["bin_id", "chunk_beg.cpos", "chunk_beg.upos",
"chunk_end.cpos", "chunk_end.upos"]
columns=[
"bin_id",
"chunk_beg.cpos",
"chunk_beg.upos",
"chunk_end.cpos",
"chunk_end.upos",
],
)
ref["ioffsets"] = pd.DataFrame(
ref["ioffsets"],
columns=["ioffset.cpos", "ioffset.upos"]
ref["ioffsets"], columns=["ioffset.cpos", "ioffset.upos"]
)

return references, n_no_coor
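
With that return shape in mind, a hedged usage sketch (the path is a placeholder, and references that carry no bin data may lack the "bins" key, hence the guard):

    # Hypothetical usage of read_bai on a BAM index file.
    references, n_no_coor = read_bai("sample.bam.bai")    # placeholder path

    ref = next(r for r in references if "bins" in r)      # first reference with data
    print(ref["ref_id"])
    print(ref["bins"].head())      # bin_id, chunk_beg.cpos/upos, chunk_end.cpos/upos
    print(ref["ioffsets"].head())  # ioffset.cpos, ioffset.upos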
@@ -146,21 +152,20 @@ def _cumsum_assign_chunks(arr: np.array, thresh: int) -> tuple[np.array, np.arra

def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.DataFrame:
"""Given a dataframe of offset positions, calculate the difference
between each byte offset.
Group those differences into chunks of size `chunksize_bytes`.
Returns:
A Pandas dataframe with additional columns:
chunk_id : int
The chunk index that row was assigned
size : int
The cumulative size of that chunk
between each byte offset.
Group those differences into chunks of size `chunksize_bytes`.
Returns:
A Pandas dataframe with additional columns:
chunk_id : int
The chunk index that row was assigned
size : int
The cumulative size of that chunk
"""

# calculate the difference in byte positions from the prior row
# i.e. current row - previous
offsets["ioffset.cpos.diff"] = offsets['ioffset.cpos'].diff().fillna(
0).astype(int)
offsets["ioffset.cpos.diff"] = offsets["ioffset.cpos"].diff().fillna(0).astype(int)

# group the offsets so
# this produces a dataframe that looks like this:
@@ -170,13 +175,15 @@ def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.Dat
# 456717 | 19251 | 299074
# this represents how far to read each compressed array
# e.g. 38660 + 118983 = 157643
offsets_uniq = offsets.groupby("ioffset.cpos").agg({
"ioffset.upos": "first",
"ioffset.cpos.diff": "first"
}).reset_index()
offsets_uniq = (
offsets.groupby("ioffset.cpos")
.agg({"ioffset.upos": "first", "ioffset.cpos.diff": "first"})
.reset_index()
)

cumsums, chunk_ids = _cumsum_assign_chunks(
offsets_uniq["ioffset.cpos.diff"].to_numpy(), chunksize_bytes)
offsets_uniq["ioffset.cpos.diff"].to_numpy(), chunksize_bytes
)
offsets_uniq["chunk_id"] = chunk_ids
offsets_uniq["size"] = cumsums

@@ -185,17 +192,15 @@ def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.Dat

def consolidate_chunks(offsets_uniq: pd.DataFrame) -> pd.DataFrame:
"""Group the data by `chunk_id`,
keeping the first compressed byte value (`ioffset.cpos`)
and the first uncompressed byte value of that stream (`ioffset.upos`).
Take the last `size` value which tells you how many compressed bytes to read.
Returns:
A Pandas dataframe grouped by `chunk_id`
Now you can decompress the data starting from `ioffset.cpos` and read `size` bytes.
`ioffsets.upos` tells you which byte to read first from the uncompressed data.
keeping the first compressed byte value (`ioffset.cpos`)
and the first uncompressed byte value of that stream (`ioffset.upos`).
Take the last `size` value which tells you how many compressed bytes to read.
Returns:
A Pandas dataframe grouped by `chunk_id`
Now you can decompress the data starting from `ioffset.cpos` and read `size` bytes.
`ioffsets.upos` tells you which byte to read first from the uncompressed data.
"""
return offsets_uniq.groupby("chunk_id").agg({
"ioffset.cpos": "first",
"ioffset.upos": "first",
"size": "last"
})
return offsets_uniq.groupby("chunk_id").agg(
{"ioffset.cpos": "first", "ioffset.upos": "first", "size": "last"}
)
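
Taken together, the three functions turn a .bai index into a read plan. A hedged end-to-end sketch (the path and chunk size below are placeholders):

    # Hypothetical pipeline: BAM index -> ~1 MB byte-range chunks for parallel reads.
    references, _ = read_bai("sample.bam.bai")            # placeholder path
    ref = next(r for r in references if "bins" in r)

    chunked = map_offsets_to_chunks(ref["ioffsets"], chunksize_bytes=1_000_000)
    plan = consolidate_chunks(chunked)

    # Per the docstrings: for each row, seek to ioffset.cpos in the BGZF file,
    # read `size` compressed bytes, decompress, and start at ioffset.upos.
    print(plan.head())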
