Skip to content

Commit

Permalink
Add Sync Transaction support (#17)
Browse files Browse the repository at this point in the history
* [fix] Rework code according to Valgrind's recommendations

* Clear code

* Avoid PEP's warnings

* Update code

* Support sync transactions

* Rewrite query results wrapper

* Properly use NewItem method. Reorganize code

* Implement transaction py API

* Add transaction py-tests

* Update Readme.md

* [fix] Fix build pyreindexer with reindexer v.4.15

* Try fix MacOS tests

* Correct style

* Update tests

* Reorganize README

* Correct test_item

* Update transaction test

* Simplify code

* Style changes

* Add commit_with_count()

* Add commit_with_count(). Fix code and check in test

* Upgrade version

* Fix transaction tests

* Fix build

* Update transaction tests

* Update transaction tests. Fix build

* Review changes

---------

Co-authored-by: Alexander.A.Utkin <[email protected]>
  • Loading branch information
AlexAUtkin and Alexander.A.Utkin authored Dec 6, 2024
1 parent 7e83f55 commit f7220cf
Show file tree
Hide file tree
Showing 20 changed files with 1,132 additions and 198 deletions.
238 changes: 195 additions & 43 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyreindexer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ endif()
enable_testing()

set(PY_MIN_VERSION 3.6)
set(RX_MIN_VERSION 3.24.0)
set(RX_MIN_VERSION 3.30.0)
find_package(PythonInterp ${PY_MIN_VERSION})
find_package(PythonLibs ${PY_MIN_VERSION})
find_package(reindexer CONFIG ${RX_MIN_VERSION})
Expand Down
1 change: 0 additions & 1 deletion pyreindexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@
"""

from pyreindexer.rx_connector import RxConnector
# from pyreindexer.index_definition import IndexDefinition
38 changes: 35 additions & 3 deletions pyreindexer/example/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def create_index_example(db, namespace):

try:
db.index_add(namespace, index_definition)
except Exception:
except (Exception,):
db.index_drop(namespace, 'id')
db.index_add(namespace, index_definition)

Expand All @@ -46,7 +46,7 @@ def create_items_example(db, namespace):
items_count = 10

for i in range(0, items_count):
item = {'id': i + 1, 'name': 'item_' + str(i % 2)}
item = {'id': i + 1, 'name': 'item_' + str(i % 2), 'value': 'check'}
db.item_upsert(namespace, item)


Expand All @@ -56,9 +56,37 @@ def select_item_query_example(db, namespace):
return db.select("SELECT * FROM " + namespace + " WHERE name='" + item_name_for_lookup + "'")


def transaction_example(db, namespace, items_in_base):
transaction = db.new_transaction(namespace)

items_count = len(items_in_base)

# delete first few items
for i in range(int(items_count/2)):
transaction.delete(items_in_base[i])

# update last one item, overwrite field 'value'
item = items_in_base[items_count - 1]
item['value'] = 'the transaction was here'
transaction.update(item)

# stop transaction and commit changes to namespace
transaction.commit()

# print records from namespace
selected_items_tr = select_item_query_example(db, namespace)

res_count = selected_items_tr.count()
print('Transaction results count: ', res_count)

# disposable QueryResults iterator
for item in selected_items_tr:
print('Item: ', item)


def rx_example():
db = RxConnector('builtin:///tmp/pyrx')
#db = RxConnector('cproto://127.0.0.1:6534/pyrx')
# db = RxConnector('cproto://127.0.0.1:6534/pyrx')

namespace = 'test_table'

Expand All @@ -75,13 +103,17 @@ def rx_example():
print('Results count: ', res_count)

# disposable QueryResults iterator
items_copy = []
for item in selected_items:
items_copy.append(item)
print('Item: ', item)

# won't be iterated again
for item in selected_items:
print('Item: ', item)

transaction_example(db, namespace, items_copy)

db.close()


Expand Down
4 changes: 2 additions & 2 deletions pyreindexer/index_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class IndexDefinition(dict):
`none`, `ascii`, `utf8`, `numeric`, `custom`.
sort_order_letters (str): Order for a sort sequence for a custom collate mode.
config (dict): A config for a fulltext engine.
[More](https://github.com/Restream/reindexer/blob/master/fulltext.md) .
[More](https://github.com/Restream/reindexer/blob/master/fulltext.md).
"""

def __getitem__(self, attr):
Expand All @@ -37,7 +37,7 @@ def __setitem__(self, attr, value):
super(IndexDefinition, self).update({attr: value})
return self

def update(self, dict_part=None):
def update(self, *args, **kwargs):
raise NotImplementedError(
'Bulk update is not implemented for IndexDefinition instance')

Expand Down
48 changes: 36 additions & 12 deletions pyreindexer/lib/include/queryresults_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,49 @@ using QueryResultsT = reindexer::QueryResults;

class QueryResultsWrapper {
public:
QueryResultsWrapper() : qresPtr(kResultsJson) {}
size_t Count() const { return qresPtr.Count(); }
void GetItemJSON(reindexer::WrSerializer& wrser, bool withHdrLen) { itPtr.GetJSON(wrser, withHdrLen); }
void Next() {
QueryResultsWrapper(DBInterface* db) : db_{db}, qres_{kResultsJson} {
assert(db_);
}

void Wrap(QueryResultsT&& qres) {
qres_ = std::move(qres);
it_ = qres_.begin();
wrap_ = true;
}

Error Select(const std::string& query) {
return db_->Select(query, *this);
}

size_t Count() const {
assert(wrap_);
return qres_.Count();
}

void GetItemJSON(reindexer::WrSerializer& wrser, bool withHdrLen) {
assert(wrap_);
it_.GetJSON(wrser, withHdrLen);
}

void Next() {
assert(wrap_);
db_->FetchResults(*this);
}

const std::vector<reindexer::AggregationResult>& GetAggregationResults() & { return qresPtr.GetAggregationResults(); }
void FetchResults() {
assert(wrap_);
// when results are fetched iterator closes and frees a memory of results buffer of Reindexer
++it_;
}

const std::vector<reindexer::AggregationResult>& GetAggregationResults() & { return qres_.GetAggregationResults(); }
const std::vector<reindexer::AggregationResult>& GetAggregationResults() && = delete;

private:
friend DBInterface;

void iterInit() { itPtr = qresPtr.begin(); }

DBInterface* db_ = nullptr;
QueryResultsT qresPtr;
QueryResultsT::Iterator itPtr;
DBInterface* db_{nullptr};
QueryResultsT qres_;
QueryResultsT::Iterator it_;
bool wrap_{false};
};

} // namespace pyreindexer
66 changes: 66 additions & 0 deletions pyreindexer/lib/include/transaction_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#pragma once

#include "reindexerinterface.h"

#ifdef PYREINDEXER_CPROTO
#include "client/cororeindexer.h"
#else
#include "core/reindexer.h"
#endif

namespace pyreindexer {

#ifdef PYREINDEXER_CPROTO
using DBInterface = ReindexerInterface<reindexer::client::CoroReindexer>;
using TransactionT = reindexer::client::CoroTransaction;
using QueryResultsT = reindexer::client::CoroQueryResults;
using ItemT = reindexer::client::Item;
#else
using DBInterface = ReindexerInterface<reindexer::Reindexer>;
using TransactionT = reindexer::Transaction;
using QueryResultsT = reindexer::QueryResults;
using ItemT = reindexer::Item;
#endif

class TransactionWrapper {
public:
TransactionWrapper(DBInterface* db) : db_{db} {
assert(db_);
}

void Wrap(TransactionT&& transaction) {
transaction_ = std::move(transaction);
wrap_ = true;
}

Error Start(std::string_view ns) {
return db_->StartTransaction(ns, *this);
}

ItemT NewItem() {
assert(wrap_);
return db_->NewItem(transaction_);
}

Error Modify(ItemT&& item, ItemModifyMode mode) {
assert(wrap_);
return db_->Modify(transaction_, std::move(item), mode);
}

Error Commit(size_t& count) {
assert(wrap_);
return db_->CommitTransaction(transaction_, count);
}

Error Rollback() {
assert(wrap_);
return db_->RollbackTransaction(transaction_);
}

private:
DBInterface* db_{nullptr};
TransactionT transaction_;
bool wrap_{false};
};

} // namespace pyreindexer
Loading

0 comments on commit f7220cf

Please sign in to comment.