diff --git a/bed_reader/_open_bed.py b/bed_reader/_open_bed.py index 3180304..de8511a 100644 --- a/bed_reader/_open_bed.py +++ b/bed_reader/_open_bed.py @@ -1,11 +1,12 @@ +from itertools import takewhile, repeat import logging import multiprocessing import os from dataclasses import dataclass -from itertools import repeat, takewhile from pathlib import Path from typing import Any, List, Mapping, Optional, Union -from object_store import ObjectStore +from urllib.parse import urlparse, ParseResult as UrlParseResult +from io import BytesIO import numpy as np @@ -14,14 +15,14 @@ except ImportError: sparse = None -from .bed_reader import read_f32, read_f64, read_i8, read_cloud_i8 # type: ignore +from .bed_reader import read_f32, read_f64, read_i8, read_cloud_i8, url_to_bytes # type: ignore # https://stackoverflow.com/questions/845058/how-to-get-line-count-of-a-large-file-cheaply-in-python -def _rawincount(filepath): - with open(filepath, "rb") as f: - bufgen = takewhile(lambda x: x, (f.raw.read(1024 * 1024) for _ in repeat(None))) - return sum(buf.count(b"\n") for buf in bufgen) +def _rawincount(f): + f.seek(0) + bufgen = takewhile(lambda x: x, (f.read(1024 * 1024) for _ in repeat(None))) + return sum(buf.count(b"\n") for buf in bufgen) @dataclass @@ -208,43 +209,71 @@ class open_bed: def __init__( self, - filepath: Union[str, Path], + location: Union[str, Path, UrlParseResult], iid_count: Optional[int] = None, sid_count: Optional[int] = None, properties: Mapping[str, List[Any]] = {}, count_A1: bool = True, num_threads: Optional[int] = None, skip_format_check: bool = False, - fam_filepath: Union[str, Path] = None, - bim_filepath: Union[str, Path] = None, - # cmk need docs - object_store: Optional[ObjectStore] = None, + fam_location: Union[str, Path, UrlParseResult] = None, + bim_location: Union[str, Path, UrlParseResult] = None, ): # cmk need to read the .fam and .bim files and check file from cloud if requested - self.filepath = Path(filepath) + self.location = self._path_or_url(location) self.count_A1 = count_A1 self._num_threads = num_threads self.skip_format_check = skip_format_check - self._fam_filepath = ( - Path(fam_filepath) - if fam_filepath is not None - else self.filepath.parent / (self.filepath.stem + ".fam") - ) - self._bim_filepath = ( - Path(bim_filepath) - if bim_filepath is not None - else self.filepath.parent / (self.filepath.stem + ".bim") - ) + self._fam_location = (self._path_or_url(fam_location) if fam_location is not None else self._replace_extension(self.location, "fam")) + self._bim_location = (self._path_or_url(bim_location) if bim_location is not None else self._replace_extension(self.location, "bim")) self.properties_dict, self._counts = open_bed._fix_up_properties( properties, iid_count, sid_count, use_fill_sequence=False ) self._iid_range = None self._sid_range = None + # cmk000 + # if not self.skip_format_check: + # with open(self.filepath, "rb") as filepointer: + # self._check_file(filepointer) + + @staticmethod + def _replace_extension(location, extension): + if open_bed._is_url(location): + # Split the path and change the extension + path, _ = os.path.splitext(location.path) + new_path = f"{path}.{extension}" + + # Create a new ParseResult with the updated path + new_parse_result = UrlParseResult( + scheme=location.scheme, + netloc=location.netloc, + path=new_path, + params=location.params, + query=location.query, + fragment=location.fragment, + ) + return new_parse_result + else: + assert isinstance(location, Path) # real assert + return location.parent / (location.stem + "." + extension) + + @staticmethod + def _is_url(location): + return isinstance(location, UrlParseResult) - if not self.skip_format_check: - with open(self.filepath, "rb") as filepointer: - self._check_file(filepointer) + @staticmethod + def _path_or_url(input): + if isinstance(input, Path): + return input + if isinstance(input, UrlParseResult): + return input + assert isinstance(input, str), "Expected a string or Path object or UrlParseResult" + parsed = urlparse(input) + if parsed.scheme and "://" in input: + return parsed + else: + return Path(input) def read( self, @@ -392,23 +421,34 @@ def read( val = np.zeros((len(iid_index), len(sid_index)), order=order, dtype=dtype) + # cmk similar code in sparse if self.iid_count > 0 and self.sid_count > 0: if dtype == np.int8: # cmk000 - reader = read_cloud_i8 + file_reader = read_i8 + cloud_reader = read_cloud_i8 elif dtype == np.float64: - reader = read_f64 + file_reader = read_f64 + cloud_reader = None # cmk000 elif dtype == np.float32: - reader = read_f32 + file_reader = read_f32 + cloud_reader = None # cmk000 else: raise ValueError( f"dtype '{val.dtype}' not known, only " + "'int8', 'float32', and 'float64' are allowed." ) + if open_bed._is_url(self.location): + reader = cloud_reader + location_str = self.location.geturl() + else: + reader = file_reader + location_str = str(self.location.as_posix()) + reader( # cmk000 - str(self.filepath.as_posix()), + location_str, iid_count=self.iid_count, sid_count=self.sid_count, is_a1_counted=self.count_A1, @@ -419,6 +459,7 @@ def read( ) else: + # cmk assert not a cloud read if not self.count_A1: byteZero = 0 byteThree = 2 @@ -442,7 +483,7 @@ def read( ) # allocate it a little big nbyte = int(np.ceil(0.25 * self.iid_count)) - with open(self.filepath, "rb") as filepointer: + with open(self.location, "rb") as filepointer: for SNPsIndex, bimIndex in enumerate(sid_index): startbit = int(np.ceil(0.25 * self.iid_count) * bimIndex + 3) filepointer.seek(startbit) @@ -477,7 +518,7 @@ def read( return val def __str__(self) -> str: - return f"{self.__class__.__name__}('{self.filepath}',...)" + return f"{self.__class__.__name__}('{self.location}',...)" @property def fid(self) -> np.ndarray: @@ -964,18 +1005,24 @@ def sid_count(self) -> np.ndarray: """ return self._count("bim") - def _property_filepath(self, suffix): + def _property_location(self, suffix): if suffix == "fam": - return self._fam_filepath + return self._fam_location else: assert suffix == "bim" # real assert - return self._bim_filepath + return self._bim_location def _count(self, suffix): count = self._counts[suffix] if count is None: - count = _rawincount(self._property_filepath(suffix)) - self._counts[suffix] = count + location = self._property_location(suffix) + if open_bed._is_url(location): + # should not download twice + file_bytes = bytes(url_to_bytes(location.geturl())) + count = _rawincount(BytesIO(file_bytes)) + else: + count = _rawincount(open(location, "rb")) + self._counts[suffix] = count return count @staticmethod @@ -1140,9 +1187,9 @@ def _fix_up_properties(properties, iid_count, sid_count, use_fill_sequence): return properties_dict, count_dict def _read_fam_or_bim(self, suffix): - property_filepath = self._property_filepath(suffix) + property_location = self._property_location(suffix) - logging.info("Loading {0} file {1}".format(suffix, property_filepath)) + logging.info("Loading {0} file {1}".format(suffix, property_location)) count = self._counts[suffix] @@ -1159,15 +1206,27 @@ def _read_fam_or_bim(self, suffix): assert list(usecolsdict.values()) == sorted(usecolsdict.values()) # real assert assert len(usecolsdict) > 0 # real assert - if os.path.getsize(property_filepath) == 0: - columns, row_count = [], 0 + if self._is_url(property_location): + file_bytes = bytes(url_to_bytes(property_location.geturl())) + if len(file_bytes) == 0: + columns, row_count = [], 0 + else: # cmk similar code + columns, row_count = _read_csv( + BytesIO(file_bytes), + delimiter=delimiter, + dtype=dtype_dict, + usecols=usecolsdict.values(), + ) else: - columns, row_count = _read_csv( - property_filepath, - delimiter=delimiter, - dtype=dtype_dict, - usecols=usecolsdict.values(), - ) + if os.path.getsize(property_location) == 0: + columns, row_count = [], 0 + else: + columns, row_count = _read_csv( + property_location, + delimiter=delimiter, + dtype=dtype_dict, + usecols=usecolsdict.values(), + ) if count is None: self._counts[suffix] = row_count @@ -1395,17 +1454,28 @@ def read_sparse( if self.iid_count > 0 and self.sid_count > 0: if dtype == np.int8: - reader = read_i8 + # cmk000 + file_reader = read_i8 + cloud_reader = read_cloud_i8 elif dtype == np.float64: - reader = read_f64 + file_reader = read_f64 + cloud_reader = None # cmk000 elif dtype == np.float32: - reader = read_f32 + file_reader = read_f32 + cloud_reader = None # cmk000 else: raise ValueError( - f"dtype '{dtype}' not known, only " + f"dtype '{val.dtype}' not known, only " + "'int8', 'float32', and 'float64' are allowed." ) + if open_bed._is_url(self.location): + reader = cloud_reader + location_str = self.location.geturl() + else: + reader = file_reader + location_str = str(self.location.as_posix()) + if format == "csc": val = np.zeros((len(iid_index), batch_size), order=order, dtype=dtype) for batch_start in range(0, len(sid_index), batch_size): @@ -1422,7 +1492,7 @@ def read_sparse( batch_index = sid_index[batch_slice] reader( - str(self.filepath), + location_str, iid_count=self.iid_count, sid_count=self.sid_count, is_a1_counted=self.count_A1, @@ -1453,7 +1523,7 @@ def read_sparse( batch_index = iid_index[batch_slice] reader( - str(self.filepath), + location_str, iid_count=self.iid_count, sid_count=self.sid_count, is_a1_counted=self.count_A1, @@ -1553,3 +1623,4 @@ def _convert_to_dtype(str_arr, dtype): logging.basicConfig(level=logging.INFO) pytest.main(["--doctest-modules", __file__]) +# cmk000 look for every self.filepath and fam_file and .bim_file diff --git a/bed_reader/tests/test_open_bed.py b/bed_reader/tests/test_open_bed.py index ff281ce..4fca6dc 100644 --- a/bed_reader/tests/test_open_bed.py +++ b/bed_reader/tests/test_open_bed.py @@ -786,7 +786,7 @@ def test_fam_bim_filepath(shared_datadir, tmp_path): ) assert output_file.exists() and fam_file.exists() and bim_file.exists() - with open_bed(output_file, fam_filepath=fam_file, bim_filepath=bim_file) as deb: + with open_bed(output_file, fam_location=fam_file, bim_location=bim_file) as deb: val2 = deb.read() assert np.allclose(val, val2, equal_nan=True) val_sparse = deb.read_sparse() diff --git a/bed_reader/tests/test_open_bed_cloud.py b/bed_reader/tests/test_open_bed_cloud.py index 9a8a19e..6886f20 100644 --- a/bed_reader/tests/test_open_bed_cloud.py +++ b/bed_reader/tests/test_open_bed_cloud.py @@ -6,7 +6,6 @@ # import numpy as np # import pytest -from object_store import ObjectStore from bed_reader import open_bed @@ -14,10 +13,10 @@ def test_cloud_read1(shared_datadir): import math file = shared_datadir / "plink_sim_10s_100v_10pmiss.bed" - object_store = ObjectStore("file://") + file = "file:///" + str(file.as_posix()) # cmk cmk next up, need to see if this is right and need to pass it to Rust - with open_bed(file, object_store=object_store) as bed: + with open_bed(file) as bed: assert bed.iid_count == 10 assert bed.fid[-1] == "0" assert bed.iid[-1] == "9" diff --git a/ignore.py b/ignore.py deleted file mode 100644 index bfa5010..0000000 --- a/ignore.py +++ /dev/null @@ -1,5 +0,0 @@ -from bed_reader import open_bed, sample_file - -file_name = sample_file("sparse.bed") -with open_bed(file_name) as bed: - val_sparse = bed.read_sparse(dtype="int8") diff --git a/src/bed_cloud.rs b/src/bed_cloud.rs index 81da059..51d6525 100644 --- a/src/bed_cloud.rs +++ b/src/bed_cloud.rs @@ -2214,8 +2214,10 @@ pub struct ObjectPath<TObjectStore> where TObjectStore: ObjectStore, { - object_store: Arc<TObjectStore>, - path: StorePath, + /// cmk doc + pub object_store: Arc<TObjectStore>, + /// cmk doc + pub path: StorePath, } impl<TObjectStore> Clone for ObjectPath<TObjectStore> diff --git a/src/python_module.rs b/src/python_module.rs index fbb436b..53fee81 100644 --- a/src/python_module.rs +++ b/src/python_module.rs @@ -49,6 +49,24 @@ fn bed_reader(_py: Python<'_>, m: &PyModule) -> PyResult<()> { } } + #[pyfn(m)] + fn url_to_bytes(location: &str) -> Result<Vec<u8>, PyErr> { + let rt = runtime::Runtime::new().unwrap(); // cmk unwrap? + + let url = Url::parse(location).unwrap(); // cmk return a BedReader URL parse error + let (object_store, store_path): (Box<dyn ObjectStore>, StorePath) = + object_store::parse_url(&url).unwrap(); // cmk return a BedReader URL parse error + let object_path: ObjectPath<Box<dyn ObjectStore>> = (object_store, store_path).into(); + + rt.block_on(async { + let get_result = object_path.get().await?; + let bytes = get_result.bytes().await.unwrap(); // cmk ??? + let vec: Vec<u8> = bytes.to_vec(); + Ok(vec) + }) + } + + // cmk rename "filename" to "location" #[pyfn(m)] #[allow(clippy::too_many_arguments)] fn read_f64( @@ -710,6 +728,7 @@ fn bed_reader(_py: Python<'_>, m: &PyModule) -> PyResult<()> { Ok(()) } + // cmk // fn bed_reader(_py: Python, m: &PyModule) -> PyResult<()> { // m.add_function(wrap_pyfunction!(read_cloud_i8, m)?)?; // Ok(()) @@ -717,3 +736,5 @@ fn bed_reader(_py: Python<'_>, m: &PyModule) -> PyResult<()> { Ok(()) } + +// cmk on both rust and python side, when counting bim and fam files, also parse them -- don't read them twice.