diff --git a/tiledb/cc/common.cc b/tiledb/cc/common.cc index 0c8b81088f..db72157a77 100644 --- a/tiledb/cc/common.cc +++ b/tiledb/cc/common.cc @@ -168,7 +168,8 @@ tiledb_datatype_t np_to_tdb_dtype(py::dtype type) { if (kind == py::str("U")) return TILEDB_STRING_UTF8; - TPY_ERROR_LOC("could not handle numpy dtype"); + TPY_ERROR_LOC("could not handle numpy dtype: " + + py::getattr(type, "name").cast()); } bool is_tdb_num(tiledb_datatype_t type) { diff --git a/tiledb/core.cc b/tiledb/core.cc index 40886d73d3..4cc70cadae 100644 --- a/tiledb/core.cc +++ b/tiledb/core.cc @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -40,7 +41,6 @@ namespace tiledbpy { -using namespace std; using namespace tiledb; namespace py = pybind11; using namespace pybind11::literals; @@ -297,18 +297,260 @@ uint64_t count_zeros(py::array_t a) { return count; } +class PyAgg { + + using ByteBuffer = py::array_t; + using AggToBufferMap = std::map; + using AttrToAggsMap = std::map; + +private: + Context ctx_; + std::shared_ptr array_; + std::shared_ptr query_; + AttrToAggsMap result_buffers_; + AttrToAggsMap validity_buffers_; + + py::dict original_input_; + std::vector attrs_; + +public: + PyAgg() = delete; + + PyAgg(const Context &ctx, py::object py_array, py::object py_layout, + py::dict attr_to_aggs_input) + : ctx_(ctx), original_input_(attr_to_aggs_input) { + tiledb_array_t *c_array_ = (py::capsule)py_array.attr("__capsule__")(); + + // We never own this pointer; pass own=false + array_ = std::make_shared(ctx_, c_array_, false); + query_ = std::make_shared(ctx_, *array_, TILEDB_READ); + + bool issparse = array_->schema().array_type() == TILEDB_SPARSE; + tiledb_layout_t layout = (tiledb_layout_t)py_layout.cast(); + if (!issparse && layout == TILEDB_UNORDERED) { + TPY_ERROR_LOC("TILEDB_UNORDERED read is not supported for dense arrays") + } + query_->set_layout(layout); + + // Iterate through the requested attributes + for (auto attr_to_aggs : attr_to_aggs_input) { + auto attr_name = attr_to_aggs.first.cast(); + auto aggs = attr_to_aggs.second.cast>(); + + tiledb::Attribute attr = array_->schema().attribute(attr_name); + attrs_.push_back(attr_name); + + // For non-nullable attributes, applying max and min to the empty set is + // undefined. To check for this, we need to also run the count aggregate + // to make sure count != 0 + bool requested_max = + std::find(aggs.begin(), aggs.end(), "max") != aggs.end(); + bool requested_min = + std::find(aggs.begin(), aggs.end(), "min") != aggs.end(); + if (!attr.nullable() && (requested_max || requested_min)) { + // If the user already also requested count, then we don't need to + // request it again + if (std::find(aggs.begin(), aggs.end(), "count") == aggs.end()) { + aggs.push_back("count"); + } + } + + // Iterate through the aggreate operations to apply on the given attribute + for (auto agg_name : aggs) { + _apply_agg_operator_to_attr(agg_name, attr_name); + + // Set the result data buffers + auto *res_buf = &result_buffers_[attr_name][agg_name]; + if ("count" == agg_name || "null_count" == agg_name || + "mean" == agg_name) { + // count and null_count use uint64 and mean uses float64 + *res_buf = py::array(py::dtype("uint8"), 8); + } else { + // max, min, and sum use the dtype of the attribute + py::dtype dt(tiledb_dtype(attr.type(), attr.cell_size())); + *res_buf = py::array(py::dtype("uint8"), dt.itemsize()); + } + query_->set_data_buffer(attr_name + agg_name, (void *)res_buf->data(), + 1); + + if (attr.nullable()) { + // For nullable attributes, if the input set for the aggregation + // contains all NULL values, we will not get an aggregate value back + // as this operation is undefined. We need to check the validity + // buffer beforehand to see if we had a valid result + if (!("count" == agg_name || "null_count" == agg_name)) { + auto *val_buf = &validity_buffers_[attr.name()][agg_name]; + *val_buf = py::array(py::dtype("uint8"), 1); + query_->set_validity_buffer(attr_name + agg_name, + (uint8_t *)val_buf->data(), 1); + } + } + } + } + } + + void _apply_agg_operator_to_attr(const std::string &op_label, + const std::string &attr_name) { + using AggregateFunc = + std::function; + + std::unordered_map label_to_agg_func = { + {"sum", QueryExperimental::create_unary_aggregate}, + {"min", QueryExperimental::create_unary_aggregate}, + {"max", QueryExperimental::create_unary_aggregate}, + {"mean", QueryExperimental::create_unary_aggregate}, + {"null_count", + QueryExperimental::create_unary_aggregate}, + }; + + QueryChannel default_channel = + QueryExperimental::get_default_channel(*query_); + + if (label_to_agg_func.find(op_label) != label_to_agg_func.end()) { + AggregateFunc create_unary_aggregate = label_to_agg_func.at(op_label); + ChannelOperation op = create_unary_aggregate(*query_, attr_name); + default_channel.apply_aggregate(attr_name + op_label, op); + } else if ("count" == op_label) { + default_channel.apply_aggregate(attr_name + op_label, CountOperation()); + } else { + TPY_ERROR_LOC("Invalid channel operation " + op_label + + " passed to apply_aggregate."); + } + } + + py::dict get_aggregate() { + query_->submit(); + + // Cast the results to the correct dtype and output this as a Python dict + py::dict output; + for (auto attr_to_agg : original_input_) { + // Be clear in our variable names for strings as py::dict uses py::str + // keys whereas std::map uses std::string keys + std::string attr_cpp_name = attr_to_agg.first.cast(); + + py::str attr_py_name(attr_cpp_name); + output[attr_py_name] = py::dict(); + + tiledb::Attribute attr = array_->schema().attribute(attr_cpp_name); + + for (auto agg_py_name : original_input_[attr_py_name]) { + std::string agg_cpp_name = agg_py_name.cast(); + + if (_is_invalid(attr, agg_cpp_name)) { + output[attr_py_name][agg_py_name] = + _is_integer_dtype(attr) ? py::none() : py::cast(NAN); + } else { + output[attr_py_name][agg_py_name] = _set_result(attr, agg_cpp_name); + } + } + } + return output; + } + + bool _is_invalid(tiledb::Attribute attr, std::string agg_name) { + if (attr.nullable()) { + if ("count" == agg_name || "null_count" == agg_name) + return false; + + // For nullable attributes, check if the validity buffer returned false + const void *val_buf = validity_buffers_[attr.name()][agg_name].data(); + return *((uint8_t *)(val_buf)) == 0; + } else { + // For non-nullable attributes, max and min are undefined for the empty + // set, so we must check the count == 0 + if ("max" == agg_name || "min" == agg_name) { + const void *count_buf = result_buffers_[attr.name()]["count"].data(); + return *((uint64_t *)(count_buf)) == 0; + } + return false; + } + } + + bool _is_integer_dtype(tiledb::Attribute attr) { + switch (attr.type()) { + case TILEDB_INT8: + case TILEDB_INT16: + case TILEDB_UINT8: + case TILEDB_INT32: + case TILEDB_INT64: + case TILEDB_UINT16: + case TILEDB_UINT32: + case TILEDB_UINT64: + return true; + default: + return false; + } + } + + py::object _set_result(tiledb::Attribute attr, std::string agg_name) { + const void *agg_buf = result_buffers_[attr.name()][agg_name].data(); + + if ("mean" == agg_name) + return py::cast(*((double *)agg_buf)); + + if ("count" == agg_name || "null_count" == agg_name) + return py::cast(*((uint64_t *)agg_buf)); + + switch (attr.type()) { + case TILEDB_FLOAT32: + return py::cast("sum" == agg_name ? *((double *)agg_buf) + : *((float *)agg_buf)); + case TILEDB_FLOAT64: + return py::cast(*((double *)agg_buf)); + case TILEDB_INT8: + return py::cast(*((int8_t *)agg_buf)); + case TILEDB_UINT8: + return py::cast(*((uint8_t *)agg_buf)); + case TILEDB_INT16: + return py::cast(*((int16_t *)agg_buf)); + case TILEDB_UINT16: + return py::cast(*((uint16_t *)agg_buf)); + case TILEDB_UINT32: + return py::cast(*((uint32_t *)agg_buf)); + case TILEDB_INT32: + return py::cast(*((int32_t *)agg_buf)); + case TILEDB_INT64: + return py::cast(*((int64_t *)agg_buf)); + case TILEDB_UINT64: + return py::cast(*((uint64_t *)agg_buf)); + default: + TPY_ERROR_LOC( + "[_cast_agg_result] Invalid tiledb dtype for aggregation result") + } + } + + void set_subarray(py::object py_subarray) { + query_->set_subarray(*py_subarray.cast()); + } + + void set_cond(py::object cond) { + py::object init_pyqc = cond.attr("init_query_condition"); + + try { + init_pyqc(array_->uri(), attrs_, ctx_); + } catch (tiledb::TileDBError &e) { + TPY_ERROR_LOC(e.what()); + } catch (py::error_already_set &e) { + TPY_ERROR_LOC(e.what()); + } + auto pyqc = (cond.attr("c_obj")).cast(); + auto qc = pyqc.ptr().get(); + query_->set_condition(*qc); + } +}; + class PyQuery { private: Context ctx_; - shared_ptr domain_; - shared_ptr array_schema_; - shared_ptr array_; - shared_ptr query_; + std::shared_ptr domain_; + std::shared_ptr array_schema_; + std::shared_ptr array_; + std::shared_ptr query_; std::vector attrs_; std::vector dims_; - map buffers_; - vector buffers_order_; + std::map buffers_; + std::vector buffers_order_; bool deduplicate_ = true; bool use_arrow_ = false; @@ -320,9 +562,7 @@ class PyQuery { tiledb_layout_t layout_ = TILEDB_ROW_MAJOR; // label buffer list - std::unordered_map label_input_buffer_data_; - - std::string uri_; + unordered_map label_input_buffer_data_; public: tiledb_ctx_t *c_ctx_; @@ -347,15 +587,11 @@ class PyQuery { tiledb_array_t *c_array_ = (py::capsule)array.attr("__capsule__")(); // we never own this pointer, pass own=false - array_ = std::shared_ptr(new Array(ctx_, c_array_, false)); - - array_schema_ = - std::shared_ptr(new ArraySchema(array_->schema())); + array_ = std::make_shared(ctx_, c_array_, false); - domain_ = - std::shared_ptr(new Domain(array_schema_->domain())); + array_schema_ = std::make_shared(array_->schema()); - uri_ = array.attr("uri").cast(); + domain_ = std::make_shared(array_schema_->domain()); bool issparse = array_->schema().array_type() == TILEDB_SPARSE; @@ -398,8 +634,7 @@ class PyQuery { } } - query_ = - std::shared_ptr(new Query(ctx_, *array_, query_mode)); + query_ = std::make_shared(ctx_, *array_, query_mode); // [](Query* p){} /* note: no deleter*/); if (query_mode == TILEDB_READ) { @@ -424,8 +659,7 @@ class PyQuery { } void set_subarray(py::object py_subarray) { - tiledb::Subarray *subarray = py_subarray.cast(); - query_->set_subarray(*subarray); + query_->set_subarray(*py_subarray.cast()); } #if defined(TILEDB_SERIALIZATION) @@ -456,7 +690,7 @@ class PyQuery { py::object init_pyqc = cond.attr("init_query_condition"); try { - init_pyqc(uri_, attrs_, ctx_); + init_pyqc(array_->uri(), attrs_, ctx_); } catch (tiledb::TileDBError &e) { TPY_ERROR_LOC(e.what()); } catch (py::error_already_set &e) { @@ -1538,6 +1772,18 @@ void init_core(py::module &m) { &PyQuery::_test_alloc_max_bytes) .def_readonly("retries", &PyQuery::retries_); + py::class_(m, "PyAgg") + .def(py::init(), + "ctx"_a, "py_array"_a, "py_layout"_a, "attr_to_aggs_input"_a) + .def("set_subarray", &PyAgg::set_subarray) + .def("set_cond", &PyAgg::set_cond) + .def("get_aggregate", &PyAgg::get_aggregate); + + py::class_(m, "PAPair") + .def(py::init()) + .def("get_array", &PAPair::get_array) + .def("get_schema", &PAPair::get_schema); + m.def("array_to_buffer", &convert_np); m.def("init_stats", &init_stats); @@ -1548,11 +1794,6 @@ void init_core(py::module &m) { m.def("get_stats", &get_stats); m.def("use_stats", &use_stats); - py::class_(m, "PAPair") - .def(py::init()) - .def("get_array", &PAPair::get_array) - .def("get_schema", &PAPair::get_schema); - /* We need to make sure C++ TileDBError is translated to a correctly-typed py error. Note that using py::exception(..., "TileDBError") creates a new diff --git a/tiledb/libtiledb.pxd b/tiledb/libtiledb.pxd index c2ed474b48..f00dc8396f 100644 --- a/tiledb/libtiledb.pxd +++ b/tiledb/libtiledb.pxd @@ -1211,6 +1211,10 @@ cdef class SparseArrayImpl(Array): cdef class DenseArrayImpl(Array): cdef _read_dense_subarray(self, object subarray, list attr_names, object cond, tiledb_layout_t layout, bint include_coords) +cdef class Aggregation(object): + cdef Query query + cdef object attr_to_aggs + cdef class Query(object): cdef Array array cdef object attrs diff --git a/tiledb/libtiledb.pyx b/tiledb/libtiledb.pyx index c7727d902a..b2a7aafd4a 100644 --- a/tiledb/libtiledb.pyx +++ b/tiledb/libtiledb.pyx @@ -8,6 +8,7 @@ from cpython.version cimport PY_MAJOR_VERSION include "common.pxi" import io import warnings +import collections.abc from collections import OrderedDict from json import dumps as json_dumps from json import loads as json_loads @@ -15,7 +16,7 @@ from json import loads as json_loads from ._generated_version import version_tuple as tiledbpy_version from .array_schema import ArraySchema from .enumeration import Enumeration -from .cc import TileDBError +from .cc import TileDBError from .ctx import Config, Ctx, default_ctx from .vfs import VFS @@ -1701,6 +1702,103 @@ cdef class Array(object): self.__init__(uri, mode=mode, key=key, attr=view_attr, timestamp=timestamp_range, ctx=ctx) +cdef class Aggregation(object): + """ + Proxy object returned by Query.agg to calculate aggregations. + """ + + def __init__(self, query=None, attr_to_aggs={}): + if query is None: + raise ValueError("must pass in a query object") + + self.query = query + self.attr_to_aggs = attr_to_aggs + + def __getitem__(self, object selection): + from .main import PyAgg + from .subarray import Subarray + + array = self.query.array + order = self.query.order + + cdef tiledb_layout_t layout = TILEDB_UNORDERED if array.schema.sparse else TILEDB_ROW_MAJOR + if order is None or order == 'C': + layout = TILEDB_ROW_MAJOR + elif order == 'F': + layout = TILEDB_COL_MAJOR + elif order == 'G': + layout = TILEDB_GLOBAL_ORDER + elif order == 'U': + layout = TILEDB_UNORDERED + else: + raise ValueError("order must be 'C' (TILEDB_ROW_MAJOR), "\ + "'F' (TILEDB_COL_MAJOR), "\ + "'G' (TILEDB_GLOBAL_ORDER), "\ + "or 'U' (TILEDB_UNORDERED)") + + q = PyAgg(array._ctx_(), array, layout, self.attr_to_aggs) + + selection = index_as_tuple(selection) + dom = array.schema.domain + idx = replace_ellipsis(dom.ndim, selection) + idx, drop_axes = replace_scalars_slice(dom, idx) + dim_ranges = index_domain_subarray(array, dom, idx) + + subarray = Subarray(array, array.ctx) + subarray.add_ranges([list([x]) for x in dim_ranges]) + q.set_subarray(subarray) + + cond = self.query.cond + if cond is not None and cond != "": + from .query_condition import QueryCondition + + if isinstance(cond, str): + q.set_cond(QueryCondition(cond)) + elif isinstance(cond, QueryCondition): + raise TileDBError( + "Passing `tiledb.QueryCondition` to `cond` is no longer " + "supported as of 0.19.0. Instead of " + "`cond=tiledb.QueryCondition('expression')` " + "you must use `cond='expression'`. This message will be " + "removed in 0.21.0.", + ) + else: + raise TypeError("`cond` expects type str.") + + result = q.get_aggregate() + + # If there was only one attribute, just show the aggregate results + if len(result) == 1: + result = result[list(result.keys())[0]] + + # If there was only one aggregate, just show the value + if len(result) == 1: + result = result[list(result.keys())[0]] + + return result + + @property + def multi_index(self): + """Apply Array.multi_index with query parameters.""" + # Delayed to avoid circular import + from .multirange_indexing import MultiRangeAggregation + return MultiRangeAggregation(self.query.array, query=self) + + @property + def df(self): + raise NotImplementedError(".df indexer not supported for Aggregations") + + @property + def attr_to_aggs(self): + """Return the attribute and aggregration input mapping""" + return self.attr_to_aggs + + @property + def query(self): + """Return the underlying Query object""" + return self.query + + cdef class Query(object): """ Proxy object returned by query() to index into original array @@ -1767,10 +1865,75 @@ cdef class Query(object): raise TileDBError("`return_arrow=True` requires .df indexer`") return self.array.subarray(selection, - attrs=self.attrs, - cond=self.cond, - coords=self.coords if self.coords else self.dims, - order=self.order) + attrs=self.attrs, + cond=self.cond, + coords=self.coords if self.coords else self.dims, + order=self.order) + + def agg(self, aggs): + """ + Calculate an aggregate operation for a given attribute. Available + operations are sum, min, max, mean, count, and null_count (for nullable + attributes only). Aggregates may be combined with other query operations + such as query conditions and slicing. + + The input may be a single operation, a list of operations, or a + dictionary with attribute mapping to a single operation or list of + operations. + + For undefined operations on max and min, which can occur when a nullable + attribute contains only nulled data at the given coordinates or when + there is no data read for the given query (e.g. query conditions that do + not match any values or coordinates that contain no data)), invalid + results are represented as np.nan for attributes of floating point types + and None for integer types. + + >>> import tiledb, tempfile, numpy as np + >>> path = tempfile.mkdtemp() + + >>> with tiledb.from_numpy(path, np.arange(1, 10)) as A: + ... pass + + >>> # Note that tiledb.from_numpy creates anonymous attributes, so the + >>> # name of the attribute is represented as an empty string + + >>> with tiledb.open(path, 'r') as A: + ... A.query().agg("sum")[:] + 45 + + >>> with tiledb.open(path, 'r') as A: + ... A.query(cond="attr('') < 5").agg(["count", "mean"])[:] + {'count': 9, 'mean': 2.5} + + >>> with tiledb.open(path, 'r') as A: + ... A.query().agg({"": ["max", "min"]})[2:7] + {'max': 7, 'min': 3} + + :param agg: The input attributes and operations to apply aggregations on + :returns: single value for single operation on one attribute, a dictionary + of attribute keys associated with a single value for a single operation + across multiple attributes, or a dictionary of attribute keys that maps + to a dictionary of operation labels with the associated value + """ + schema = self.array.schema + attr_to_aggs_map = {} + if isinstance(aggs, dict): + attr_to_aggs_map = { + a: ( + tuple([aggs[a]]) + if isinstance(aggs[a], str) + else tuple(aggs[a]) + ) + for a in aggs + } + elif isinstance(aggs, str): + attrs = tuple(schema.attr(i).name for i in range(schema.nattr)) + attr_to_aggs_map = {a: (aggs,) for a in attrs} + elif isinstance(aggs, collections.abc.Sequence): + attrs = tuple(schema.attr(i).name for i in range(schema.nattr)) + attr_to_aggs_map = {a: tuple(aggs) for a in attrs} + + return Aggregation(self, attr_to_aggs_map) @property def attrs(self): @@ -2144,7 +2307,6 @@ cdef class DenseArrayImpl(Array): q = PyQuery(self._ctx_(), self, tuple(attr_names), tuple(), layout, False) self.pyquery = q - if cond is not None and cond != "": from .query_condition import QueryCondition @@ -2303,6 +2465,16 @@ cdef class DenseArrayImpl(Array): attributes.append(attr._internal_name) # object arrays are var-len and handled later if type(attr_val) is np.ndarray and attr_val.dtype is not np.dtype('O'): + if attr.isnullable and name not in nullmaps: + try: + nullmaps[name] = ~np.ma.masked_invalid(attr_val).mask + attr_val = np.nan_to_num(attr_val) + except Exception as exc: + attr_val = np.asarray(attr_val) + nullmaps[name] = np.array( + [int(v is not None) for v in attr_val], + dtype=np.uint8 + ) attr_val = np.ascontiguousarray(attr_val, dtype=attr.dtype) try: diff --git a/tiledb/multirange_indexing.py b/tiledb/multirange_indexing.py index 7b799f3f4d..010836f18f 100644 --- a/tiledb/multirange_indexing.py +++ b/tiledb/multirange_indexing.py @@ -25,9 +25,10 @@ from .cc import TileDBError from .dataframe_ import check_dataframe_deps +from .libtiledb import Aggregation as AggregationProxy from .libtiledb import Array, ArraySchema, Metadata from .libtiledb import Query as QueryProxy -from .main import PyQuery, increment_stat, use_stats +from .main import PyAgg, PyQuery, increment_stat, use_stats from .query import Query from .query_condition import QueryCondition from .subarray import Subarray @@ -354,6 +355,46 @@ def _run_query(self) -> Dict[str, np.ndarray]: return result_dict +class MultiRangeAggregation(_BaseIndexer): + def __init__(self, array: Array, query: Optional[AggregationProxy] = None): + super().__init__(array, query) + self.result_shape = None + + def _set_shape(self, ranges): + schema = self.array.schema + if not schema.sparse and len(schema.shape) > 1: + self.result_shape = mr_dense_result_shape(ranges, schema.shape) + else: + self.result_shape = None + + def __getitem__(self, idx): + with timing("getitem_time"): + if idx is EmptyRange: + self.pyquery = None + self.subarray = None + else: + self.pyquery = _get_pyagg(self.array, self.query) + self.subarray = Subarray(self.array) + self._set_ranges(idx) + return self._run_query() + + def _run_query(self) -> Dict[str, np.ndarray]: + if self.pyquery is None: + return self._empty_results + + result = self.pyquery.get_aggregate() + + # If there was only one attribute, just show the aggregate results + if len(result) == 1: + result = result[list(result.keys())[0]] + + # If there was only one aggregate, just show the value + if len(result) == 1: + result = result[list(result.keys())[0]] + + return result + + class DataFrameIndexer(_BaseIndexer): """ Implements `.df[]` indexing to directly return a dataframe @@ -610,6 +651,23 @@ def _get_pyquery( return pyquery +def _get_pyagg(array: Array, agg: AggregationProxy) -> PyAgg: + order = agg.query.order + + try: + layout = "CFGU".index(order) + except ValueError: + raise ValueError( + "order must be 'C' (TILEDB_ROW_MAJOR), 'F' (TILEDB_COL_MAJOR), " + "'U' (TILEDB_UNORDERED), or 'G' (TILEDB_GLOBAL_ORDER)" + ) + + pyagg = PyAgg(array._ctx_(), array, layout, agg.attr_to_aggs) + if agg.query.cond is not None: + pyagg.set_cond(QueryCondition(agg.query.cond)) + return pyagg + + def _iter_attr_names( schema: ArraySchema, query: Optional[QueryProxy] = None ) -> Iterator[str]: diff --git a/tiledb/tests/test_aggregates.py b/tiledb/tests/test_aggregates.py new file mode 100644 index 0000000000..5f402eedd6 --- /dev/null +++ b/tiledb/tests/test_aggregates.py @@ -0,0 +1,444 @@ +import numpy as np +import pytest + +import tiledb + +from .common import DiskTestCase + + +class AggregateTest(DiskTestCase): + @pytest.mark.parametrize("sparse", [True, False]) + @pytest.mark.parametrize( + "dtype", + [ + np.uint8, + np.int8, + np.uint16, + np.int16, + np.uint32, + np.int32, + np.uint64, + np.int64, + np.float32, + np.float64, + ], + ) + def test_basic(self, sparse, dtype): + path = self.path("test_basic") + dom = tiledb.Domain(tiledb.Dim(name="d", domain=(0, 9), dtype=np.int32)) + attrs = [tiledb.Attr(name="a", dtype=dtype)] + schema = tiledb.ArraySchema(domain=dom, attrs=attrs, sparse=sparse) + tiledb.Array.create(path, schema) + + data = np.random.randint(1, 10, size=10) + + with tiledb.open(path, "w") as A: + if sparse: + A[np.arange(0, 10)] = data + else: + A[:] = data + + all_aggregates = ("count", "sum", "min", "max", "mean") + + with tiledb.open(path, "r") as A: + # entire column + q = A.query() + expected = q[:]["a"] + + with pytest.raises(tiledb.TileDBError): + q.agg("bad")[:] + + with pytest.raises(tiledb.TileDBError): + q.agg("null_count")[:] + + with pytest.raises(NotImplementedError): + q.agg("count").df[:] + + assert q.agg("sum")[:] == sum(expected) + assert q.agg("min")[:] == min(expected) + assert q.agg("max")[:] == max(expected) + assert q.agg("mean")[:] == sum(expected) / len(expected) + assert q.agg("count")[:] == len(expected) + + assert q.agg({"a": "sum"})[:] == sum(expected) + assert q.agg({"a": "min"})[:] == min(expected) + assert q.agg({"a": "max"})[:] == max(expected) + assert q.agg({"a": "mean"})[:] == sum(expected) / len(expected) + assert q.agg({"a": "count"})[:] == len(expected) + + actual = q.agg(all_aggregates)[:] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + actual = q.agg({"a": all_aggregates})[:] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + # subarray + expected = A[4:7]["a"] + + assert q.agg("sum")[4:7] == sum(expected) + assert q.agg("min")[4:7] == min(expected) + assert q.agg("max")[4:7] == max(expected) + assert q.agg("mean")[4:7] == sum(expected) / len(expected) + assert q.agg("count")[4:7] == len(expected) + + assert q.agg({"a": "sum"})[4:7] == sum(expected) + assert q.agg({"a": "min"})[4:7] == min(expected) + assert q.agg({"a": "max"})[4:7] == max(expected) + assert q.agg({"a": "mean"})[4:7] == sum(expected) / len(expected) + assert q.agg({"a": "count"})[4:7] == len(expected) + + actual = q.agg(all_aggregates)[4:7] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + actual = q.agg({"a": all_aggregates})[4:7] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + @pytest.mark.parametrize("sparse", [True, False]) + @pytest.mark.parametrize( + "dtype", + [ + np.uint8, + np.int8, + np.uint16, + np.int16, + np.uint32, + np.int32, + np.uint64, + np.int64, + np.float32, + np.float64, + ], + ) + def test_multi_index(self, sparse, dtype): + path = self.path("test_multi_index") + dom = tiledb.Domain(tiledb.Dim(name="d", domain=(0, 9), dtype=np.int32)) + attrs = [tiledb.Attr(name="a", dtype=dtype)] + schema = tiledb.ArraySchema(domain=dom, attrs=attrs, sparse=sparse) + tiledb.Array.create(path, schema) + + data = np.random.randint(1, 10, size=10) + + with tiledb.open(path, "w") as A: + if sparse: + A[np.arange(0, 10)] = data + else: + A[:] = data + + all_aggregates = ("count", "sum", "min", "max", "mean") + + with tiledb.open(path, "r") as A: + # entire column + q = A.query() + expected = q.multi_index[:]["a"] + + with pytest.raises(tiledb.TileDBError): + q.agg("bad")[:] + + with pytest.raises(tiledb.TileDBError): + q.agg("null_count")[:] + + assert q.agg("sum").multi_index[:] == sum(expected) + assert q.agg("min").multi_index[:] == min(expected) + assert q.agg("max").multi_index[:] == max(expected) + assert q.agg("mean").multi_index[:] == sum(expected) / len(expected) + assert q.agg("count").multi_index[:] == len(expected) + + actual = q.agg(all_aggregates).multi_index[:] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + actual = q.agg({"a": all_aggregates}).multi_index[:] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + # subarray + expected = A.multi_index[4:7]["a"] + + assert q.agg("sum").multi_index[4:7] == sum(expected) + assert q.agg("min").multi_index[4:7] == min(expected) + assert q.agg("max").multi_index[4:7] == max(expected) + assert q.agg("mean").multi_index[4:7] == sum(expected) / len(expected) + assert q.agg("count").multi_index[4:7] == len(expected) + + actual = q.agg(all_aggregates).multi_index[4:7] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + @pytest.mark.parametrize( + "dtype", + [ + np.uint8, + np.int8, + np.uint16, + np.int16, + np.uint32, + np.int32, + np.uint64, + np.int64, + np.float32, + np.float64, + ], + ) + def test_with_query_condition(self, dtype): + path = self.path("test_with_query_condition") + dom = tiledb.Domain(tiledb.Dim(name="d", domain=(0, 9), dtype=np.int32)) + attrs = [tiledb.Attr(name="a", dtype=dtype)] + schema = tiledb.ArraySchema(domain=dom, attrs=attrs, sparse=True) + tiledb.Array.create(path, schema) + + with tiledb.open(path, "w") as A: + # hardcode the first value to be 1 to ensure that the a < 5 + # query condition always returns a non-empty result + data = np.random.randint(1, 10, size=10) + data[0] = 1 + + A[np.arange(0, 10)] = data + + all_aggregates = ("count", "sum", "min", "max", "mean") + + with tiledb.open(path, "r") as A: + q = A.query(cond="a < 5") + + expected = q[:]["a"] + actual = q.agg(all_aggregates)[:] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + expected = q.multi_index[:]["a"] + actual = q.agg(all_aggregates).multi_index[:] + assert actual["sum"] == sum(expected) + assert actual["min"] == min(expected) + assert actual["max"] == max(expected) + assert actual["mean"] == sum(expected) / len(expected) + assert actual["count"] == len(expected) + + # no value matches query condition + q = A.query(cond="a > 10") + + expected = q[:] + actual = q.agg(all_aggregates)[:] + assert actual["sum"] == 0 + if dtype in (np.float32, np.float64): + assert np.isnan(actual["min"]) + assert np.isnan(actual["max"]) + else: + assert actual["min"] is None + assert actual["max"] is None + assert np.isnan(actual["mean"]) + assert actual["count"] == 0 + + expected = q.multi_index[:] + actual = q.agg(all_aggregates).multi_index[:] + assert actual["sum"] == 0 + if dtype in (np.float32, np.float64): + assert np.isnan(actual["min"]) + assert np.isnan(actual["max"]) + else: + assert actual["min"] is None + assert actual["max"] is None + assert np.isnan(actual["mean"]) + assert actual["count"] == 0 + + @pytest.mark.parametrize("sparse", [True, False]) + def test_nullable(self, sparse): + path = self.path("test_nullable") + dom = tiledb.Domain(tiledb.Dim(name="d", domain=(0, 9), dtype=np.int32)) + attrs = [ + tiledb.Attr(name="integer", nullable=True, dtype=int), + tiledb.Attr(name="float", nullable=True, dtype=float), + ] + schema = tiledb.ArraySchema(domain=dom, attrs=attrs, sparse=sparse) + tiledb.Array.create(path, schema) + + # set index 5 and 7 to be null + data = np.random.rand(10) + data[5], data[7] = np.nan, np.nan + + # write data + with tiledb.open(path, "w") as A: + if sparse: + A[np.arange(0, 10)] = {"integer": data, "float": data} + else: + A[:] = {"integer": data, "float": data} + + with tiledb.open(path, "r") as A: + agg = A.query().agg + + result = agg("null_count") + assert result[0]["integer"]["null_count"] == 0 + assert result[:6]["integer"]["null_count"] == 1 + assert result[5:8]["integer"]["null_count"] == 2 + assert result[5]["integer"]["null_count"] == 1 + assert result[6:]["integer"]["null_count"] == 1 + assert result[7]["integer"]["null_count"] == 1 + assert result[:]["integer"]["null_count"] == 2 + + assert result[0]["float"]["null_count"] == 0 + assert result[:6]["float"]["null_count"] == 1 + assert result[5:8]["float"]["null_count"] == 2 + assert result[5]["float"]["null_count"] == 1 + assert result[6:]["float"]["null_count"] == 1 + assert result[7]["float"]["null_count"] == 1 + assert result[:]["float"]["null_count"] == 2 + + all_aggregates = ("count", "sum", "min", "max", "mean", "null_count") + + actual = agg({"integer": all_aggregates, "float": all_aggregates})[:] + + expected = A[:]["integer"] + expected_no_null = A[:]["integer"].compressed() + assert actual["integer"]["sum"] == sum(expected_no_null) + assert actual["integer"]["min"] == min(expected_no_null) + assert actual["integer"]["max"] == max(expected_no_null) + assert actual["integer"]["mean"] == sum(expected_no_null) / len( + expected_no_null + ) + assert actual["integer"]["count"] == len(expected) + assert actual["integer"]["null_count"] == np.count_nonzero(expected.mask) + + expected = A[:]["float"] + expected_no_null = A[:]["float"].compressed() + assert actual["float"]["sum"] == sum(expected_no_null) + assert actual["float"]["min"] == min(expected_no_null) + assert actual["float"]["max"] == max(expected_no_null) + assert actual["float"]["mean"] == sum(expected_no_null) / len( + expected_no_null + ) + assert actual["float"]["count"] == len(expected) + assert actual["float"]["null_count"] == np.count_nonzero(expected.mask) + + # no valid values + actual = agg({"integer": all_aggregates, "float": all_aggregates})[5] + + assert actual["integer"]["sum"] is None + assert actual["integer"]["min"] is None + assert actual["integer"]["max"] is None + assert actual["integer"]["mean"] is None + assert actual["integer"]["count"] == 1 + assert actual["integer"]["null_count"] == 1 + + assert np.isnan(actual["float"]["sum"]) + assert np.isnan(actual["float"]["min"]) + assert np.isnan(actual["float"]["max"]) + assert np.isnan(actual["float"]["mean"]) + assert actual["float"]["count"] == 1 + assert actual["float"]["null_count"] == 1 + + @pytest.mark.parametrize("sparse", [True, False]) + def test_empty(self, sparse): + path = self.path("test_empty_sparse") + dom = tiledb.Domain(tiledb.Dim(name="d", domain=(0, 9), dtype=np.int32)) + attrs = [ + tiledb.Attr(name="integer", nullable=True, dtype=int), + tiledb.Attr(name="float", nullable=True, dtype=float), + ] + schema = tiledb.ArraySchema(domain=dom, attrs=attrs, sparse=sparse) + tiledb.Array.create(path, schema) + + data = np.random.rand(5) + + # write data + with tiledb.open(path, "w") as A: + if sparse: + A[np.arange(0, 5)] = {"integer": data, "float": data} + else: + A[:5] = {"integer": data, "float": data} + + with tiledb.open(path, "r") as A: + invalid_aggregates = ("sum", "min", "max", "mean") + actual = A.query().agg(invalid_aggregates)[6:] + + assert actual["integer"]["sum"] is None + assert actual["integer"]["min"] is None + assert actual["integer"]["max"] is None + assert actual["integer"]["mean"] is None + + assert np.isnan(actual["float"]["sum"]) + assert np.isnan(actual["float"]["min"]) + assert np.isnan(actual["float"]["max"]) + assert np.isnan(actual["float"]["mean"]) + + def test_multiple_attrs(self): + path = self.path("test_multiple_attrs") + dom = tiledb.Domain(tiledb.Dim(name="d", domain=(0, 9), dtype=np.int32)) + attrs = [ + tiledb.Attr(name="integer", dtype=int), + tiledb.Attr(name="float", dtype=float), + tiledb.Attr(name="string", dtype=str), + ] + schema = tiledb.ArraySchema(domain=dom, attrs=attrs, sparse=True) + tiledb.Array.create(path, schema) + + with tiledb.open(path, "w") as A: + A[np.arange(0, 10)] = { + "integer": np.random.randint(1, 10, size=10), + "float": np.random.randint(1, 10, size=10), + "string": np.random.randint(1, 10, size=10).astype(str), + } + + with tiledb.open(path, "r") as A: + actual = A.query()[:] + agg = A.query().agg + + assert agg({"string": "count"})[:] == len(actual["string"]) + invalid_aggregates = ("sum", "min", "max", "mean") + for invalid_agg in invalid_aggregates: + with pytest.raises(tiledb.TileDBError): + agg({"string": invalid_agg})[:] + + result = agg("count")[:] + assert result["integer"]["count"] == len(actual["integer"]) + assert result["float"]["count"] == len(actual["float"]) + assert result["string"]["count"] == len(actual["string"]) + + with pytest.raises(tiledb.TileDBError): + agg("sum")[:] + + result = agg({"integer": "sum", "float": "sum"})[:] + assert "string" not in result + assert result["integer"]["sum"] == sum(actual["integer"]) + assert result["float"]["sum"] == sum(actual["float"]) + + result = agg( + { + "string": ("count",), + "integer": "sum", + "float": ["max", "min", "sum", "mean"], + } + )[:] + assert result["string"]["count"] == len(actual["string"]) + assert result["integer"]["sum"] == sum(actual["integer"]) + assert result["float"]["max"] == max(actual["float"]) + assert result["float"]["min"] == min(actual["float"]) + assert result["float"]["sum"] == sum(actual["float"]) + assert result["float"]["mean"] == sum(actual["float"]) / len( + actual["float"] + )