diff --git a/src/dask_ngs/_index.py b/src/dask_ngs/_index.py
index b9e374e..b1e5f2f 100644
--- a/src/dask_ngs/_index.py
+++ b/src/dask_ngs/_index.py
@@ -8,7 +8,7 @@
 BAI_MIN_SHIFT = 14
 BAI_DEPTH = 5
 COMPRESSED_POSITION_SHIFT = 16
-UNCOMPRESSED_POSITION_MASK = 0xffff
+UNCOMPRESSED_POSITION_MASK = 0xFFFF
 BLOCKSIZE = 65536
 
 
@@ -16,7 +16,7 @@ def read_bai(path: str):
     """
     https://samtools.github.io/hts-specs/SAMv1.pdf
     """
-    int_kwargs = {'byteorder': 'little', 'signed': False}
+    int_kwargs = {"byteorder": "little", "signed": False}
     with Path(path).open("rb") as f:
         # read the 4-byte magic number
         f.read(4)
@@ -27,7 +27,7 @@ def read_bai(path: str):
         # read the reference sequence indices
         references = []
         for i in range(n_ref):
-            ref = {'ref_id': i}
+            ref = {"ref_id": i}
 
             # The "Bin Index"
             chunks = []
@@ -66,13 +66,15 @@ def read_bai(path: str):
                     chunk_end_cpos = vpos >> COMPRESSED_POSITION_SHIFT
                     chunk_end_upos = vpos & UNCOMPRESSED_POSITION_MASK
 
-                    chunks.append((
-                        bin_id,
-                        chunk_beg_cpos,
-                        chunk_beg_upos,
-                        chunk_end_cpos,
-                        chunk_end_upos
-                    ))
+                    chunks.append(
+                        (
+                            bin_id,
+                            chunk_beg_cpos,
+                            chunk_beg_upos,
+                            chunk_end_cpos,
+                            chunk_end_upos,
+                        )
+                    )
 
             ref["bins"] = chunks
 
@@ -94,17 +96,21 @@ def read_bai(path: str):
             n_no_coor = None
 
     for ref in references:
-        if 'bins' not in ref:
+        if "bins" not in ref:
             continue
         ref["bins"] = pd.DataFrame(
             ref["bins"],
-            columns=["bin_id", "chunk_beg.cpos", "chunk_beg.upos",
-                     "chunk_end.cpos", "chunk_end.upos"]
+            columns=[
+                "bin_id",
+                "chunk_beg.cpos",
+                "chunk_beg.upos",
+                "chunk_end.cpos",
+                "chunk_end.upos",
+            ],
         )
 
         ref["ioffsets"] = pd.DataFrame(
-            ref["ioffsets"],
-            columns=["ioffset.cpos", "ioffset.upos"]
+            ref["ioffsets"], columns=["ioffset.cpos", "ioffset.upos"]
        )
 
     return references, n_no_coor
@@ -146,21 +152,20 @@ def _cumsum_assign_chunks(arr: np.array, thresh: int) -> tuple[np.array, np.arra
 
 def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.DataFrame:
     """Given a dataframe of offset positions, calculate the difference
-        between each byte offset.
-        Group those differences into chunks of size `chunksize_bytes`.
-
-        Returns:
-            A Pandas dataframe with additional columns:
-                chunk_id : int
-                    The chunk index that row was assigned
-                size : int
-                    The cumulative size of that chunk
+    between each byte offset.
+    Group those differences into chunks of size `chunksize_bytes`.
+
+    Returns:
+        A Pandas dataframe with additional columns:
+            chunk_id : int
+                The chunk index that row was assigned
+            size : int
+                The cumulative size of that chunk
     """
 
     # calculate the difference in byte positions from the prior row
     # i.e. current row - previous
-    offsets["ioffset.cpos.diff"] = offsets['ioffset.cpos'].diff().fillna(
-        0).astype(int)
+    offsets["ioffset.cpos.diff"] = offsets["ioffset.cpos"].diff().fillna(0).astype(int)
 
     # group the offsets so
     # this produces a dataframe that looks like this:
@@ -170,13 +175,15 @@ def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.Dat
     # 456717 | 19251 | 299074
     # this represents how far to read each compressed array
     # e.g. 38660 + 118983 = 157643
-    offsets_uniq = offsets.groupby("ioffset.cpos").agg({
-        "ioffset.upos": "first",
-        "ioffset.cpos.diff": "first"
-    }).reset_index()
+    offsets_uniq = (
+        offsets.groupby("ioffset.cpos")
+        .agg({"ioffset.upos": "first", "ioffset.cpos.diff": "first"})
+        .reset_index()
+    )
 
     cumsums, chunk_ids = _cumsum_assign_chunks(
-        offsets_uniq["ioffset.cpos.diff"].to_numpy(), chunksize_bytes)
+        offsets_uniq["ioffset.cpos.diff"].to_numpy(), chunksize_bytes
+    )
     offsets_uniq["chunk_id"] = chunk_ids
     offsets_uniq["size"] = cumsums
 
@@ -185,17 +192,15 @@ def map_offsets_to_chunks(offsets: pd.DataFrame, chunksize_bytes: int) -> pd.Dat
 
 def consolidate_chunks(offsets_uniq: pd.DataFrame) -> pd.DataFrame:
     """Group the data by `chunk_id`,
-        keeping the first compressed byte value (`ioffset.cpos`)
-        and the first uncompressed byte value of that stream (`ioffset.upos`).
-        Take the last `size` value which tells you how many compressed bytes to read.
-
-        Returns:
-            A Pandas dataframe grouped by `chunk_id`
-            Now you can decompress the data starting from `ioffset.cpos` and read `size` bytes.
-            `ioffsets.upos` tells you which byte to read first from the uncompressed data.
+    keeping the first compressed byte value (`ioffset.cpos`)
+    and the first uncompressed byte value of that stream (`ioffset.upos`).
+    Take the last `size` value which tells you how many compressed bytes to read.
+
+    Returns:
+        A Pandas dataframe grouped by `chunk_id`
+        Now you can decompress the data starting from `ioffset.cpos` and read `size` bytes.
+        `ioffsets.upos` tells you which byte to read first from the uncompressed data.
     """
-    return offsets_uniq.groupby("chunk_id").agg({
-        "ioffset.cpos": "first",
-        "ioffset.upos": "first",
-        "size": "last"
-    })
+    return offsets_uniq.groupby("chunk_id").agg(
+        {"ioffset.cpos": "first", "ioffset.upos": "first", "size": "last"}
+    )
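
Usage sketch for review: how the three functions touched above fit together. The "example.bam.bai" path, the choice of the first reference, and the ~64 MiB chunk size are placeholder assumptions, not values taken from this change.

from dask_ngs._index import consolidate_chunks, map_offsets_to_chunks, read_bai

# Parse the BAI index; each reference dict may carry "bins" and "ioffsets" dataframes.
references, n_no_coor = read_bai("example.bam.bai")  # placeholder path

# Linear-index virtual offsets for the first reference (assumes it has alignments):
# "ioffset.cpos" is the compressed byte position, "ioffset.upos" the offset into
# the uncompressed block.
offsets = references[0]["ioffsets"]

# Assign consecutive offsets to chunks of roughly 64 MiB of compressed data.
chunked = map_offsets_to_chunks(offsets, chunksize_bytes=64 * 2**20)

# One row per chunk_id: starting cpos/upos and how many compressed bytes to read.
print(consolidate_chunks(chunked))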