diff --git a/apis/python/src/tiledbsoma/_dataframe.py b/apis/python/src/tiledbsoma/_dataframe.py index 89e3bbdc3a..973b3e01d7 100644 --- a/apis/python/src/tiledbsoma/_dataframe.py +++ b/apis/python/src/tiledbsoma/_dataframe.py @@ -28,7 +28,6 @@ from . import pytiledbsoma as clib from ._constants import SOMA_JOINID from ._exception import SOMAError, map_exception_for_create -from ._query_condition import QueryCondition from ._read_iters import TableReadIter from ._soma_array import SOMAArray from ._tdb_handles import DataFrameWrapper @@ -722,31 +721,16 @@ def read( _util.check_unpartitioned(partitions) self._check_open_read() - handle = self._handle._handle - - context = handle.context() - if platform_config is not None: - config = context.tiledb_config.copy() - config.update(platform_config) - context = clib.SOMAContext(config) - - sr = clib.SOMADataFrame.open( - uri=handle.uri, - mode=clib.OpenMode.read, - context=context, - column_names=column_names or [], + # TODO: batch_size + return TableReadIter( + array=self, + coords=coords, + column_names=column_names, result_order=_util.to_clib_result_order(result_order), - timestamp=handle.timestamp and (0, handle.timestamp), + value_filter=value_filter, + platform_config=platform_config, ) - if value_filter is not None: - sr.set_condition(QueryCondition(value_filter), handle.schema) - - _util._set_coords(sr, coords) - - # TODO: batch_size - return TableReadIter(sr) - def write( self, values: pa.Table, platform_config: Optional[options.PlatformConfig] = None ) -> Self: diff --git a/apis/python/src/tiledbsoma/_dense_nd_array.py b/apis/python/src/tiledbsoma/_dense_nd_array.py index 509a2b2627..ef65f7c9d9 100644 --- a/apis/python/src/tiledbsoma/_dense_nd_array.py +++ b/apis/python/src/tiledbsoma/_dense_nd_array.py @@ -20,6 +20,7 @@ from ._arrow_types import pyarrow_to_carrow_type from ._common_nd_array import NDArray from ._exception import SOMAError, map_exception_for_create +from ._read_iters import TableReadIter from ._tdb_handles import DenseNDArrayWrapper from ._types import OpenTimestamp, Slice from ._util import dense_indices_to_shape @@ -232,42 +233,20 @@ def read( data_shape = tuple(handle.shape if use_shape else ned) target_shape = dense_indices_to_shape(coords, data_shape, result_order) - context = handle.context() - if platform_config is not None: - config = context.tiledb_config.copy() - config.update(platform_config) - context = clib.SOMAContext(config) - - sr = clib.SOMADenseNDArray.open( - uri=handle.uri, - mode=clib.OpenMode.read, - context=context, + arrow_table = TableReadIter( + array=self, + coords=coords, column_names=[], result_order=_util.to_clib_result_order(result_order), - timestamp=handle.timestamp and (0, handle.timestamp), - ) + value_filter=None, + platform_config=platform_config, + ).concat() - _util._set_coords(sr, coords) - - arrow_tables = [] - while True: - arrow_table_piece = sr.read_next() - if not arrow_table_piece: - break - arrow_tables.append(arrow_table_piece) - - # For dense arrays there is no zero-output case: attempting to make a test case - # to do that, say by indexing a 10x20 array by positions 888 and 999, results - # in read-time errors of the form - # - # [TileDB::Subarray] Error: Cannot add range to dimension 'soma_dim_0'; Range [888, 888] is - # out of domain bounds [0, 9] - if not arrow_tables: + if arrow_table is None: raise SOMAError( "internal error: at least one table-piece should have been returned" ) - arrow_table = pa.concat_tables(arrow_tables) npval = 
arrow_table.column("soma_data").to_numpy() # TODO: as currently coded we're looking at the non-empty domain upper # bound but not its lower bound. That works fine if data are written at @@ -310,7 +289,7 @@ def write( """ _util.check_type("values", values, (pa.Tensor,)) - clib_dense_array = self._handle._handle + clib_handle = self._handle._handle # Compute the coordinates for the dense array. new_coords: List[Union[int, Slice[int], None]] = [] @@ -331,13 +310,16 @@ def write( if not input.flags.contiguous: input = np.ascontiguousarray(input) order = clib.ResultOrder.rowmajor - clib_dense_array.reset(result_order=order) - _util._set_coords(clib_dense_array, new_coords) - clib_dense_array.write(input) + + mq = clib.ManagedQuery(clib_handle, clib_handle.context()) + mq.set_layout(order) + _util._set_coords(mq, clib_handle, new_coords) + mq.set_soma_data(input) + mq.submit_write() tiledb_write_options = TileDBWriteOptions.from_platform_config(platform_config) if tiledb_write_options.consolidate_and_vacuum: - clib_dense_array.consolidate_and_vacuum() + clib_handle.consolidate_and_vacuum() return self @classmethod diff --git a/apis/python/src/tiledbsoma/_point_cloud_dataframe.py b/apis/python/src/tiledbsoma/_point_cloud_dataframe.py index 1a4a8aa456..df5bd44baa 100644 --- a/apis/python/src/tiledbsoma/_point_cloud_dataframe.py +++ b/apis/python/src/tiledbsoma/_point_cloud_dataframe.py @@ -28,7 +28,6 @@ _revise_domain_for_extent, ) from ._exception import SOMAError, map_exception_for_create -from ._query_condition import QueryCondition from ._read_iters import TableReadIter from ._spatial_dataframe import SpatialDataFrame from ._spatial_util import ( @@ -332,31 +331,16 @@ def read( _util.check_unpartitioned(partitions) self._check_open_read() - handle = self._handle._handle - - context = handle.context() - if platform_config is not None: - config = context.tiledb_config.copy() - config.update(platform_config) - context = clib.SOMAContext(config) - - sr = clib.SOMAPointCloudDataFrame.open( - uri=handle.uri, - mode=clib.OpenMode.read, - context=context, - column_names=column_names or [], + # TODO: batch_size + return TableReadIter( + array=self, + coords=coords, + column_names=column_names, result_order=_util.to_clib_result_order(result_order), - timestamp=handle.timestamp and (0, handle.timestamp), + value_filter=value_filter, + platform_config=platform_config, ) - if value_filter is not None: - sr.set_condition(QueryCondition(value_filter), handle.schema) - - _util._set_coords(sr, coords) - - # # TODO: batch_size - return TableReadIter(sr) - def read_spatial_region( self, region: Optional[options.SpatialRegion] = None, diff --git a/apis/python/src/tiledbsoma/_read_iters.py b/apis/python/src/tiledbsoma/_read_iters.py index bec8cc81d8..92a8198ca7 100644 --- a/apis/python/src/tiledbsoma/_read_iters.py +++ b/apis/python/src/tiledbsoma/_read_iters.py @@ -13,7 +13,6 @@ from typing import ( TYPE_CHECKING, Any, - Dict, Iterator, List, Optional, @@ -39,11 +38,13 @@ from ._exception import SOMAError from ._fastercsx import CompressedMatrix from ._indexer import IntIndexer +from ._query_condition import QueryCondition from ._types import NTuple from .options import SOMATileDBContext if TYPE_CHECKING: from . 
import SparseNDArray + from ._soma_array import SOMAArray # Convenience types @@ -66,8 +67,47 @@ class TableReadIter(somacore.ReadIter[pa.Table]): """Iterator over `Arrow Table `_ elements""" - def __init__(self, sr: clib.SOMAArray): - self._reader = _arrow_table_reader(sr) + def __init__( + self, + array: SOMAArray, + coords: Union[ + options.SparseDFCoords, options.SparseNDCoords, options.DenseNDCoords + ], + column_names: Optional[Sequence[str]], + result_order: clib.ResultOrder, + value_filter: Optional[str], + platform_config: Optional[options.PlatformConfig], + ): + """Initializes a new TableReadIter for SOMAArrays. + + Args: + array (SOMAArray): + The NDArray, DataFrame, or SpatialDataFrame being read. + + coords (Union[ + options.SparseDFCoords, options.SparseNDCoords, options.DenseNDCoords + ]): + For each index dimension, which rows to read. + ``()`` means no constraint -- all IDs. + + column_names (Optional[Sequence[str]]): + The named columns to read and return. + ``None`` means no constraint -- all column names. + + result_order (clib.ResultOrder): + Order of read results. + This can be one of automatic, rowmajor, or colmajor. + + value_filter (Optional[str]): + An optional [value filter] to apply to the results. + + platform_config (Optional[options.PlatformConfig]): + Pass in parameters for tuning reads. + + """ + self._reader = ArrowTableRead( + array, coords, column_names, result_order, value_filter, platform_config + ) def __next__(self) -> pa.Table: return next(self._reader) @@ -87,10 +127,11 @@ class BlockwiseReadIterBase(somacore.ReadIter[_RT], metaclass=abc.ABCMeta): def __init__( self, - array: "SparseNDArray", - sr: clib.SOMAArray, + array: SOMAArray, coords: options.SparseNDCoords, axis: Union[int, Sequence[int]], + result_order: clib.ResultOrder, + platform_config: Optional[options.PlatformConfig], *, size: Optional[Union[int, Sequence[int]]] = None, reindex_disable_on_axis: Optional[Union[int, Sequence[int]]] = None, @@ -99,10 +140,12 @@ def __init__( ): super().__init__() - self.ndim = len(sr.shape) self.array = array - self.sr = sr + self.shape = array._handle._handle.shape + self.ndim = len(self.shape) self.eager = eager + self.result_order = result_order + self.platform_config = platform_config # Assign a thread pool from the context, or create a new one if no context # is available @@ -115,7 +158,7 @@ def __init__( # raises on various error checks, AND normalizes args self.axis, self.size, self.reindex_disable_on_axis = self._validate_args( - sr.shape, axis, size, reindex_disable_on_axis + self.shape, axis, size, reindex_disable_on_axis ) self.major_axis = self.axis[0] @@ -125,11 +168,7 @@ def __init__( self.joinids: List[pa.Array] = [ pa.array( np.concatenate( - list( - _coords_strider( - self.coords[d], self.sr.shape[d], self.sr.shape[d] - ) - ) + list(_coords_strider(self.coords[d], self.shape[d], self.shape[d])) ) if d != self.major_axis else np.array([], dtype=np.int64) @@ -230,20 +269,26 @@ def _maybe_eager_iterator( def _table_reader(self) -> Iterator[BlockwiseTableReadIterResult]: """Private. Blockwise table reader. 
Helper function for sub-class use""" - kwargs: Dict[str, object] = {"result_order": self.sr.result_order} for coord_chunk in _coords_strider( self.coords[self.major_axis], - self.sr.shape[self.major_axis], + self.shape[self.major_axis], self.size[0], ): - self.sr.reset(**kwargs) step_coords = list(self.coords) step_coords[self.major_axis] = coord_chunk - _util._set_coords(self.sr, step_coords) joinids = list(self.joinids) joinids[self.major_axis] = pa.array(coord_chunk) - yield pa.concat_tables(_arrow_table_reader(self.sr)), tuple(joinids) + yield pa.concat_tables( + ArrowTableRead( + array=self.array, + coords=step_coords, + column_names=[], # select all columns + result_order=self.result_order, + value_filter=None, + platform_config=self.platform_config, + ) + ), tuple(joinids) def _reindexed_table_reader( self, @@ -286,10 +331,11 @@ class BlockwiseScipyReadIter(BlockwiseReadIterBase[BlockwiseScipyReadIterResult] def __init__( self, - array: "SparseNDArray", - sr: clib.SOMAArray, + array: SOMAArray, coords: options.SparseNDCoords, axis: Union[int, Sequence[int]], + result_order: clib.ResultOrder, + platform_config: Optional[options.PlatformConfig], *, size: Optional[Union[int, Sequence[int]]] = None, reindex_disable_on_axis: Optional[Union[int, Sequence[int]]] = None, @@ -301,9 +347,10 @@ def __init__( self.context = context super().__init__( array, - sr, coords, axis, + result_order, + platform_config, size=size, reindex_disable_on_axis=reindex_disable_on_axis, eager=eager, @@ -311,7 +358,7 @@ def __init__( ) if ( - len(self.sr.shape) != 2 + len(self.shape) != 2 or len(self.coords) > 2 or self.major_axis not in [0, 1] ): @@ -365,7 +412,7 @@ def _mk_shape( self, major_coords: npt.NDArray[np.int64], minor_coords: npt.NDArray[np.int64] ) -> Tuple[int, int]: """Private. Make shape of this iterator step""" - shape = cast(Tuple[int, int], tuple(self.sr.shape)) + shape = cast(Tuple[int, int], tuple(self.shape)) assert len(shape) == 2 _sp_shape: List[int] = list(shape) @@ -393,7 +440,7 @@ def _coo_reader( ) # SOMA disallows duplicates. Canonical implies sorted row-major, no dups - if self.sr.result_order == clib.ResultOrder.rowmajor: + if self.result_order == clib.ResultOrder.rowmajor: sp.has_canonical_format = True yield sp, indices @@ -426,20 +473,41 @@ def _cs_reader( class SparseTensorReadIterBase(somacore.ReadIter[_RT], metaclass=abc.ABCMeta): """Private implementation class""" - def __init__(self, sr: clib.SOMAArray, shape: NTuple): - self.sr = sr + def __init__( + self, + array: SparseNDArray, + coords: options.SparseDFCoords, + shape: NTuple, + result_order: clib.ResultOrder, + platform_config: Optional[options.PlatformConfig], + ): + self.array = array + self.coords = coords self.shape = shape + self.result_order = result_order + self.platform_config = platform_config + + clib_handle = array._handle._handle + + if platform_config is not None: + cfg = clib_handle.context().config() + cfg.update(platform_config) + ctx = clib.SOMAContext(cfg) + else: + ctx = clib_handle.context() + + self.mq = clib.ManagedQuery(clib_handle, ctx) + + self.mq.set_layout(result_order) + + _util._set_coords(self.mq, clib_handle, coords) @abc.abstractmethod def _from_table(self, arrow_table: pa.Table) -> _RT: raise NotImplementedError() def __next__(self) -> _RT: - arrow_table = self.sr.read_next() - if arrow_table is None: - raise StopIteration - - return self._from_table(arrow_table) + return self._from_table(self.mq.next()) def concat(self) -> _RT: """Returns all the requested data in a single operation. 
@@ -447,7 +515,16 @@ def concat(self) -> _RT: If some data has already been retrieved using ``next``, this will return the rest of the data after that is already returned. """ - arrow_tables = pa.concat_tables(TableReadIter(self.sr)) + arrow_tables = pa.concat_tables( + TableReadIter( + array=self.array, + coords=self.coords, + column_names=[], # select all columns + result_order=self.result_order, + value_filter=None, + platform_config=self.platform_config, + ) + ) return self._from_table(arrow_tables) @@ -465,12 +542,41 @@ def _from_table(self, arrow_table: pa.Table) -> pa.SparseCOOTensor: return pa.SparseCOOTensor.from_numpy(coo_data, coo_coords, shape=self.shape) -def _arrow_table_reader(sr: clib.SOMAArray) -> Iterator[pa.Table]: - """Private. Simple Table iterator on any Array""" - tbl = sr.read_next() - while tbl is not None: - yield tbl - tbl = sr.read_next() +class ArrowTableRead(Iterator[pa.Table]): + def __init__( + self, + array: SOMAArray, + coords: Union[ + options.SparseDFCoords, options.SparseNDCoords, options.DenseNDCoords + ], + column_names: Optional[Sequence[str]], + result_order: clib.ResultOrder, + value_filter: Optional[str], + platform_config: Optional[options.PlatformConfig], + ): + clib_handle = array._handle._handle + + if platform_config is not None: + cfg = clib_handle.context().config() + cfg.update(platform_config) + ctx = clib.SOMAContext(cfg) + else: + ctx = clib_handle.context() + + self.mq = clib.ManagedQuery(clib_handle, ctx) + + self.mq.set_layout(result_order) + + if column_names is not None: + self.mq.select_columns(list(column_names)) + + if value_filter is not None: + self.mq.set_condition(QueryCondition(value_filter), clib_handle.schema) + + _util._set_coords(self.mq, clib_handle, coords) + + def __next__(self) -> pa.Table: + return self.mq.next() def _coords_strider( diff --git a/apis/python/src/tiledbsoma/_sparse_nd_array.py b/apis/python/src/tiledbsoma/_sparse_nd_array.py index 37fbc71d6b..60891c9629 100644 --- a/apis/python/src/tiledbsoma/_sparse_nd_array.py +++ b/apis/python/src/tiledbsoma/_sparse_nd_array.py @@ -266,28 +266,16 @@ def read( * Negative indexing is unsupported. """ del batch_size # Currently unused. - handle: clib.SOMASparseNDArray = self._handle._handle - self._check_open_read() _util.check_unpartitioned(partitions) - context = handle.context() - if platform_config is not None: - config = context.tiledb_config.copy() - config.update(platform_config) - context = clib.SOMAContext(config) - - sr = clib.SOMASparseNDArray.open( - uri=handle.uri, - mode=clib.OpenMode.read, - context=context, - column_names=[], + return SparseNDArrayRead( + array=self, + coords=coords, result_order=_util.to_clib_result_order(result_order), - timestamp=handle.timestamp and (0, handle.timestamp), + platform_config=platform_config, ) - return SparseNDArrayRead(sr, self, coords) - def write( self, values: Union[ @@ -525,18 +513,20 @@ class _SparseNDArrayReadBase(somacore.SparseRead): def __init__( self, - sr: clib.SOMAArray, array: SparseNDArray, coords: options.SparseNDCoords, + result_order: clib.ResultOrder, + platform_config: Optional[options.PlatformConfig], ): """ Lifecycle: Maturing. 
""" - self.sr = sr - self.shape = tuple(sr.shape) self.array = array self.coords = coords + self.shape = tuple(array._handle._handle.shape) + self.result_order = result_order + self.platform_config = platform_config class SparseNDArrayRead(_SparseNDArrayReadBase): @@ -570,8 +560,13 @@ def coos(self, shape: Optional[NTuple] = None) -> SparseCOOTensorReadIter: """ if shape is not None and (len(shape) != len(self.shape)): raise ValueError(f"shape must be a tuple of size {len(self.shape)}") - _util._set_coords(self.sr, self.coords) - return SparseCOOTensorReadIter(self.sr, shape or self.shape) + return SparseCOOTensorReadIter( + self.array, + self.coords, + shape or self.shape, + self.result_order, + self.platform_config, + ) def tables(self) -> TableReadIter: """ @@ -581,8 +576,14 @@ def tables(self) -> TableReadIter: Lifecycle: Maturing. """ - _util._set_coords(self.sr, self.coords) - return TableReadIter(self.sr) + return TableReadIter( + array=self.array, + coords=self.coords, + column_names=[], + result_order=self.result_order, + value_filter=None, + platform_config=self.platform_config, + ) def blockwise( self, @@ -653,10 +654,11 @@ def blockwise( Maturing. """ return SparseNDArrayBlockwiseRead( - self.sr, self.array, self.coords, axis, + self.result_order, + self.platform_config, size=size, reindex_disable_on_axis=reindex_disable_on_axis, eager=eager, @@ -666,16 +668,18 @@ def blockwise( class SparseNDArrayBlockwiseRead(_SparseNDArrayReadBase): def __init__( self, - sr: clib.SOMAArray, array: SparseNDArray, coords: options.SparseNDCoords, axis: Union[int, Sequence[int]], + result_order: clib.ResultOrder, + platform_config: Optional[options.PlatformConfig], *, size: Optional[Union[int, Sequence[int]]], reindex_disable_on_axis: Optional[Union[int, Sequence[int]]], eager: bool = True, ): - super().__init__(sr, array, coords) + super().__init__(array, coords, result_order, platform_config) + self.result_order = result_order self.axis = axis self.size = size self.reindex_disable_on_axis = reindex_disable_on_axis @@ -696,9 +700,10 @@ def tables(self) -> BlockwiseTableReadIter: """ return BlockwiseTableReadIter( self.array, - self.sr, self.coords, self.axis, + self.result_order, + self.platform_config, size=self.size, reindex_disable_on_axis=self.reindex_disable_on_axis, eager=self.eager, @@ -741,9 +746,10 @@ def scipy(self, *, compress: bool = True) -> BlockwiseScipyReadIter: """ return BlockwiseScipyReadIter( self.array, - self.sr, self.coords, self.axis, + self.result_order, + self.platform_config, size=self.size, compress=compress, reindex_disable_on_axis=self.reindex_disable_on_axis, diff --git a/apis/python/src/tiledbsoma/_util.py b/apis/python/src/tiledbsoma/_util.py index ea118f82e0..e007ab9c16 100644 --- a/apis/python/src/tiledbsoma/_util.py +++ b/apis/python/src/tiledbsoma/_util.py @@ -464,7 +464,9 @@ def _cast_domainish(domainish: List[Any]) -> Tuple[Tuple[object, object], ...]: return tuple(result) -def _set_coords(sarr: clib.SOMAArray, coords: options.SparseNDCoords) -> None: +def _set_coords( + mq: clib.ManagedQuery, sarr: clib.SOMAArray, coords: options.SparseNDCoords +) -> None: if not is_nonstringy_sequence(coords): raise TypeError( f"coords type {type(coords)} must be a regular sequence," @@ -478,10 +480,12 @@ def _set_coords(sarr: clib.SOMAArray, coords: options.SparseNDCoords) -> None: ) for i, coord in enumerate(coords): - _set_coord(i, sarr, coord) + _set_coord(i, mq, sarr, coord) -def _set_coord(dim_idx: int, sarr: clib.SOMAArray, coord: object) -> None: +def _set_coord( 
+ dim_idx: int, mq: clib.ManagedQuery, sarr: clib.SOMAArray, coord: object +) -> None: if coord is None: return @@ -489,19 +493,19 @@ def _set_coord(dim_idx: int, sarr: clib.SOMAArray, coord: object) -> None: dom = _cast_domainish(sarr.domain())[dim_idx] if isinstance(coord, (str, bytes)): - sarr.set_dim_points_string_or_bytes(dim.name, [coord]) + mq.set_dim_points_string_or_bytes(dim.name, [coord]) return if isinstance(coord, (pa.Array, pa.ChunkedArray)): - sarr.set_dim_points_arrow(dim.name, coord) + mq.set_dim_points_arrow(dim.name, coord) return if isinstance(coord, (Sequence, np.ndarray)): - _set_coord_by_py_seq_or_np_array(sarr, dim, coord) + _set_coord_by_py_seq_or_np_array(mq, dim, coord) return if isinstance(coord, int): - sarr.set_dim_points_int64(dim.name, [coord]) + mq.set_dim_points_int64(dim.name, [coord]) return # Note: slice(None, None) matches the is_slice_of part, unless we also check @@ -520,7 +524,7 @@ def _set_coord(dim_idx: int, sarr: clib.SOMAArray, coord: object) -> None: _, stop = ned[dim_idx] else: stop = coord.stop - sarr.set_dim_ranges_string_or_bytes(dim.name, [(start, stop)]) + mq.set_dim_ranges_string_or_bytes(dim.name, [(start, stop)]) return # Note: slice(None, None) matches the is_slice_of part, unless we also check @@ -543,21 +547,21 @@ def _set_coord(dim_idx: int, sarr: clib.SOMAArray, coord: object) -> None: else: istop = ts_dom[1].as_py() - sarr.set_dim_ranges_int64(dim.name, [(istart, istop)]) + mq.set_dim_ranges_int64(dim.name, [(istart, istop)]) return if isinstance(coord, slice): validate_slice(coord) if coord.start is None and coord.stop is None: return - _set_coord_by_numeric_slice(sarr, dim, dom, coord) + _set_coord_by_numeric_slice(mq, dim, dom, coord) return raise TypeError(f"unhandled type {dim.type} for index column named {dim.name}") def _set_coord_by_py_seq_or_np_array( - sarr: clib.SOMAArray, dim: pa.Field, coord: object + mq: clib.ManagedQuery, dim: pa.Field, coord: object ) -> None: if isinstance(coord, np.ndarray): if coord.ndim != 1: @@ -566,7 +570,7 @@ def _set_coord_by_py_seq_or_np_array( ) try: - set_dim_points = getattr(sarr, f"set_dim_points_{dim.type}") + set_dim_points = getattr(mq, f"set_dim_points_{dim.type}") except AttributeError: # We have to handle this type specially below pass @@ -575,7 +579,7 @@ def _set_coord_by_py_seq_or_np_array( return if pa_types_is_string_or_bytes(dim.type): - sarr.set_dim_points_string_or_bytes(dim.name, coord) + mq.set_dim_points_string_or_bytes(dim.name, coord) return if pa.types.is_timestamp(dim.type): @@ -586,14 +590,14 @@ def _set_coord_by_py_seq_or_np_array( icoord = [ int(e.astype("int64")) if isinstance(e, np.datetime64) else e for e in coord ] - sarr.set_dim_points_int64(dim.name, icoord) + mq.set_dim_points_int64(dim.name, icoord) return raise ValueError(f"unhandled type {dim.type} for index column named {dim.name}") def _set_coord_by_numeric_slice( - sarr: clib.SOMAArray, dim: pa.Field, dom: Tuple[object, object], coord: Slice[Any] + mq: clib.ManagedQuery, dim: pa.Field, dom: Tuple[object, object], coord: Slice[Any] ) -> None: try: lo_hi = slice_to_numeric_range(coord, dom) @@ -604,7 +608,7 @@ def _set_coord_by_numeric_slice( return try: - set_dim_range = getattr(sarr, f"set_dim_ranges_{dim.type}") + set_dim_range = getattr(mq, f"set_dim_ranges_{dim.type}") set_dim_range(dim.name, [lo_hi]) return except AttributeError: diff --git a/apis/python/src/tiledbsoma/managed_query.cc b/apis/python/src/tiledbsoma/managed_query.cc index 7de08619bc..e3865f5237 100644 --- 
a/apis/python/src/tiledbsoma/managed_query.cc +++ b/apis/python/src/tiledbsoma/managed_query.cc @@ -117,7 +117,10 @@ void load_managed_query(py::module& m) { "names"_a, "if_not_empty"_a = false) - .def("submit_read", &ManagedQuery::submit_read) + .def( + "submit_read", + &ManagedQuery::submit_read, + py::call_guard()) .def( "results", [](ManagedQuery& mq) -> std::optional { @@ -133,6 +136,27 @@ void load_managed_query(py::module& m) { } }) + .def( + "next", + [](ManagedQuery& mq) -> std::optional { + // Release python GIL before reading data + py::gil_scoped_release release; + std::optional> tbl; + try { + tbl = mq.read_next(); + // Acquire python GIL before accessing python objects + } catch (const std::exception& e) { + throw TileDBSOMAError(e.what()); + } + py::gil_scoped_acquire acquire; + + if (!tbl) { + throw py::stop_iteration(); + } + + return to_table(tbl); + }) + .def( "set_array_data", [](ManagedQuery& mq, py::handle py_batch) { @@ -143,6 +167,7 @@ void load_managed_query(py::module& m) { py_batch.attr("_export_to_c")( arrow_array_ptr, arrow_schema_ptr); + py::gil_scoped_release release; try { mq.set_array_data( std::make_unique(arrow_schema), @@ -150,6 +175,7 @@ void load_managed_query(py::module& m) { } catch (const std::exception& e) { TPY_ERROR_LOC(e.what()); } + py::gil_scoped_acquire acquire; arrow_schema.release(&arrow_schema); arrow_array.release(&arrow_array); @@ -158,17 +184,21 @@ void load_managed_query(py::module& m) { "set_soma_data", [](ManagedQuery& mq, py::array data) { py::buffer_info data_info = data.request(); + + py::gil_scoped_release release; mq.setup_write_column( "soma_data", data.size(), (const void*)data_info.ptr, static_cast(nullptr), static_cast(nullptr)); + py::gil_scoped_acquire acquire; }) .def( "submit_write", &ManagedQuery::submit_write, - "sort_coords"_a = false) + "sort_coords"_a = false, + py::call_guard()) .def("reset", &ManagedQuery::reset) .def("close", &ManagedQuery::close) diff --git a/apis/python/tests/test_dataframe.py b/apis/python/tests/test_dataframe.py index df2dff0a50..c607cfcf7f 100644 --- a/apis/python/tests/test_dataframe.py +++ b/apis/python/tests/test_dataframe.py @@ -21,10 +21,10 @@ def arrow_schema(): def _schema(): return pa.schema( [ - pa.field("foo", pa.int64()), - pa.field("bar", pa.float64()), - pa.field("baz", pa.string()), - pa.field("quux", pa.bool_()), + pa.field("myint", pa.int64()), + pa.field("myfloat", pa.float64()), + pa.field("mystring", pa.string()), + pa.field("mybool", pa.bool_()), ] ) @@ -36,10 +36,10 @@ def test_dataframe(tmp_path, arrow_schema): asch = pa.schema( [ - ("foo", pa.int32()), - ("bar", pa.float64()), - ("baz", pa.large_string()), - ("quux", pa.bool_()), + ("myint", pa.int32()), + ("myfloat", pa.float64()), + ("mystring", pa.large_string()), + ("mybool", pa.bool_()), ] ) @@ -55,7 +55,7 @@ def test_dataframe(tmp_path, arrow_schema): # nonexistent indexed column soma.DataFrame.create(uri, schema=asch, index_column_names=["bogus"]) soma.DataFrame.create( - uri, schema=asch, index_column_names=["foo"], domain=[[0, 99]] + uri, schema=asch, index_column_names=["myint"], domain=[[0, 99]] ).close() assert soma.DataFrame.exists(uri) @@ -67,7 +67,7 @@ def test_dataframe(tmp_path, arrow_schema): assert len(sdf) == 0 assert sorted(sdf.schema.names) == sorted( - ["foo", "bar", "baz", "soma_joinid", "quux"] + ["myint", "myfloat", "mystring", "soma_joinid", "mybool"] ) assert sorted(sdf.keys()) == sorted(sdf.schema.names) @@ -76,10 +76,10 @@ def test_dataframe(tmp_path, arrow_schema): for _ in range(3): 
pydict = {} pydict["soma_joinid"] = [0, 1, 2, 3, 4] - pydict["foo"] = [10, 20, 30, 40, 50] - pydict["bar"] = [4.1, 5.2, 6.3, 7.4, 8.5] - pydict["baz"] = ["apple", "ball", "cat", "dog", "egg"] - pydict["quux"] = [True, False, False, True, False] + pydict["myint"] = [10, 20, 30, 40, 50] + pydict["myfloat"] = [4.1, 5.2, 6.3, 7.4, 8.5] + pydict["mystring"] = ["apple", "ball", "cat", "dog", "egg"] + pydict["mybool"] = [True, False, False, True, False] rb = pa.Table.from_pydict(pydict) sdf.tiledbsoma_resize_soma_joinid_shape(len(rb)) @@ -108,20 +108,20 @@ def test_dataframe(tmp_path, arrow_schema): assert table.num_rows == 5 assert table.num_columns == 5 assert [e.as_py() for e in table["soma_joinid"]] == pydict["soma_joinid"] - assert [e.as_py() for e in table["foo"]] == pydict["foo"] - assert [e.as_py() for e in table["bar"]] == pydict["bar"] - assert [e.as_py() for e in table["baz"]] == pydict["baz"] - assert [e.as_py() for e in table["quux"]] == pydict["quux"] + assert [e.as_py() for e in table["myint"]] == pydict["myint"] + assert [e.as_py() for e in table["myfloat"]] == pydict["myfloat"] + assert [e.as_py() for e in table["mystring"]] == pydict["mystring"] + assert [e.as_py() for e in table["mybool"]] == pydict["mybool"] # Read ids table = sdf.read(coords=[[30, 10]]).concat() assert table.num_rows == 2 assert table.num_columns == 5 assert sorted([e.as_py() for e in table["soma_joinid"]]) == [0, 2] - assert sorted([e.as_py() for e in table["foo"]]) == [10, 30] - assert sorted([e.as_py() for e in table["bar"]]) == [4.1, 6.3] - assert sorted([e.as_py() for e in table["baz"]]) == ["apple", "cat"] - assert [e.as_py() for e in table["quux"]] == [True, False] + assert sorted([e.as_py() for e in table["myint"]]) == [10, 30] + assert sorted([e.as_py() for e in table["myfloat"]]) == [4.1, 6.3] + assert sorted([e.as_py() for e in table["mystring"]]) == ["apple", "cat"] + assert [e.as_py() for e in table["mybool"]] == [True, False] # Open and read with bindings with contextlib.closing( @@ -133,18 +133,18 @@ def test_dataframe(tmp_path, arrow_schema): assert table.num_rows == 5 assert table.num_columns == 5 assert [e.as_py() for e in table["soma_joinid"]] == pydict["soma_joinid"] - assert [e.as_py() for e in table["foo"]] == pydict["foo"] - assert [e.as_py() for e in table["bar"]] == pydict["bar"] - assert [e.as_py() for e in table["baz"]] == pydict["baz"] - assert [e.as_py() for e in table["quux"]] == pydict["quux"] + assert [e.as_py() for e in table["myint"]] == pydict["myint"] + assert [e.as_py() for e in table["myfloat"]] == pydict["myfloat"] + assert [e.as_py() for e in table["mystring"]] == pydict["mystring"] + assert [e.as_py() for e in table["mybool"]] == pydict["mybool"] with soma.DataFrame.open(uri) as A: cfg = A.config_options_from_schema() assert not cfg.allows_duplicates - assert json.loads(cfg.dims)["foo"]["filters"] == [ + assert json.loads(cfg.dims)["myint"]["filters"] == [ {"COMPRESSION_LEVEL": 3, "name": "ZSTD"} ] - assert json.loads(cfg.attrs)["bar"]["filters"] == [ + assert json.loads(cfg.attrs)["myfloat"]["filters"] == [ {"COMPRESSION_LEVEL": -1, "name": "ZSTD"} ] @@ -202,16 +202,16 @@ def test_dataframe_reopen(tmp_path, arrow_schema): def test_dataframe_with_float_dim(tmp_path, arrow_schema): sdf = soma.DataFrame.create( - tmp_path.as_posix(), schema=arrow_schema(), index_column_names=("bar",) + tmp_path.as_posix(), schema=arrow_schema(), index_column_names=("myfloat",) ) - assert sdf.index_column_names == ("bar",) + assert sdf.index_column_names == ("myfloat",) def 
test_dataframe_with_enumeration(tmp_path): schema = pa.schema( [ - pa.field("foo", pa.dictionary(pa.int64(), pa.large_string())), - pa.field("bar", pa.dictionary(pa.int64(), pa.large_string())), + pa.field("myint", pa.dictionary(pa.int64(), pa.large_string())), + pa.field("myfloat", pa.dictionary(pa.int64(), pa.large_string())), ] ) enums = {"enmr1": ("a", "bb", "ccc"), "enmr2": ("cat", "dog")} @@ -220,19 +220,19 @@ def test_dataframe_with_enumeration(tmp_path): ) as sdf: data = {} data["soma_joinid"] = [0, 1, 2, 3, 4] - data["foo"] = ["a", "bb", "ccc", "bb", "a"] - data["bar"] = ["cat", "dog", "cat", "cat", "cat"] + data["myint"] = ["a", "bb", "ccc", "bb", "a"] + data["myfloat"] = ["cat", "dog", "cat", "cat", "cat"] with pytest.raises(soma.SOMAError): sdf.write(pa.Table.from_pydict(data)) - data["foo"] = pd.Categorical(["a", "bb", "ccc", "bb", "a"]) - data["bar"] = pd.Categorical(["cat", "dog", "cat", "cat", "cat"]) + data["myint"] = pd.Categorical(["a", "bb", "ccc", "bb", "a"]) + data["myfloat"] = pd.Categorical(["cat", "dog", "cat", "cat", "cat"]) sdf.write(pa.Table.from_pydict(data)) with soma.DataFrame.open(tmp_path.as_posix()) as sdf: df = sdf.read().concat() - np.testing.assert_array_equal(df["foo"].chunk(0).dictionary, enums["enmr1"]) - np.testing.assert_array_equal(df["bar"].chunk(0).dictionary, enums["enmr2"]) + np.testing.assert_array_equal(df["myint"].chunk(0).dictionary, enums["enmr1"]) + np.testing.assert_array_equal(df["myfloat"].chunk(0).dictionary, enums["enmr2"]) @pytest.fixture @@ -1749,8 +1749,8 @@ def test_only_evolve_schema_when_enmr_is_extended(tmp_path): schema = pa.schema( [ - pa.field("foo", pa.dictionary(pa.int64(), pa.large_string())), - pa.field("bar", pa.large_string()), + pa.field("myint", pa.dictionary(pa.int64(), pa.large_string())), + pa.field("myfloat", pa.large_string()), ] ) @@ -1759,32 +1759,32 @@ def test_only_evolve_schema_when_enmr_is_extended(tmp_path): with soma.DataFrame.create(uri, schema=schema, domain=[[0, 4]]) as sdf: data = {} data["soma_joinid"] = [0, 1, 2, 3, 4] - data["foo"] = pd.Categorical(["a", "bb", "ccc", "bb", "a"]) - data["bar"] = ["cat", "dog", "cat", "cat", "cat"] + data["myint"] = pd.Categorical(["a", "bb", "ccc", "bb", "a"]) + data["myfloat"] = ["cat", "dog", "cat", "cat", "cat"] sdf.write(pa.Table.from_pydict(data)) # +1 evolving the schema with soma.DataFrame.open(uri, "w") as sdf: data = {} data["soma_joinid"] = [0, 1, 2, 3, 4] - data["foo"] = pd.Categorical(["a", "bb", "ccc", "d", "a"]) - data["bar"] = ["cat", "dog", "cat", "cat", "cat"] + data["myint"] = pd.Categorical(["a", "bb", "ccc", "d", "a"]) + data["myfloat"] = ["cat", "dog", "cat", "cat", "cat"] sdf.write(pa.Table.from_pydict(data)) # +0 no changes to enumeration values with soma.DataFrame.open(uri, "w") as sdf: data = {} data["soma_joinid"] = [0, 1, 2, 3, 4] - data["foo"] = pd.Categorical(["a", "bb", "ccc", "d", "a"]) - data["bar"] = ["cat", "dog", "cat", "cat", "cat"] + data["myint"] = pd.Categorical(["a", "bb", "ccc", "d", "a"]) + data["myfloat"] = ["cat", "dog", "cat", "cat", "cat"] sdf.write(pa.Table.from_pydict(data)) # +0 no changes enumeration values with soma.DataFrame.open(uri, "w") as sdf: data = {} data["soma_joinid"] = [0, 1, 2, 3, 4] - data["foo"] = pd.Categorical(["a", "bb", "ccc", "d", "d"]) - data["bar"] = ["cat", "dog", "cat", "cat", "cat"] + data["myint"] = pd.Categorical(["a", "bb", "ccc", "d", "d"]) + data["myfloat"] = ["cat", "dog", "cat", "cat", "cat"] sdf.write(pa.Table.from_pydict(data)) # total 3 fragment files @@ -1899,3 +1899,47 @@ def 
test_bounds_on_somajoinid_domain(tmp_path): ) assert soma.DataFrame.exists(uri) + + +def test_pass_configs(tmp_path, arrow_schema): + uri = tmp_path.as_posix() + + with soma.DataFrame.create(uri, schema=arrow_schema()) as sdf: + pydict = {} + pydict["soma_joinid"] = [0, 1, 2, 3, 4] + pydict["myint"] = [10, 20, 30, 40, 50] + pydict["myfloat"] = [4.1, 5.2, 6.3, 7.4, 8.5] + pydict["mystring"] = ["apple", "ball", "cat", "dog", "egg"] + pydict["mybool"] = [True, False, False, True, False] + rb = pa.Table.from_pydict(pydict) + sdf.tiledbsoma_resize_soma_joinid_shape(len(rb)) + sdf.write(rb) + + # Pass a custom config to open + with soma.DataFrame.open( + uri, + "r", + context=soma.SOMATileDBContext( + {"sm.mem.total_budget": "0", "sm.io_concurrency_level": "0"} + ), + ) as sdf: + + # This errors out as 0 is not a valid value to set the total memory + # budget or number of threads + with pytest.raises(soma.SOMAError): + next(sdf.read()) + + # This still errors out because read still sees that the number of + # threads is 0 and therefore invalid + with pytest.raises(soma.SOMAError): + next(sdf.read(platform_config={"sm.mem.total_budget": "10000"})) + + # With correct values, this reads without issue + next( + sdf.read( + platform_config={ + "sm.mem.total_budget": "10000", + "sm.io_concurrency_level": "1", + } + ) + ) diff --git a/apis/python/tests/test_dense_nd_array.py b/apis/python/tests/test_dense_nd_array.py index dea231f295..693b96da4a 100644 --- a/apis/python/tests/test_dense_nd_array.py +++ b/apis/python/tests/test_dense_nd_array.py @@ -498,3 +498,45 @@ def test_read_to_unwritten_array(tmp_path, shape): with soma.DenseNDArray.open(uri, "r") as A: assert np.array_equal(np.ones(shape) * 255, A.read().to_numpy()) + + +def test_pass_configs(tmp_path): + uri = tmp_path.as_posix() + + with soma.DenseNDArray.create( + tmp_path.as_posix(), + type=pa.uint8(), + shape=(2, 2), + context=SOMATileDBContext(timestamp=1), + ) as a: + a.write( + (slice(0, 2), slice(0, 2)), + pa.Tensor.from_numpy(np.zeros((2, 2), dtype=np.uint8)), + ) + + # Pass a custom config to open + with soma.DenseNDArray.open( + uri, + "r", + context=soma.SOMATileDBContext( + {"sm.mem.total_budget": "0", "sm.io_concurrency_level": "0"} + ), + ) as sdf: + + # This errors out as 0 is not a valid value to set the total memory + # budget or number of threads + with pytest.raises(soma.SOMAError): + sdf.read() + + # This still errors out because read still sees that the number of + # threads is 0 and therefore invalid + with pytest.raises(soma.SOMAError): + sdf.read(platform_config={"sm.mem.total_budget": "300000"}) + + # With correct values, this reads without issue + sdf.read( + platform_config={ + "sm.mem.total_budget": "300000", + "sm.io_concurrency_level": "1", + } + ) diff --git a/apis/python/tests/test_sparse_nd_array.py b/apis/python/tests/test_sparse_nd_array.py index 403e806b6d..cedd88c68e 100644 --- a/apis/python/tests/test_sparse_nd_array.py +++ b/apis/python/tests/test_sparse_nd_array.py @@ -1883,3 +1883,47 @@ def test_global_writes(tmp_path): data, platform_config=soma.TileDBCreateOptions(), ) + + +def test_pass_configs(tmp_path): + uri = tmp_path.as_posix() + + with soma.SparseNDArray.create( + tmp_path.as_posix(), type=pa.uint8(), shape=(3,) + ) as a: + data = pa.Table.from_pydict( + { + "soma_dim_0": pa.array([0, 1, 2], type=pa.int64()), + "soma_data": pa.array([1, 2, 3], type=pa.uint8()), + } + ) + a.write(data) + + # Pass a custom config to open + with soma.SparseNDArray.open( + uri, + "r", 
context=soma.SOMATileDBContext( + {"sm.mem.total_budget": "0", "sm.io_concurrency_level": "0"} + ), + ) as sdf: + + # This errors out as 0 is not a valid value to set the total memory + # budget or number of threads + with pytest.raises(soma.SOMAError): + next(sdf.read().tables()) + + # This still errors out because read still sees that the number of + # threads is 0 and therefore invalid + with pytest.raises(soma.SOMAError): + next(sdf.read(platform_config={"sm.mem.total_budget": "10000"}).tables()) + + # With correct values, this reads without issue + next( + sdf.read( + platform_config={ + "sm.mem.total_budget": "10000", + "sm.io_concurrency_level": "1", + } + ).tables() + ) diff --git a/libtiledbsoma/src/soma/managed_query.cc b/libtiledbsoma/src/soma/managed_query.cc index e851685dae..3b89c9a9a6 100644 --- a/libtiledbsoma/src/soma/managed_query.cc +++ b/libtiledbsoma/src/soma/managed_query.cc @@ -203,6 +203,104 @@ void ManagedQuery::submit_read() { }); } +std::optional<std::shared_ptr<ArrayBuffers>> ManagedQuery::read_next() { + setup_read(); + + if (is_empty_query() && !query_submitted_) { + query_submitted_ = true; + return buffers_; + } + + if (is_complete(false)) { + return std::nullopt; + } + + query_submitted_ = true; + query_future_ = std::async(std::launch::async, [&]() { + LOG_DEBUG("[ManagedQuery] submit thread start"); + try { + query_->submit(); + } catch (const std::exception& e) { + return StatusAndException(false, e.what()); + } + LOG_DEBUG("[ManagedQuery] submit thread done"); + return StatusAndException(true, "success"); + }); + + if (query_future_.valid()) { + LOG_DEBUG(std::format("[ManagedQuery] [{}] Waiting for query", name_)); + query_future_.wait(); + LOG_DEBUG( + std::format("[ManagedQuery] [{}] Done waiting for query", name_)); + + auto retval = query_future_.get(); + if (!retval.succeeded()) { + throw TileDBSOMAError(std::format( + "[ManagedQuery] [{}] Query FAILED: {}", + name_, + retval.message())); + } + + } else { + throw TileDBSOMAError( + std::format("[ManagedQuery] [{}] 'query_future_' invalid", name_)); + } + + auto status = query_->query_status(); + + if (status == Query::Status::FAILED) { + throw TileDBSOMAError( + std::format("[ManagedQuery] [{}] Query FAILED", name_)); + } + + // If the query was ever incomplete, the result buffers contents are not + // complete. 
+ if (status == Query::Status::INCOMPLETE) { + results_complete_ = false; + } else if (status == Query::Status::COMPLETE) { + results_complete_ = true; + } + + // Update ColumnBuffer size to match query results + size_t num_cells = 0; + for (auto& name : buffers_->names()) { + num_cells = buffers_->at(name)->update_size(*query_); + LOG_DEBUG(std::format( + "[ManagedQuery] [{}] Buffer {} cells={}", name_, name, num_cells)); + } + total_num_cells_ += num_cells; + + // TODO: retry the query with larger buffers + if (status == Query::Status::INCOMPLETE && !num_cells) { + throw TileDBSOMAError( + std::format("[ManagedQuery] [{}] Buffers are too small.", name_)); + } + + // Visit all attributes and retrieve enumeration vectors + auto attribute_map = schema_->attributes(); + for (auto& nmit : attribute_map) { + auto attrname = nmit.first; + auto attribute = nmit.second; + auto enumname = AttributeExperimental::get_enumeration_name( + *ctx_, attribute); + if (enumname != std::nullopt) { + auto enumeration = ArrayExperimental::get_enumeration( + *ctx_, *array_, enumname.value()); + auto enumvec = enumeration.as_vector(); + if (!buffers_->contains(attrname)) { + continue; + } + auto colbuf = buffers_->at(attrname); + colbuf->add_enumeration(enumvec); + LOG_DEBUG(std::format( + "[ManagedQuery] got Enumeration '{}' for attribute '{}'", + enumname.value(), + attrname)); + } + } + return buffers_; +} + // Please see the header-file comments for context. void ManagedQuery::_fill_in_subarrays_if_dense(bool is_read) { LOG_TRACE("[ManagedQuery] _fill_in_subarrays enter"); @@ -381,6 +479,7 @@ void ManagedQuery::_fill_in_subarrays_if_dense_with_new_shape( std::shared_ptr<ArrayBuffers> ManagedQuery::results() { if (is_empty_query()) { + query_submitted_ = true; return buffers_; } diff --git a/libtiledbsoma/src/soma/managed_query.h b/libtiledbsoma/src/soma/managed_query.h index d9bceb8ff8..b35b9c767f 100644 --- a/libtiledbsoma/src/soma/managed_query.h +++ b/libtiledbsoma/src/soma/managed_query.h @@ -281,6 +281,8 @@ class ManagedQuery { */ void setup_read(); + std::optional<std::shared_ptr<ArrayBuffers>> read_next(); + /** * @brief Check if the query is complete. * @@ -423,6 +425,10 @@ class ManagedQuery { return query_->query_status(); } + bool is_first_read() const { + return !query_submitted_; + } + private: //=================================================================== //= private non-static