Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python] Remove double open for SOMAArray reads #3293

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 7 additions & 23 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from . import pytiledbsoma as clib
from ._constants import SOMA_JOINID
from ._exception import SOMAError, map_exception_for_create
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._soma_array import SOMAArray
from ._tdb_handles import DataFrameWrapper
Expand Down Expand Up @@ -722,31 +721,16 @@ def read(
_util.check_unpartitioned(partitions)
self._check_open_read()

handle = self._handle._handle

context = handle.context()
if platform_config is not None:
config = context.tiledb_config.copy()
config.update(platform_config)
context = clib.SOMAContext(config)

sr = clib.SOMADataFrame.open(
uri=handle.uri,
mode=clib.OpenMode.read,
context=context,
column_names=column_names or [],
# TODO: batch_size
return TableReadIter(
array=self,
coords=coords,
column_names=column_names,
result_order=_util.to_clib_result_order(result_order),
timestamp=handle.timestamp and (0, handle.timestamp),
value_filter=value_filter,
platform_config=platform_config,
)

if value_filter is not None:
sr.set_condition(QueryCondition(value_filter), handle.schema)

_util._set_coords(sr, coords)

# TODO: batch_size
return TableReadIter(sr)

def write(
self, values: pa.Table, platform_config: Optional[options.PlatformConfig] = None
) -> Self:
Expand Down
50 changes: 16 additions & 34 deletions apis/python/src/tiledbsoma/_dense_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ._arrow_types import pyarrow_to_carrow_type
from ._common_nd_array import NDArray
from ._exception import SOMAError, map_exception_for_create
from ._read_iters import TableReadIter
from ._tdb_handles import DenseNDArrayWrapper
from ._types import OpenTimestamp, Slice
from ._util import dense_indices_to_shape
Expand Down Expand Up @@ -232,42 +233,20 @@
data_shape = tuple(handle.shape if use_shape else ned)
target_shape = dense_indices_to_shape(coords, data_shape, result_order)

context = handle.context()
if platform_config is not None:
config = context.tiledb_config.copy()
config.update(platform_config)
context = clib.SOMAContext(config)

sr = clib.SOMADenseNDArray.open(
uri=handle.uri,
mode=clib.OpenMode.read,
context=context,
arrow_table = TableReadIter(
array=self,
coords=coords,
column_names=[],
result_order=_util.to_clib_result_order(result_order),
timestamp=handle.timestamp and (0, handle.timestamp),
)
value_filter=None,
platform_config=platform_config,
).concat()

_util._set_coords(sr, coords)

arrow_tables = []
while True:
arrow_table_piece = sr.read_next()
if not arrow_table_piece:
break
arrow_tables.append(arrow_table_piece)

# For dense arrays there is no zero-output case: attempting to make a test case
# to do that, say by indexing a 10x20 array by positions 888 and 999, results
# in read-time errors of the form
#
# [TileDB::Subarray] Error: Cannot add range to dimension 'soma_dim_0'; Range [888, 888] is
# out of domain bounds [0, 9]
if not arrow_tables:
if arrow_table is None:
raise SOMAError(
"internal error: at least one table-piece should have been returned"
)

arrow_table = pa.concat_tables(arrow_tables)
npval = arrow_table.column("soma_data").to_numpy()
# TODO: as currently coded we're looking at the non-empty domain upper
# bound but not its lower bound. That works fine if data are written at
Expand Down Expand Up @@ -310,7 +289,7 @@
"""
_util.check_type("values", values, (pa.Tensor,))

clib_dense_array = self._handle._handle
clib_handle = self._handle._handle

# Compute the coordinates for the dense array.
new_coords: List[Union[int, Slice[int], None]] = []
Expand All @@ -331,13 +310,16 @@
if not input.flags.contiguous:
input = np.ascontiguousarray(input)
order = clib.ResultOrder.rowmajor
clib_dense_array.reset(result_order=order)
_util._set_coords(clib_dense_array, new_coords)
clib_dense_array.write(input)

mq = clib.ManagedQuery(clib_handle, clib_handle.context())
mq.set_layout(order)
_util._set_coords(mq, clib_handle, new_coords)
mq.set_soma_data(input)
mq.submit_write()

tiledb_write_options = TileDBWriteOptions.from_platform_config(platform_config)
if tiledb_write_options.consolidate_and_vacuum:
clib_dense_array.consolidate_and_vacuum()
clib_handle.consolidate_and_vacuum()

Check warning on line 322 in apis/python/src/tiledbsoma/_dense_nd_array.py

View check run for this annotation

Codecov / codecov/patch

apis/python/src/tiledbsoma/_dense_nd_array.py#L322

Added line #L322 was not covered by tests
return self

@classmethod
Expand Down
30 changes: 7 additions & 23 deletions apis/python/src/tiledbsoma/_point_cloud_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
_revise_domain_for_extent,
)
from ._exception import SOMAError, map_exception_for_create
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._spatial_dataframe import SpatialDataFrame
from ._spatial_util import (
Expand Down Expand Up @@ -332,31 +331,16 @@ def read(
_util.check_unpartitioned(partitions)
self._check_open_read()

handle = self._handle._handle

context = handle.context()
if platform_config is not None:
config = context.tiledb_config.copy()
config.update(platform_config)
context = clib.SOMAContext(config)

sr = clib.SOMAPointCloudDataFrame.open(
uri=handle.uri,
mode=clib.OpenMode.read,
context=context,
column_names=column_names or [],
# TODO: batch_size
return TableReadIter(
array=self,
coords=coords,
column_names=column_names,
result_order=_util.to_clib_result_order(result_order),
timestamp=handle.timestamp and (0, handle.timestamp),
value_filter=value_filter,
platform_config=platform_config,
)

if value_filter is not None:
sr.set_condition(QueryCondition(value_filter), handle.schema)

_util._set_coords(sr, coords)

# TODO: batch_size
return TableReadIter(sr)

def read_spatial_region(
self,
region: Optional[options.SpatialRegion] = None,
Expand Down
Loading