diff --git a/.github/workflows/build-wheels.yml b/.github/workflows/build-wheels.yml index 98e44a915..85623bcef 100644 --- a/.github/workflows/build-wheels.yml +++ b/.github/workflows/build-wheels.yml @@ -23,7 +23,7 @@ jobs: - name: Get new version and build number run: | - pip install python-semantic-release + pip install python-semantic-release==7.* latest_tag=$(cat VERSION) echo "Bumping off of latest tag $latest_tag" new_tag=$(pysemver bump prerelease $latest_tag) diff --git a/VERSION b/VERSION index 80cb7d187..8901e47b0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -12.0.0-rc.5 +12.0.0-rc.11 diff --git a/aerospike-stubs/aerospike.pyi b/aerospike-stubs/aerospike.pyi index 2146eabad..81155772b 100644 --- a/aerospike-stubs/aerospike.pyi +++ b/aerospike-stubs/aerospike.pyi @@ -326,6 +326,7 @@ class Client: def batch_get_ops(self, keys: list, ops: list, policy: dict) -> list: ... def batch_operate(self, keys: list, ops: list, policy_batch: dict = ..., policy_batch_write: dict = ...) -> BatchRecords: ... def batch_remove(self, keys: list, policy_batch: dict = ..., policy_batch_remove: dict = ...) -> BatchRecords: ... + def batch_read(self, keys: list, bins: list[str] = ..., policy_batch: dict = ...) -> BatchRecords: ... def batch_write(self, batch_records: BatchRecords, policy_batch: dict = ...) -> BatchRecords: ... def close(self) -> None: ... def connect(self, username: str = ..., password: str = ...) -> Client: ... diff --git a/doc/client.rst b/doc/client.rst index d95f34dd4..2db703796 100755 --- a/doc/client.rst +++ b/doc/client.rst @@ -240,6 +240,9 @@ Batch Operations .. include:: examples/get_many.py :code: python + .. deprecated:: 12.0.0 + Use :meth:`batch_read` instead. + .. method:: exists_many(keys[, policy: dict]) -> [ (key, meta)] Batch-read metadata for multiple keys. @@ -255,6 +258,9 @@ Batch Operations .. include:: examples/exists_many.py :code: python + .. deprecated:: 12.0.0 + Use :meth:`batch_read` instead. + .. method:: select_many(keys, bins: list[, policy: dict]) -> [(key, meta, bins), ...]} Batch-read specific bins from multiple records. @@ -270,6 +276,9 @@ Batch Operations .. include:: examples/select_many.py :code: python + .. deprecated:: 12.0.0 + Use :meth:`batch_read` instead. + .. method:: batch_get_ops(keys, ops, policy: dict) -> [ (key, meta, bins)] Batch-read multiple records, and return them as a :class:`list`. @@ -328,6 +337,27 @@ Batch Operations .. seealso:: More information about the \ batch helpers :ref:`aerospike_operation_helpers.batch` + .. method:: batch_read(keys: list, [bins: list], [policy_batch: dict]) -> BatchRecords + + Read multiple records. + + If a list of bin names is not provided, return all the bins for each record. + + If a list of bin names is provided, return only these bins for the given list of records. + + If an empty list of bin names is provided, only the metadata of each record will be returned. + Each ``BatchRecord.record`` in ``BatchRecords.batch_records`` will only be a 2-tuple ``(key, meta)``. + + :param list keys: The key tuples of the records to fetch. + :param list[str] bins: List of bin names to fetch for each record. + :param dict policy_batch: See :ref:`aerospike_batch_policies`. + + :return: an instance of :class:`BatchRecords `. + + :raises: A subclass of :exc:`~aerospike.exception.AerospikeError`. + + .. note:: Requires server version >= 6.0.0. + .. method:: batch_operate(keys: list, ops: list, [policy_batch: dict], [policy_batch_write: dict]) -> BatchRecords Perform the same read/write transactions on multiple keys. diff --git a/setup.py b/setup.py index 5a6a29e79..67067bf11 100644 --- a/setup.py +++ b/setup.py @@ -320,7 +320,8 @@ def clean(): 'src/main/client/batch_write.c', 'src/main/client/batch_operate.c', 'src/main/client/batch_remove.c', - 'src/main/client/batch_apply.c' + 'src/main/client/batch_apply.c', + 'src/main/client/batch_read.c' ], # Compile diff --git a/src/include/client.h b/src/include/client.h index 9afaccd50..ae89f5fb2 100644 --- a/src/include/client.h +++ b/src/include/client.h @@ -493,6 +493,15 @@ PyObject *AerospikeClient_BatchWrite(AerospikeClient *self, PyObject *args, PyObject *AerospikeClient_Batch_Operate(AerospikeClient *self, PyObject *args, PyObject *kwds); +/** + * Perform reads on multiple keys. + * + * client.batch_read([keys], [bins], policy_batch) + * + */ +PyObject *AerospikeClient_BatchRead(AerospikeClient *self, PyObject *args, + PyObject *kwds); + /** * Remove multiple records by key. * Requires server version 6.0+ diff --git a/src/include/conversions.h b/src/include/conversions.h index 40c8e144a..1b1ab9d7c 100644 --- a/src/include/conversions.h +++ b/src/include/conversions.h @@ -225,4 +225,5 @@ as_status as_partition_status_to_pyobject( as_status as_batch_result_to_BatchRecord(AerospikeClient *self, as_error *err, as_batch_result *bres, - PyObject *py_batch_record); + PyObject *py_batch_record, + bool checking_if_records_exist); diff --git a/src/main/aerospike.c b/src/main/aerospike.c index 4e3c11f31..fbf923f1e 100644 --- a/src/main/aerospike.c +++ b/src/main/aerospike.c @@ -153,7 +153,7 @@ static int Aerospike_Clear(PyObject *aerospike) PyMODINIT_FUNC PyInit_aerospike(void) { - const char version[] = "12.0.0-rc.5"; + const char version[] = "12.0.0-rc.11"; // Makes things "thread-safe" Py_Initialize(); int i = 0; diff --git a/src/main/client/batch_apply.c b/src/main/client/batch_apply.c index ef9400dae..b915387b5 100644 --- a/src/main/client/batch_apply.c +++ b/src/main/client/batch_apply.c @@ -77,8 +77,8 @@ static bool batch_apply_cb(const as_batch_result *results, uint32_t n, } Py_DECREF(py_key); - as_batch_result_to_BatchRecord(data->client, &err, res, - py_batch_record); + as_batch_result_to_BatchRecord(data->client, &err, res, py_batch_record, + false); if (err.code != AEROSPIKE_OK) { as_log_error( "as_batch_result_to_BatchRecord failed at results index: %d", diff --git a/src/main/client/batch_operate.c b/src/main/client/batch_operate.c index c5ae47415..787fdd635 100644 --- a/src/main/client/batch_operate.c +++ b/src/main/client/batch_operate.c @@ -79,8 +79,8 @@ static bool batch_operate_cb(const as_batch_result *results, uint32_t n, } Py_DECREF(py_key); - as_batch_result_to_BatchRecord(data->client, &err, res, - py_batch_record); + as_batch_result_to_BatchRecord(data->client, &err, res, py_batch_record, + false); if (err.code != AEROSPIKE_OK) { as_log_error( "as_batch_result_to_BatchRecord failed at results index: %d", diff --git a/src/main/client/batch_read.c b/src/main/client/batch_read.c new file mode 100644 index 000000000..cf6c601a1 --- /dev/null +++ b/src/main/client/batch_read.c @@ -0,0 +1,303 @@ +#include +#include +#include +#include +#include +#include + +#include "types.h" +#include "policy.h" +#include "conversions.h" +#include "exceptions.h" + +// Struct for Python User-Data for the Callback +typedef struct { + PyObject *py_results; + PyObject *batch_records_module; + PyObject *func_name; + AerospikeClient *client; + bool checking_if_records_exist; +} LocalData; + +static bool batch_read_cb(const as_batch_result *results, uint32_t n, + void *udata) +{ + // Extract callback user-data + LocalData *data = (LocalData *)udata; + as_error err; + as_error_init(&err); + PyObject *py_key = NULL; + PyObject *py_batch_record = NULL; + bool success = true; + + // Lock Python State + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + + for (uint32_t i = 0; i < n; i++) { + + as_batch_read *res = NULL; + res = (as_batch_read *)&results[i]; + + // NOTE these conversions shouldn't go wrong but if they do, return + if (key_to_pyobject(&err, res->key, &py_key) != AEROSPIKE_OK) { + as_log_error("unable to convert res->key at results index: %d", i); + success = false; + break; + } + + // Create BatchRecord instance + py_batch_record = PyObject_CallMethodObjArgs( + data->batch_records_module, data->func_name, py_key, NULL); + if (py_batch_record == NULL) { + as_log_error("unable to instance BatchRecord at results index: %d", + i); + success = false; + Py_DECREF(py_key); + break; + } + Py_DECREF(py_key); + + // Initialize BatchRecord instance + as_batch_result_to_BatchRecord(data->client, &err, res, py_batch_record, + data->checking_if_records_exist); + if (err.code != AEROSPIKE_OK) { + as_log_error( + "as_batch_result_to_BatchRecord failed at results index: %d", + i); + success = false; + Py_DECREF(py_batch_record); + break; + } + + PyList_Append(data->py_results, py_batch_record); + Py_DECREF(py_batch_record); + } + + PyGILState_Release(gstate); + return success; +} + +PyObject *AerospikeClient_BatchRead(AerospikeClient *self, PyObject *args, + PyObject *kwds) +{ + PyObject *py_keys = NULL; + PyObject *py_bins = NULL; + PyObject *py_policy_batch = NULL; + static char *kwlist[] = {"keys", "bins", "policy", NULL}; + if (PyArg_ParseTupleAndKeywords(args, kwds, "O|OO:batch_read", kwlist, + &py_keys, &py_bins, + &py_policy_batch) == false) { + return NULL; + } + + as_error err; + as_error_init(&err); + + PyObject *br_instance = NULL; + + // required arg so don't need to check for NULL + if (!PyList_Check(py_keys)) { + as_error_update(&err, AEROSPIKE_ERR_PARAM, + "keys should be a list of aerospike key tuples"); + goto CLEANUP1; + } + + as_vector tmp_keys; + Py_ssize_t keys_size = PyList_Size(py_keys); + as_vector_init(&tmp_keys, sizeof(as_key), keys_size); + as_vector *tmp_keys_p = &tmp_keys; + + if (!self || !self->as) { + as_error_update(&err, AEROSPIKE_ERR_PARAM, "Invalid aerospike object"); + goto CLEANUP2; + } + + if (!self->is_conn_16) { + as_error_update(&err, AEROSPIKE_ERR_CLUSTER, + "No connection to aerospike cluster"); + goto CLEANUP2; + } + + uint64_t processed_key_count = 0; + for (int i = 0; i < keys_size; i++) { + PyObject *py_key = PyList_GetItem(py_keys, i); + as_key *tmp_key = (as_key *)as_vector_get(&tmp_keys, i); + + Py_INCREF(py_key); + if (!PyTuple_Check(py_key)) { + as_error_update(&err, AEROSPIKE_ERR_PARAM, + "key should be an aerospike key tuple"); + Py_DECREF(py_key); + goto CLEANUP2; + } + + pyobject_to_key(&err, py_key, tmp_key); + if (err.code != AEROSPIKE_OK) { + as_error_update(&err, AEROSPIKE_ERR_PARAM, + "failed to convert key at index: %d", i); + Py_DECREF(py_key); + goto CLEANUP2; + } + + Py_DECREF(py_key); + processed_key_count++; + } + + as_batch batch; + as_batch_init(&batch, processed_key_count); + memcpy(batch.keys.entries, tmp_keys.list, + sizeof(as_key) * processed_key_count); + + as_policy_batch policy_batch; + as_policy_batch *policy_batch_p = NULL; + + // For expressions conversion. + as_exp batch_exp_list; + as_exp *batch_exp_list_p = NULL; + + if (py_policy_batch) { + if (pyobject_to_policy_batch( + self, &err, py_policy_batch, &policy_batch, &policy_batch_p, + &self->as->config.policies.batch, &batch_exp_list, + &batch_exp_list_p) != AEROSPIKE_OK) { + goto CLEANUP3; + } + } + + // import batch_records helper + PyObject *br_module = NULL; + PyObject *sys_modules = PyImport_GetModuleDict(); + + Py_INCREF(sys_modules); + if (PyMapping_HasKeyString(sys_modules, + "aerospike_helpers.batch.records")) { + br_module = PyMapping_GetItemString(sys_modules, + "aerospike_helpers.batch.records"); + } + else { + br_module = PyImport_ImportModule("aerospike_helpers.batch.records"); + } + Py_DECREF(sys_modules); + + if (!br_module) { + as_error_update(&err, AEROSPIKE_ERR_CLIENT, + "Unable to load batch_records module"); + goto CLEANUP3; + } + + PyObject *obj_name = PyUnicode_FromString("BatchRecords"); + PyObject *res_list = PyList_New(0); + br_instance = + PyObject_CallMethodObjArgs(br_module, obj_name, res_list, NULL); + + Py_DECREF(obj_name); + Py_DECREF(res_list); + + if (!br_instance) { + as_error_update(&err, AEROSPIKE_ERR_CLIENT, + "Unable to instance BatchRecords"); + goto CLEANUP4; + } + + // Create and initialize callback user-data + LocalData data; + // Used to decode record bins + data.client = self; + // Used to append BatchRecord instances to the BatchRecords object in this function + data.py_results = PyObject_GetAttrString(br_instance, "batch_records"); + // Used to create a new BatchRecord instance in the callback function + data.batch_records_module = br_module; + data.func_name = PyUnicode_FromString("BatchRecord"); + data.checking_if_records_exist = false; + + Py_ssize_t bin_count = 0; + const char **filter_bins = NULL; + + // Parse list of bins + if (py_bins != NULL) { + if (!PyList_Check(py_bins)) { + as_error_update(&err, AEROSPIKE_ERR_PARAM, + "Bins argument should be a list."); + goto CLEANUP4; + } + + bin_count = PyList_Size(py_bins); + if (bin_count == 0) { + data.checking_if_records_exist = true; + } + else { + filter_bins = (const char **)malloc(sizeof(char *) * bin_count); + + for (Py_ssize_t i = 0; i < bin_count; i++) { + PyObject *py_bin = PyList_GetItem(py_bins, i); + if (PyUnicode_Check(py_bin)) { + filter_bins[i] = PyUnicode_AsUTF8(py_bin); + } + else { + as_error_update( + &err, AEROSPIKE_ERR_PARAM, + "Bin name should be a string or unicode string."); + goto CLEANUP5; + } + } + } + } + + Py_BEGIN_ALLOW_THREADS + + if (py_bins == NULL) { + aerospike_batch_get(self->as, &err, policy_batch_p, &batch, + batch_read_cb, &data); + } + else if (bin_count == 0) { + aerospike_batch_exists(self->as, &err, policy_batch_p, &batch, + batch_read_cb, &data); + } + else { + aerospike_batch_get_bins(self->as, &err, policy_batch_p, &batch, + filter_bins, bin_count, batch_read_cb, &data); + } + + Py_END_ALLOW_THREADS + + PyObject *py_br_res = PyLong_FromLong((long)err.code); + PyObject_SetAttrString(br_instance, FIELD_NAME_BATCH_RESULT, py_br_res); + Py_DECREF(py_br_res); + + as_error_reset(&err); + +CLEANUP5: + + free(filter_bins); + +CLEANUP4: + + Py_DECREF(br_module); + + Py_DECREF(data.py_results); + Py_DECREF(data.func_name); + +CLEANUP3: + + as_batch_destroy(&batch); + + if (batch_exp_list_p) { + as_exp_destroy(batch_exp_list_p); + } + +CLEANUP2: + + if (tmp_keys_p) { + as_vector_destroy(tmp_keys_p); + } + +CLEANUP1: + + if (err.code != AEROSPIKE_OK) { + raise_exception(&err); + return NULL; + } + + return br_instance; +} diff --git a/src/main/client/batch_remove.c b/src/main/client/batch_remove.c index aaf8571d5..2c5e68afe 100644 --- a/src/main/client/batch_remove.c +++ b/src/main/client/batch_remove.c @@ -77,8 +77,8 @@ static bool batch_remove_cb(const as_batch_result *results, uint32_t n, } Py_DECREF(py_key); - as_batch_result_to_BatchRecord(data->client, &err, res, - py_batch_record); + as_batch_result_to_BatchRecord(data->client, &err, res, py_batch_record, + false); if (err.code != AEROSPIKE_OK) { as_log_error( "as_batch_result_to_BatchRecord failed at results index: %d", diff --git a/src/main/client/type.c b/src/main/client/type.c index 10da99c43..4f964fdf7 100644 --- a/src/main/client/type.c +++ b/src/main/client/type.c @@ -521,6 +521,8 @@ static PyMethodDef AerospikeClient_Type_Methods[] = { METH_VARARGS | METH_KEYWORDS, batch_remove_doc}, {"batch_apply", (PyCFunction)AerospikeClient_Batch_Apply, METH_VARARGS | METH_KEYWORDS, batch_apply_doc}, + {"batch_read", (PyCFunction)AerospikeClient_BatchRead, + METH_VARARGS | METH_KEYWORDS, "Read multiple keys."}, // TRUNCATE OPERATIONS {"truncate", (PyCFunction)AerospikeClient_Truncate, diff --git a/src/main/conversions.c b/src/main/conversions.c index ea35f745f..c71bbf641 100644 --- a/src/main/conversions.c +++ b/src/main/conversions.c @@ -2630,9 +2630,13 @@ as_status get_int_from_py_int(as_error *err, PyObject *py_long, return AEROSPIKE_OK; } +// checking_if_records_exist: +// false if we want to get the record metadata and bins +// true if we only care about the record's metadata as_status as_batch_result_to_BatchRecord(AerospikeClient *self, as_error *err, as_batch_result *bres, - PyObject *py_batch_record) + PyObject *py_batch_record, + bool checking_if_records_exist) { as_status *result_code = &(bres->result); as_record *result_rec = &(bres->record); @@ -2649,7 +2653,20 @@ as_status as_batch_result_to_BatchRecord(AerospikeClient *self, as_error *err, if (*result_code == AEROSPIKE_OK) { PyObject *rec = NULL; - record_to_pyobject(self, err, result_rec, bres->key, &rec); + if (!checking_if_records_exist) { + record_to_pyobject(self, err, result_rec, bres->key, &rec); + } + else { + PyObject *py_result_key = NULL; + PyObject *py_result_meta = NULL; + + key_to_pyobject(err, bres->key, &py_result_key); + metadata_to_pyobject(err, &(bres->record), &py_result_meta); + + rec = PyTuple_New(2); + PyTuple_SetItem(rec, 0, py_result_key); + PyTuple_SetItem(rec, 1, py_result_meta); + } PyObject_SetAttrString(py_batch_record, FIELD_NAME_BATCH_RECORD, rec); Py_DECREF(rec); } diff --git a/test/new_tests/as_errors.py b/test/new_tests/as_errors.py index 78d930275..c12574049 100644 --- a/test/new_tests/as_errors.py +++ b/test/new_tests/as_errors.py @@ -214,6 +214,8 @@ AEROSPIKE_ERR_FAIL_ELEMENT_EXISTS = 24 +AEROSPIKE_FILTERED_OUT = 27 + # # There are no more records left for query. # diff --git a/test/new_tests/test_batch_read.py b/test/new_tests/test_batch_read.py new file mode 100644 index 000000000..4a41af323 --- /dev/null +++ b/test/new_tests/test_batch_read.py @@ -0,0 +1,136 @@ +import pytest + +from aerospike_helpers.batch.records import BatchRecords +from aerospike_helpers.expressions import base as exp +from aerospike import exception as e + +from .test_base_class import TestBaseClass +from . import as_errors + + +class TestBatchRead(TestBaseClass): + @pytest.fixture(autouse=True) + def setup(self, request, connection_with_config_funcs): + as_connection = connection_with_config_funcs + + if self.server_version < [6, 0]: + pytest.mark.xfail(reason="Servers older than 6.0 do not support batch read.") + pytest.xfail() + + self.test_ns = "test" + self.test_set = "demo" + self.keys = [] + self.keys_to_expected_bins = {} + self.batch_size = 5 + + for i in range(self.batch_size): + key = ("test", "demo", i) + rec = { + "count": i, + "ilist_bin": [ + i, + 1, + 2, + 6, + ], + "imap_bin": { + 1: 1, + 2: 2, + 3: 6, + }, + } + as_connection.put(key, rec) + self.keys.append(key) + self.keys_to_expected_bins[key] = rec + + def teardown(): + for i in range(self.batch_size): + key = ("test", "demo", i) + as_connection.remove(key) + + request.addfinalizer(teardown) + + def test_batch_read_with_policy(self): + # No record will satisfy this expression condition + expr = exp.Eq(exp.IntBin("count"), 99).compile() + res: BatchRecords = self.as_connection.batch_read(self.keys, policy={"expressions": expr}) + assert res.result == 0 + for i, batch_rec in enumerate(res.batch_records): + assert batch_rec.key[:3] == self.keys[i] # checking key + assert batch_rec.record is None + assert batch_rec.result == as_errors.AEROSPIKE_FILTERED_OUT + + def test_batch_read_all_bins(self): + res: BatchRecords = self.as_connection.batch_read(self.keys) + + for i, batch_rec in enumerate(res.batch_records): + assert batch_rec.result == 0 + assert batch_rec.key[:3] == self.keys[i] # checking key + assert batch_rec.record[0][:3] == self.keys[i] # checking key in record + assert batch_rec.record[2] == self.keys_to_expected_bins[self.keys[i]] + + @pytest.mark.parametrize( + "bins", + [ + ["count"], + ["count", "imap_bin"] + ] + ) + def test_batch_read_selected_bins(self, bins): + res: BatchRecords = self.as_connection.batch_read(self.keys, bins) + + for i, batch_rec in enumerate(res.batch_records): + key = self.keys[i] + assert batch_rec.result == 0 + assert batch_rec.key[:3] == key # checking key + expected_record = {bin_name: bin_value for bin_name, bin_value in + self.keys_to_expected_bins[key].items() if bin_name in bins} + assert batch_rec.record[0][:3] == key # checking key in record + assert batch_rec.record[2] == expected_record + + def test_batch_read_no_bins(self): + res: BatchRecords = self.as_connection.batch_read(self.keys, []) + + for i, batch_rec in enumerate(res.batch_records): + key = self.keys[i] + assert batch_rec.result == 0 + assert batch_rec.key[:3] == key # checking key + # Only key and metadata should be returned with no bins + assert len(batch_rec.record) == 2 + assert batch_rec.record[0][:3] == key # checking key in record + assert type(batch_rec.record[1]) == dict + + # Negative tests + + def test_batch_read_invalid_args(self): + with pytest.raises(TypeError): + self.as_connection.batch_read() + + def test_batch_read_invalid_key_list(self): + with pytest.raises(e.ParamError) as excinfo: + self.as_connection.batch_read(1) + + assert excinfo.value.msg == "keys should be a list of aerospike key tuples" + + @pytest.mark.parametrize("invalid_key, err_msg", [ + (1, "key should be an aerospike key tuple"), + (("test", "demo", 1, 2, 3), "failed to convert key at index: 0"), + ]) + def test_batch_read_invalid_key(self, invalid_key, err_msg): + with pytest.raises(e.ParamError) as excinfo: + keys = [invalid_key] + self.as_connection.batch_read(keys) + + assert excinfo.value.msg == err_msg + + def test_batch_read_invalid_bin_list(self): + with pytest.raises(e.ParamError) as excinfo: + self.as_connection.batch_read(self.keys, 1) + + assert excinfo.value.msg == "Bins argument should be a list." + + def test_batch_read_invalid_bin(self): + with pytest.raises(e.ParamError) as excinfo: + self.as_connection.batch_read(self.keys, [1]) + + assert excinfo.value.msg == "Bin name should be a string or unicode string."