Skip to content

Commit

Permalink
all but three tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
CarlKCarlK committed Dec 31, 2023
1 parent b793072 commit 028d4fd
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 64 deletions.
177 changes: 124 additions & 53 deletions bed_reader/_open_bed.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from itertools import takewhile, repeat
import logging
import multiprocessing
import os
from dataclasses import dataclass
from itertools import repeat, takewhile
from pathlib import Path
from typing import Any, List, Mapping, Optional, Union
from object_store import ObjectStore
from urllib.parse import urlparse, ParseResult as UrlParseResult
from io import BytesIO

import numpy as np

Expand All @@ -14,14 +15,14 @@
except ImportError:
sparse = None

from .bed_reader import read_f32, read_f64, read_i8, read_cloud_i8 # type: ignore
from .bed_reader import read_f32, read_f64, read_i8, read_cloud_i8, url_to_bytes # type: ignore


# https://stackoverflow.com/questions/845058/how-to-get-line-count-of-a-large-file-cheaply-in-python
def _rawincount(filepath):
with open(filepath, "rb") as f:
bufgen = takewhile(lambda x: x, (f.raw.read(1024 * 1024) for _ in repeat(None)))
return sum(buf.count(b"\n") for buf in bufgen)
def _rawincount(f):
    """Count newline bytes in an already-open binary stream.

    Rewinds *f* to the start, then scans it in 1 MiB chunks; the stream is
    left positioned at EOF. Returns the number of b"\\n" bytes seen (a final
    line with no trailing newline is not counted).
    """
    f.seek(0)
    newline_total = 0
    while chunk := f.read(1024 * 1024):
        newline_total += chunk.count(b"\n")
    return newline_total


@dataclass
Expand Down Expand Up @@ -208,43 +209,71 @@ class open_bed:

def __init__(
self,
filepath: Union[str, Path],
location: Union[str, Path, UrlParseResult],
iid_count: Optional[int] = None,
sid_count: Optional[int] = None,
properties: Mapping[str, List[Any]] = {},
count_A1: bool = True,
num_threads: Optional[int] = None,
skip_format_check: bool = False,
fam_filepath: Union[str, Path] = None,
bim_filepath: Union[str, Path] = None,
# cmk need docs
object_store: Optional[ObjectStore] = None,
fam_location: Union[str, Path, UrlParseResult] = None,
bim_location: Union[str, Path, UrlParseResult] = None,
):
# cmk need to read the .fam and .bim files and check file from cloud if requested
self.filepath = Path(filepath)
self.location = self._path_or_url(location)
self.count_A1 = count_A1
self._num_threads = num_threads
self.skip_format_check = skip_format_check
self._fam_filepath = (
Path(fam_filepath)
if fam_filepath is not None
else self.filepath.parent / (self.filepath.stem + ".fam")
)
self._bim_filepath = (
Path(bim_filepath)
if bim_filepath is not None
else self.filepath.parent / (self.filepath.stem + ".bim")
)
self._fam_location = (self._path_or_url(fam_location) if fam_location is not None else self._replace_extension(self.location, "fam"))
self._bim_location = (self._path_or_url(bim_location) if bim_location is not None else self._replace_extension(self.location, "bim"))

self.properties_dict, self._counts = open_bed._fix_up_properties(
properties, iid_count, sid_count, use_fill_sequence=False
)
self._iid_range = None
self._sid_range = None
# cmk000
# if not self.skip_format_check:
# with open(self.filepath, "rb") as filepointer:
# self._check_file(filepointer)

@staticmethod
def _replace_extension(location, extension):
    """Return *location* with its file extension replaced by *extension*.

    Parameters
    ----------
    location : Path or urllib.parse.ParseResult
        A local path or a parsed URL.
    extension : str
        New extension WITHOUT the leading dot (e.g. ``"fam"``).

    Returns
    -------
    Same type as *location* — a new ``ParseResult`` with only its path
    component rewritten, or a new ``Path``.
    """
    if isinstance(location, UrlParseResult):
        # Swap the extension on the URL's path component only.
        root, _ = os.path.splitext(location.path)
        # ParseResult is a namedtuple: _replace builds a copy with the new
        # path while preserving scheme, netloc, params, query, fragment —
        # no risk of dropping a field as with a hand-built constructor call.
        return location._replace(path=f"{root}.{extension}")
    else:
        assert isinstance(location, Path)  # real assert
        return location.parent / (location.stem + "." + extension)

@staticmethod
def _is_url(location):
    # True when *location* is a parsed URL (urllib.parse ParseResult),
    # False for local filesystem Paths — the two forms _path_or_url yields.
    return isinstance(location, UrlParseResult)

if not self.skip_format_check:
with open(self.filepath, "rb") as filepointer:
self._check_file(filepointer)
@staticmethod
def _path_or_url(input):
    """Normalize *input* to either a ``Path`` or a parsed URL.

    ``Path`` and ``ParseResult`` values pass through unchanged; a string
    is parsed and classified as URL or local path. Raises ``AssertionError``
    for any other type.
    """
    if isinstance(input, (Path, UrlParseResult)):
        return input
    assert isinstance(input, str), "Expected a string or Path object or UrlParseResult"
    parsed = urlparse(input)
    # Treat the string as a URL only when a scheme parsed out AND "://"
    # literally appears — presumably so scheme-like prefixes (e.g. Windows
    # drive letters such as "C:\\data") still become Paths; confirm intent.
    return parsed if parsed.scheme and "://" in input else Path(input)

def read(
self,
Expand Down Expand Up @@ -392,23 +421,34 @@ def read(

val = np.zeros((len(iid_index), len(sid_index)), order=order, dtype=dtype)

# cmk similar code in sparse
if self.iid_count > 0 and self.sid_count > 0:
if dtype == np.int8:
# cmk000
reader = read_cloud_i8
file_reader = read_i8
cloud_reader = read_cloud_i8
elif dtype == np.float64:
reader = read_f64
file_reader = read_f64
cloud_reader = None # cmk000
elif dtype == np.float32:
reader = read_f32
file_reader = read_f32
cloud_reader = None # cmk000
else:
raise ValueError(
f"dtype '{val.dtype}' not known, only "
+ "'int8', 'float32', and 'float64' are allowed."
)

if open_bed._is_url(self.location):
reader = cloud_reader
location_str = self.location.geturl()
else:
reader = file_reader
location_str = str(self.location.as_posix())

reader(
# cmk000
str(self.filepath.as_posix()),
location_str,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
Expand All @@ -419,6 +459,7 @@ def read(
)

else:
# cmk assert not a cloud read
if not self.count_A1:
byteZero = 0
byteThree = 2
Expand All @@ -442,7 +483,7 @@ def read(
) # allocate it a little big

nbyte = int(np.ceil(0.25 * self.iid_count))
with open(self.filepath, "rb") as filepointer:
with open(self.location, "rb") as filepointer:
for SNPsIndex, bimIndex in enumerate(sid_index):
startbit = int(np.ceil(0.25 * self.iid_count) * bimIndex + 3)
filepointer.seek(startbit)
Expand Down Expand Up @@ -477,7 +518,7 @@ def read(
return val

def __str__(self) -> str:
return f"{self.__class__.__name__}('{self.filepath}',...)"
return f"{self.__class__.__name__}('{self.location}',...)"

@property
def fid(self) -> np.ndarray:
Expand Down Expand Up @@ -964,18 +1005,24 @@ def sid_count(self) -> np.ndarray:
"""
return self._count("bim")

def _property_filepath(self, suffix):
def _property_location(self, suffix):
if suffix == "fam":
return self._fam_filepath
return self._fam_location
else:
assert suffix == "bim" # real assert
return self._bim_filepath
return self._bim_location

def _count(self, suffix):
    """Return (and cache) the line count of the .fam or .bim companion file.

    *suffix* is ``"fam"`` (iid count) or ``"bim"`` (sid count). The count is
    computed at most once: a value given at construction time, or a previous
    call, short-circuits the read.
    """
    count = self._counts[suffix]
    if count is None:
        location = self._property_location(suffix)
        if open_bed._is_url(location):
            # should not download twice
            file_bytes = bytes(url_to_bytes(location.geturl()))
            count = _rawincount(BytesIO(file_bytes))
        else:
            # Use a context manager so the handle is closed promptly; the
            # original `_rawincount(open(location, "rb"))` leaked the file
            # object until garbage collection.
            with open(location, "rb") as filepointer:
                count = _rawincount(filepointer)
        self._counts[suffix] = count
    return count

@staticmethod
Expand Down Expand Up @@ -1140,9 +1187,9 @@ def _fix_up_properties(properties, iid_count, sid_count, use_fill_sequence):
return properties_dict, count_dict

def _read_fam_or_bim(self, suffix):
property_filepath = self._property_filepath(suffix)
property_location = self._property_location(suffix)

logging.info("Loading {0} file {1}".format(suffix, property_filepath))
logging.info("Loading {0} file {1}".format(suffix, property_location))

count = self._counts[suffix]

Expand All @@ -1159,15 +1206,27 @@ def _read_fam_or_bim(self, suffix):
assert list(usecolsdict.values()) == sorted(usecolsdict.values()) # real assert
assert len(usecolsdict) > 0 # real assert

if os.path.getsize(property_filepath) == 0:
columns, row_count = [], 0
if self._is_url(property_location):
file_bytes = bytes(url_to_bytes(property_location.geturl()))
if len(file_bytes) == 0:
columns, row_count = [], 0
else: # cmk similar code
columns, row_count = _read_csv(
BytesIO(file_bytes),
delimiter=delimiter,
dtype=dtype_dict,
usecols=usecolsdict.values(),
)
else:
columns, row_count = _read_csv(
property_filepath,
delimiter=delimiter,
dtype=dtype_dict,
usecols=usecolsdict.values(),
)
if os.path.getsize(property_location) == 0:
columns, row_count = [], 0
else:
columns, row_count = _read_csv(
property_location,
delimiter=delimiter,
dtype=dtype_dict,
usecols=usecolsdict.values(),
)

if count is None:
self._counts[suffix] = row_count
Expand Down Expand Up @@ -1395,17 +1454,28 @@ def read_sparse(

if self.iid_count > 0 and self.sid_count > 0:
if dtype == np.int8:
reader = read_i8
# cmk000
file_reader = read_i8
cloud_reader = read_cloud_i8
elif dtype == np.float64:
reader = read_f64
file_reader = read_f64
cloud_reader = None # cmk000
elif dtype == np.float32:
reader = read_f32
file_reader = read_f32
cloud_reader = None # cmk000
else:
raise ValueError(
f"dtype '{dtype}' not known, only "
f"dtype '{val.dtype}' not known, only "
+ "'int8', 'float32', and 'float64' are allowed."
)

if open_bed._is_url(self.location):
reader = cloud_reader
location_str = self.location.geturl()
else:
reader = file_reader
location_str = str(self.location.as_posix())

if format == "csc":
val = np.zeros((len(iid_index), batch_size), order=order, dtype=dtype)
for batch_start in range(0, len(sid_index), batch_size):
Expand All @@ -1422,7 +1492,7 @@ def read_sparse(
batch_index = sid_index[batch_slice]

reader(
str(self.filepath),
location_str,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
Expand Down Expand Up @@ -1453,7 +1523,7 @@ def read_sparse(
batch_index = iid_index[batch_slice]

reader(
str(self.filepath),
location_str,
iid_count=self.iid_count,
sid_count=self.sid_count,
is_a1_counted=self.count_A1,
Expand Down Expand Up @@ -1553,3 +1623,4 @@ def _convert_to_dtype(str_arr, dtype):
logging.basicConfig(level=logging.INFO)

pytest.main(["--doctest-modules", __file__])
# cmk000 look for every self.filepath and fam_file and .bim_file
2 changes: 1 addition & 1 deletion bed_reader/tests/test_open_bed.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ def test_fam_bim_filepath(shared_datadir, tmp_path):
)
assert output_file.exists() and fam_file.exists() and bim_file.exists()

with open_bed(output_file, fam_filepath=fam_file, bim_filepath=bim_file) as deb:
with open_bed(output_file, fam_location=fam_file, bim_location=bim_file) as deb:
val2 = deb.read()
assert np.allclose(val, val2, equal_nan=True)
val_sparse = deb.read_sparse()
Expand Down
5 changes: 2 additions & 3 deletions bed_reader/tests/test_open_bed_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@
# import numpy as np
# import pytest

from object_store import ObjectStore
from bed_reader import open_bed


def test_cloud_read1(shared_datadir):
import math

file = shared_datadir / "plink_sim_10s_100v_10pmiss.bed"
object_store = ObjectStore("file://")
file = "file:///" + str(file.as_posix())
# cmk cmk next up, need to see if this is right and need to pass it to Rust

with open_bed(file, object_store=object_store) as bed:
with open_bed(file) as bed:
assert bed.iid_count == 10
assert bed.fid[-1] == "0"
assert bed.iid[-1] == "9"
Expand Down
5 changes: 0 additions & 5 deletions ignore.py

This file was deleted.

6 changes: 4 additions & 2 deletions src/bed_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2214,8 +2214,10 @@ pub struct ObjectPath<TObjectStore>
where
TObjectStore: ObjectStore,
{
object_store: Arc<TObjectStore>,
path: StorePath,
/// cmk doc
pub object_store: Arc<TObjectStore>,
/// cmk doc
pub path: StorePath,
}

impl<TObjectStore> Clone for ObjectPath<TObjectStore>
Expand Down
Loading

0 comments on commit 028d4fd

Please sign in to comment.