From 1024ab2dca6834efa8e3c6d55c1cd8c62eeacee1 Mon Sep 17 00:00:00 2001 From: reindexer-bot <@> Date: Sat, 18 Nov 2023 23:20:07 +0000 Subject: [PATCH] [ref] Change builtin storages path --- cpp_src/CMakeLists.txt | 27 +-- cpp_src/client/rpcclient.cc | 48 ++++- cpp_src/client/rpcclient.h | 2 + .../test/test_storage_compatibility.sh | 195 ++++++++++++++++++ .../cmd/reindexer_tool/commandsexecutor.cc | 78 +++---- cpp_src/core/defnsconfigs.h | 1 + cpp_src/core/ft/areaholder.h | 3 +- cpp_src/core/ft/ft_fast/selecter.cc | 31 +-- cpp_src/core/ft/ft_fuzzy/merger/basemerger.cc | 2 +- cpp_src/core/ft/idrelset.cc | 6 - cpp_src/core/ft/idrelset.h | 12 +- cpp_src/core/idset.h | 2 +- cpp_src/core/index/indextext/fastindextext.cc | 2 +- cpp_src/core/index/keyentry.h | 8 +- cpp_src/core/index/rtree/greenesplitter.h | 5 +- cpp_src/core/index/rtree/rstarsplitter.h | 5 +- cpp_src/core/key_value_type.h | 3 +- cpp_src/core/lsn.cc | 6 +- cpp_src/core/lsn.h | 92 +++++---- cpp_src/core/namespace/namespace.cc | 2 +- cpp_src/core/namespace/namespace.h | 5 +- cpp_src/core/namespace/namespaceimpl.cc | 91 ++++++-- cpp_src/core/namespace/namespaceimpl.h | 10 +- cpp_src/core/nsselecter/querypreprocessor.cc | 2 +- cpp_src/core/reindexerimpl.cc | 37 ++-- cpp_src/core/reindexerimpl.h | 7 +- cpp_src/core/transaction.cc | 8 + cpp_src/core/transaction.h | 2 + cpp_src/core/transactionimpl.cc | 84 +++++--- cpp_src/core/transactionimpl.h | 45 +++- cpp_src/estl/contexted_locks.h | 61 +++--- cpp_src/estl/h_vector.h | 1 + cpp_src/gtests/tests/API/api.cc | 12 +- .../gtests/tests/fixtures/fuzzing/index.cc | 4 +- cpp_src/gtests/tests/fixtures/fuzzing/index.h | 10 +- cpp_src/gtests/tests/fixtures/fuzzing/ns.cc | 12 +- .../tests/fixtures/fuzzing/ns_scheme.cc | 34 +-- .../gtests/tests/fixtures/fuzzing/ns_scheme.h | 10 +- .../fixtures/fuzzing/random_generator.cc | 2 +- .../tests/fixtures/fuzzing/random_generator.h | 14 +- cpp_src/gtests/tests/fixtures/fuzzing/types.h | 4 +- cpp_src/readme.md | 2 +- 
cpp_src/replicator/replicator.cc | 49 +++++ cpp_src/replicator/replicator.h | 3 +- cpp_src/replicator/walrecord.cc | 82 +++++--- cpp_src/replicator/walrecord.h | 12 +- cpp_src/replicator/walselecter.cc | 4 + cpp_src/server/contrib/server.yml | 11 +- cpp_src/server/httpserver.cc | 20 +- cpp_src/tools/fsops.cc | 16 ++ cpp_src/tools/fsops.h | 1 + cpp_src/tools/jsonstring.h | 15 +- cpp_src/vendor/gason/gason.h | 1 + describer.go | 2 +- fulltext.md | 8 +- test/builtinserver_test.go | 6 +- test/compatibility_test/compatibility_test.sh | 10 +- test/fields_autogen_test.go | 40 +++- test/slave_empty_storage_test.go | 4 +- test/uuid_test.go | 10 +- 60 files changed, 897 insertions(+), 384 deletions(-) create mode 100755 cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh diff --git a/cpp_src/CMakeLists.txt b/cpp_src/CMakeLists.txt index 8bfc3150d..c47953a94 100644 --- a/cpp_src/CMakeLists.txt +++ b/cpp_src/CMakeLists.txt @@ -53,25 +53,28 @@ include (TargetArch) target_architecture(COMPILER_TARGET_ARCH) # Configure compile options -string(REPLACE "-DNDEBUG" "" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}") -string(REPLACE "-O2" "-O3" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}") -string(REPLACE "-O2" "-O3" CMAKE_C_FLAGS "${CMAKE_C_FLAGS}") -if (NOT ${COMPILER_TARGET_ARCH} STREQUAL "e2k") - string(REPLACE "-g" "-g1" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}") -else() - string(REPLACE "-g" "-g0" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}") -endif() - +if (MSVC) + set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g1") + set(CMAKE_C_FLAGS_RELWITHDEBINFO "-O2 -g1") + set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG") + set(CMAKE_C_FLAGS_RELEASE "-O2 -DNDEBUG") +else () + set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g1") + set(CMAKE_C_FLAGS_RELWITHDEBINFO "-O3 -g1") + set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG") + set(CMAKE_C_FLAGS_RELEASE "-O3 -DNDEBUG") +endif () if (${COMPILER_TARGET_ARCH} STREQUAL "e2k") + 
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g0") add_definitions(-D__E2K__) add_definitions(-D__LCC__) -endif() +endif () if (NOT MSVC AND NOT APPLE) check_linker_flag (-gz cxx_linker_supports_gz) if (cxx_linker_supports_gz) - set (CMAKE_EXE_LINKER_FLAGS_RELWITHDEBINFO "${CMAKE_EXE_LINKER_FLAGS_RELWITHDEBINFO} -gz") - endif () + set (CMAKE_EXE_LINKER_FLAGS_RELWITHDEBINFO "${CMAKE_EXE_LINKER_FLAGS_RELWITHDEBINFO} -gz") + endif () endif () if (MSVC) diff --git a/cpp_src/client/rpcclient.cc b/cpp_src/client/rpcclient.cc index f0f4b4b72..48189bf15 100644 --- a/cpp_src/client/rpcclient.cc +++ b/cpp_src/client/rpcclient.cc @@ -3,6 +3,7 @@ #include #include "client/itemimpl.h" #include "core/namespacedef.h" +#include "core/schema.h" #include "gason/gason.h" #include "tools/cpucheck.h" #include "tools/errors.h" @@ -98,6 +99,7 @@ void RPCClient::run(size_t thIdx) { workers_[thIdx].stop_.start(); delayedUpdates_.clear(); + serialDelays_ = 0; for (size_t i = thIdx; int(i) < config_.ConnPoolSize; i += config_.WorkerThreads) { connections_[i].reset(new cproto::ClientConnection(workers_[thIdx].loop_, &connectData_, @@ -717,21 +719,39 @@ void RPCClient::onUpdates(net::cproto::RPCAnswer& ans, cproto::ClientConnection* // If tagsMatcher has been updated but there is no bundled tagsMatcher in cjson // then we need to ask server to send tagsMatcher. + ++serialDelays_; // Delay this update and all the further updates until we get responce from server. 
ans.EnsureHold(); delayedUpdates_.emplace_back(std::move(ans)); QueryResults* qr = new QueryResults; Select(Query(std::string(nsName)).Limit(0), *qr, - InternalRdxContext(nullptr, - [=](const Error& err) { - delete qr; - // If there are delayed updates then send them to client - auto uq = std::move(delayedUpdates_); - delayedUpdates_.clear(); - if (err.ok()) - for (auto& a1 : uq) onUpdates(a1, conn); - }), + InternalRdxContext( + nullptr, + [this, qr, conn](const Error& err) { + delete qr; + // If there are delayed updates then send them to client + auto uq = std::move(delayedUpdates_); + delayedUpdates_.clear(); + + if (!err.ok() || serialDelays_ > 1) { + // This update was already dealyed, but was not able to synchronize tagsmatcher. + // Such situation usually means, that master's namespace was recreated and must be synchronized via force + // sync. + // Current fix is suboptimal and in some cases even incorrect (but still better, than previous + // implementation) - proper fix requires some versioning info about namespaces, which exists + // in v4 only + std::string_view nsName(std::string_view(uq.front().GetArgs(1)[1])); + logPrintf( + LogWarning, + "[repl:%s] Unable to sync tags matcher via online-replication (err: '%s'). 
Calling UpdatesLost fallback", + nsName, err.what()); + serialDelays_ = 0; + observers_.OnUpdatesLost(nsName); + } else { + for (auto& a1 : uq) onUpdates(a1, conn); + } + }), conn); return; } else { @@ -748,7 +768,17 @@ void RPCClient::onUpdates(net::cproto::RPCAnswer& ans, cproto::ClientConnection* ns->tagsMatcher_.deserialize(rdser, wrec.itemModify.tmVersion, ns->tagsMatcher_.stateToken()); } } + } else if (wrec.type == WalTagsMatcher) { + TagsMatcher tm; + Serializer ser(wrec.data.data(), wrec.data.size()); + const auto version = ser.GetVarint(); + const auto stateToken = ser.GetVarint(); + tm.deserialize(ser, version, stateToken); + auto ns = getNamespace(nsName); + std::lock_guard lck(ns->lck_); + ns->tagsMatcher_ = std::move(tm); } + serialDelays_ = 0; observers_.OnWALUpdate(LSNPair(lsn, originLSN), nsName, wrec); } diff --git a/cpp_src/client/rpcclient.h b/cpp_src/client/rpcclient.h index eefb2cf03..a906fe898 100644 --- a/cpp_src/client/rpcclient.h +++ b/cpp_src/client/rpcclient.h @@ -90,6 +90,7 @@ class RPCClient { ev::async stop_; std::atomic_bool running; }; + Error selectImpl(std::string_view query, QueryResults &result, cproto::ClientConnection *, seconds netTimeout, const InternalRdxContext &ctx); Error selectImpl(const Query &query, QueryResults &result, cproto::ClientConnection *, seconds netTimeout, @@ -122,6 +123,7 @@ class RPCClient { UpdatesObservers observers_; std::atomic updatesConn_; std::vector delayedUpdates_; + uint64_t serialDelays_ = 0; cproto::ClientConnection::ConnectData connectData_; }; diff --git a/cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh b/cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh new file mode 100755 index 000000000..d189d3841 --- /dev/null +++ b/cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh @@ -0,0 +1,195 @@ +#!/bin/bash +# Task: https://github.com/restream/reindexer/-/issues/1188 +set -e + +function KillAndRemoveServer { + local pid=$1 + kill $pid + wait $pid + yum 
remove -y 'reindexer*' > /dev/null +} + +function WaitForDB { + # wait until DB is loaded + set +e # disable "exit on error" so the script won't stop when DB's not loaded yet + is_connected=$(reindexer_tool --dsn $ADDRESS --command '\databases list'); + while [[ $is_connected != "test" ]] + do + sleep 2 + is_connected=$(reindexer_tool --dsn $ADDRESS --command '\databases list'); + done + set -e +} + +function CompareNamespacesLists { + local ns_list_actual=$1 + local ns_list_expected=$2 + local pid=$3 + + diff=$(echo ${ns_list_actual[@]} ${ns_list_expected[@]} | tr ' ' '\n' | sort | uniq -u) # compare in any order + if [ "$diff" == "" ]; then + echo "## PASS: namespaces list not changed" + else + echo "##### FAIL: namespaces list was changed" + echo "expected: $ns_list_expected" + echo "actual: $ns_list_actual" + KillAndRemoveServer $pid; + exit 1 + fi +} + +function CompareMemstats { + local actual=$1 + local expected=$2 + local pid=$3 + diff=$(echo ${actual[@]} ${expected[@]} | tr ' ' '\n' | sed 's/\(.*\),$/\1/' | sort | uniq -u) # compare in any order + if [ "$diff" == "" ]; then + echo "## PASS: memstats not changed" + else + echo "##### FAIL: memstats was changed" + echo "expected: $expected" + echo "actual: $actual" + KillAndRemoveServer $pid; + exit 1 + fi +} + + +RX_SERVER_CURRENT_VERSION_RPM="$(basename build/reindexer-*server*.rpm)" +VERSION_FROM_RPM=$(echo "$RX_SERVER_CURRENT_VERSION_RPM" | grep -o '.*server-..') +VERSION=$(echo ${VERSION_FROM_RPM: -2:1}) # one-digit version + +echo "## choose latest release rpm file" +if [ $VERSION == 3 ]; then + LATEST_RELEASE=$(python3 cpp_src/cmd/reindexer_server/test/get_last_rx_version.py -v 3) + 
namespaces_list_expected=$'purchase_options_ext_dict\nchild_account_recommendations\n#config\n#activitystats\nradio_channels\ncollections\n#namespaces\nwp_imports_tasks\nepg_genres\nrecom_media_items_personal\nrecom_epg_archive_default\n#perfstats\nrecom_epg_live_default\nmedia_view_templates\nasset_video_servers\nwp_tasks_schedule\nadmin_roles\n#clientsstats\nrecom_epg_archive_personal\nrecom_media_items_similars\nmenu_items\naccount_recommendations\nkaraoke_items\nmedia_items\nbanners\n#queriesperfstats\nrecom_media_items_default\nrecom_epg_live_personal\nservices\n#memstats\nchannels\nmedia_item_recommendations\nwp_tasks_tasks\nepg' +elif [ $VERSION == 4 ]; then + LATEST_RELEASE=$(python3 cpp_src/cmd/reindexer_server/test/get_last_rx_version.py -v 4) + # replicationstats ns added for v4 + namespaces_list_expected=$'purchase_options_ext_dict\nchild_account_recommendations\n#config\n#activitystats\n#replicationstats\nradio_channels\ncollections\n#namespaces\nwp_imports_tasks\nepg_genres\nrecom_media_items_personal\nrecom_epg_archive_default\n#perfstats\nrecom_epg_live_default\nmedia_view_templates\nasset_video_servers\nwp_tasks_schedule\nadmin_roles\n#clientsstats\nrecom_epg_archive_personal\nrecom_media_items_similars\nmenu_items\naccount_recommendations\nkaraoke_items\nmedia_items\nbanners\n#queriesperfstats\nrecom_media_items_default\nrecom_epg_live_personal\nservices\n#memstats\nchannels\nmedia_item_recommendations\nwp_tasks_tasks\nepg' +else + echo "Unknown version" + exit 1 +fi + +echo "## downloading latest release rpm file: $LATEST_RELEASE" +curl "http://repo.itv.restr.im/itv-api-ng/7/x86_64/$LATEST_RELEASE" --output $LATEST_RELEASE; +echo "## downloading example DB" +curl "https://git.restream.ru/MaksimKravchuk/reindexer_testdata/-/raw/master/big.zip" --output big.zip; +unzip -o big.zip # unzips into mydb_big.rxdump; + +ADDRESS="cproto://127.0.0.1:6534/" +DB_NAME="test" + +memstats_expected=$'[ +{"replication":{"data_hash":24651210926,"data_count":3}}, 
+{"replication":{"data_hash":6252344969,"data_count":1}}, +{"replication":{"data_hash":37734732881,"data_count":28}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":1024095024522,"data_count":1145}}, +{"replication":{"data_hash":8373644068,"data_count":1315}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":7404222244,"data_count":97}}, +{"replication":{"data_hash":94132837196,"data_count":4}}, +{"replication":{"data_hash":1896088071,"data_count":2}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":-672103903,"data_count":33538}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":6833710705,"data_count":1}}, +{"replication":{"data_hash":5858155773472,"data_count":4500}}, +{"replication":{"data_hash":-473221280268823592,"data_count":65448}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":8288213744,"data_count":3}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":0,"data_count":0}}, +{"replication":{"data_hash":354171024786967,"data_count":3941}}, +{"replication":{"data_hash":-6520334670,"data_count":35886}}, +{"replication":{"data_hash":112772074632,"data_count":281}}, +{"replication":{"data_hash":-12679568198538,"data_count":1623116}} +] +Returned 27 rows' + +echo "##### Forward compatibility test #####" + +DB_PATH=$(pwd)"/rx_db" + +echo "Database: "$DB_PATH + +echo "## installing latest release: $LATEST_RELEASE" +yum install -y $LATEST_RELEASE > /dev/null; +# run RX server with disabled logging +reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH & +server_pid=$! 
+sleep 2; + +reindexer_tool --dsn $ADDRESS$DB_NAME -f mydb_big.rxdump --createdb; +sleep 1; + +namespaces_1=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list'); +echo $namespaces_1; +CompareNamespacesLists "${namespaces_1[@]}" "${namespaces_list_expected[@]}" $server_pid; + +memstats_1=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select replication.data_hash, replication.data_count from #memstats'); +CompareMemstats "${memstats_1[@]}" "${memstats_expected[@]}" $server_pid; + +KillAndRemoveServer $server_pid; + +echo "## installing current version: $RX_SERVER_CURRENT_VERSION_RPM" +yum install -y build/*.rpm > /dev/null; +reindexer_server -l0 --corelog=none --httplog=none --rpclog=none --db $DB_PATH & +server_pid=$! +sleep 2; + +WaitForDB + +namespaces_2=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list'); +echo $namespaces_2; +CompareNamespacesLists "${namespaces_2[@]}" "${namespaces_1[@]}" $server_pid; + +memstats_2=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select replication.data_hash, replication.data_count from #memstats'); +CompareMemstats "${memstats_2[@]}" "${memstats_1[@]}" $server_pid; + +KillAndRemoveServer $server_pid; +rm -rf $DB_PATH; +sleep 1; + +echo "##### Backward compatibility test #####" + +echo "## installing current version: $RX_SERVER_CURRENT_VERSION_RPM" +yum install -y build/*.rpm > /dev/null; +reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH & +server_pid=$! 
+sleep 2; + +reindexer_tool --dsn $ADDRESS$DB_NAME -f mydb_big.rxdump --createdb; +sleep 1; + +namespaces_3=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list'); +echo $namespaces_3; +CompareNamespacesLists "${namespaces_3[@]}" "${namespaces_list_expected[@]}" $server_pid; + +memstats_3=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select replication.data_hash, replication.data_count from #memstats'); +CompareMemstats "${memstats_3[@]}" "${memstats_expected[@]}" $server_pid; + +KillAndRemoveServer $server_pid; + +echo "## installing latest release: $LATEST_RELEASE" +yum install -y $LATEST_RELEASE > /dev/null; +reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH & +server_pid=$! +sleep 2; + +WaitForDB + +namespaces_4=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list'); +echo $namespaces_4; +CompareNamespacesLists "${namespaces_4[@]}" "${namespaces_3[@]}" $server_pid; + +memstats_4=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select replication.data_hash, replication.data_count from #memstats'); +CompareMemstats "${memstats_4[@]}" "${memstats_3[@]}" $server_pid; + +KillAndRemoveServer $server_pid; +rm -rf $DB_PATH; diff --git a/cpp_src/cmd/reindexer_tool/commandsexecutor.cc b/cpp_src/cmd/reindexer_tool/commandsexecutor.cc index 77c3abb70..1b5853d70 100644 --- a/cpp_src/cmd/reindexer_tool/commandsexecutor.cc +++ b/cpp_src/cmd/reindexer_tool/commandsexecutor.cc @@ -191,7 +191,43 @@ Error CommandsExecutor::runImpl(const std::string& dsn, Args&&... a using reindexer::net::ev::sig; assertrx(!executorThr_.joinable()); - auto fn = [this](const std::string& dsn, Args&&... args) { + auto fnLoop = [this](const std::string& dsn, Args&&... 
args) { + std::string outputMode; + Error err; + if (reindexer::fs::ReadFile(reindexer::fs::JoinPath(reindexer::fs::GetHomeDir(), kConfigFile), outputMode) > 0) { + try { + gason::JsonParser jsonParser; + gason::JsonNode value = jsonParser.Parse(reindexer::giftStr(outputMode)); + for (auto node : value) { + WrSerializer ser; + reindexer::jsonValueToString(node.value, ser, 0, 0, false); + variables_[kVariableOutput] = std::string(ser.Slice()); + } + } catch (const gason::Exception& e) { + err = Error(errParseJson, "Unable to parse output mode: %s", e.what()); + } + } + if (err.ok() && variables_.empty()) { + variables_[kVariableOutput] = kOutputModeJson; + } + if (err.ok() && !uri_.parse(dsn)) { + err = Error(errNotValid, "Cannot connect to DB: Not a valid uri"); + } + if (err.ok()) err = db().Connect(dsn, std::forward(args)...); + if (err.ok()) { + loop_.spawn( + [this] { + // This coroutine should prevent loop from stopping for core::Reindexer + stopCh_.pop(); + }, + k8KStack); + } + std::lock_guard lck(mtx_); + status_.running = err.ok(); + status_.err = std::move(err); + }; + + auto fnThread = [this, &fnLoop](const std::string& dsn, Args&&... args) { sig sint; sint.set(loop_); sint.set([this](sig&) { cancelCtx_.Cancel(); }); @@ -214,49 +250,13 @@ Error CommandsExecutor::runImpl(const std::string& dsn, Args&&... a }); }); cmdAsync_.start(); - - auto fn = [this](const std::string& dsn, Args&&... 
args) { - std::string outputMode; - Error err; - if (reindexer::fs::ReadFile(reindexer::fs::JoinPath(reindexer::fs::GetHomeDir(), kConfigFile), outputMode) > 0) { - try { - gason::JsonParser jsonParser; - gason::JsonNode value = jsonParser.Parse(reindexer::giftStr(outputMode)); - for (auto node : value) { - WrSerializer ser; - reindexer::jsonValueToString(node.value, ser, 0, 0, false); - variables_[kVariableOutput] = std::string(ser.Slice()); - } - } catch (const gason::Exception& e) { - err = Error(errParseJson, "Unable to parse output mode: %s", e.what()); - } - } - if (err.ok() && variables_.empty()) { - variables_[kVariableOutput] = kOutputModeJson; - } - if (err.ok() && !uri_.parse(dsn)) { - err = Error(errNotValid, "Cannot connect to DB: Not a valid uri"); - } - if (err.ok()) err = db().Connect(dsn, std::forward(args)...); - if (err.ok()) { - loop_.spawn( - [this] { - // This coroutine should prevent loop from stopping for core::Reindexer - stopCh_.pop(); - }, - k8KStack); - } - std::lock_guard lck(mtx_); - status_.running = err.ok(); - status_.err = std::move(err); - }; - loop_.spawn(std::bind(fn, std::cref(dsn), std::forward(args)...)); + loop_.spawn(std::bind(fnLoop, std::cref(dsn), std::forward(args)...)); loop_.run(); }; setStatus(Status()); - executorThr_ = std::thread(std::bind(fn, std::cref(dsn), std::forward(args)...)); + executorThr_ = std::thread(std::bind(fnThread, std::cref(dsn), std::forward(args)...)); auto status = GetStatus(); while (!status.running && status.err.ok()) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); diff --git a/cpp_src/core/defnsconfigs.h b/cpp_src/core/defnsconfigs.h index 04fec8aa3..1601c2b68 100644 --- a/cpp_src/core/defnsconfigs.h +++ b/cpp_src/core/defnsconfigs.h @@ -75,6 +75,7 @@ const std::vector kDefDBConfig = { "replication":{ "role":"none", "master_dsn":"cproto://127.0.0.1:6534/db", + "server_id":0, "cluster_id":2, "force_sync_on_logic_error": false, "force_sync_on_wrong_data_hash": false, diff --git 
a/cpp_src/core/ft/areaholder.h b/cpp_src/core/ft/areaholder.h index e4db54a9f..fb186f6b0 100644 --- a/cpp_src/core/ft/areaholder.h +++ b/cpp_src/core/ft/areaholder.h @@ -39,7 +39,8 @@ class AreaBuffer { [[nodiscard]] bool Empty() const noexcept { return data_.empty(); } void Commit() { if (!data_.empty()) { - boost::sort::pdqsort(data_.begin(), data_.end(), [](const Area &rhs, const Area &lhs) { return rhs.start < lhs.start; }); + boost::sort::pdqsort_branchless(data_.begin(), data_.end(), + [](const Area &rhs, const Area &lhs) noexcept { return rhs.start < lhs.start; }); for (auto vit = data_.begin() + 1; vit != data_.end(); ++vit) { auto prev = vit - 1; if (vit->Concat(*prev)) { diff --git a/cpp_src/core/ft/ft_fast/selecter.cc b/cpp_src/core/ft/ft_fast/selecter.cc index b460ca929..40082ae17 100644 --- a/cpp_src/core/ft/ft_fast/selecter.cc +++ b/cpp_src/core/ft/ft_fast/selecter.cc @@ -335,8 +335,8 @@ void Selecter::processLowRelVariants(FtSelectContext& ctx, const FtMerge return false; }); } else { - boost::sort::pdqsort(ctx.lowRelVariants.begin(), ctx.lowRelVariants.end(), - [](FtBoundVariantEntry& l, FtBoundVariantEntry& r) noexcept { return l.proc > r.proc; }); + boost::sort::pdqsort_branchless(ctx.lowRelVariants.begin(), ctx.lowRelVariants.end(), + [](FtBoundVariantEntry& l, FtBoundVariantEntry& r) noexcept { return l.proc > r.proc; }); } auto lastVariantLen = ctx.lowRelVariants.size() ? 
ctx.lowRelVariants[0].GetLenCached() : -1; @@ -790,7 +790,7 @@ std::pair Selecter::calcTermRank(const TextSearchResults& r if (!termRank) return std::make_pair(termRank, field); if (holder_.cfg_->summationRanksByFieldsRatio > 0) { - std::sort(ranksInFields.begin(), ranksInFields.end()); + boost::sort::pdqsort_branchless(ranksInFields.begin(), ranksInFields.end()); double k = holder_.cfg_->summationRanksByFieldsRatio; for (auto rank : ranksInFields) { termRank += (k * rank); @@ -921,9 +921,10 @@ void Selecter::mergeIterationGroup(TextSearchResults& rawRes, index_t ra mergedPosInfo.rank = 0; } else { auto& posTmp = mergedPosInfo.posTmp; - boost::sort::pdqsort( - posTmp.begin(), posTmp.end(), - [](const std::pair& l, const std::pair& r) { return l.first < r.first; }); + boost::sort::pdqsort_branchless(posTmp.begin(), posTmp.end(), + [](const std::pair& l, + const std::pair& r) noexcept { return l.first < r.first; }); + auto last = std::unique(posTmp.begin(), posTmp.end()); posTmp.resize(last - posTmp.begin()); @@ -984,9 +985,9 @@ void Selecter::mergeResultsPart(std::vector& rawResul merged.maxRank = m.proc; } } - - boost::sort::pdqsort(merged.begin(), merged.end(), - [](const IDataHolder::MergeInfo& lhs, const IDataHolder::MergeInfo& rhs) { return lhs.proc > rhs.proc; }); + boost::sort::pdqsort_branchless( + merged.begin(), merged.end(), + [](const IDataHolder::MergeInfo& lhs, const IDataHolder::MergeInfo& rhs) noexcept { return lhs.proc > rhs.proc; }); } template @@ -1244,12 +1245,11 @@ typename IDataHolder::MergeData Selecter::mergeResults(std::vector merged_rd; std::vector idoffsets; - for (auto& rawRes : rawResults) { - boost::sort::pdqsort(rawRes.begin(), rawRes.end(), - [](const TextSearchResult& lhs, const TextSearchResult& rhs) { return lhs.proc_ > rhs.proc_; }); + boost::sort::pdqsort_branchless( + rawRes.begin(), rawRes.end(), + [](const TextSearchResult& lhs, const TextSearchResult& rhs) noexcept { return lhs.proc_ > rhs.proc_; }); } - const auto 
maxMergedSize = std::min(size_t(holder_.cfg_->mergeLimit), totalORVids); merged.reserve(maxMergedSize); @@ -1332,8 +1332,9 @@ typename IDataHolder::MergeData Selecter::mergeResults(std::vector rhs.proc; }); + boost::sort::pdqsort_branchless( + merged.begin(), merged.end(), + [](const IDataHolder::MergeInfo& lhs, const IDataHolder::MergeInfo& rhs) noexcept { return lhs.proc > rhs.proc; }); return merged; } diff --git a/cpp_src/core/ft/ft_fuzzy/merger/basemerger.cc b/cpp_src/core/ft/ft_fuzzy/merger/basemerger.cc index 967161001..12a57498c 100644 --- a/cpp_src/core/ft/ft_fuzzy/merger/basemerger.cc +++ b/cpp_src/core/ft/ft_fuzzy/merger/basemerger.cc @@ -93,7 +93,7 @@ SearchResult BaseMerger::Merge(MergeCtx& ctx, bool inTransaction, const reindexe data_set.AddData(it->Id(), id_ctx); } } - boost::sort::pdqsort(data_set.data_->begin(), data_set.data_->end(), [](const MergedData& lhs, const MergedData& rhs) { + boost::sort::pdqsort(data_set.data_->begin(), data_set.data_->end(), [](const MergedData& lhs, const MergedData& rhs) noexcept { if (lhs.proc_ == rhs.proc_) { return lhs.id_ < rhs.id_; } diff --git a/cpp_src/core/ft/idrelset.cc b/cpp_src/core/ft/idrelset.cc index 7f31546d1..6b87bc15c 100644 --- a/cpp_src/core/ft/idrelset.cc +++ b/cpp_src/core/ft/idrelset.cc @@ -1,4 +1,3 @@ - #include "idrelset.h" #include #include "estl/h_vector.h" @@ -84,9 +83,4 @@ int IdRelSet::Add(VDocIdType id, int pos, int field) { return back().Size(); } -void IdRelType::SimpleCommit() { - boost::sort::pdqsort(pos_.begin(), pos_.end(), - [](const IdRelType::PosType& lhs, const IdRelType::PosType& rhs) { return lhs.pos() < rhs.pos(); }); -} - } // namespace reindexer diff --git a/cpp_src/core/ft/idrelset.h b/cpp_src/core/ft/idrelset.h index 05ea5f8b7..23bc811fa 100644 --- a/cpp_src/core/ft/idrelset.h +++ b/cpp_src/core/ft/idrelset.h @@ -102,11 +102,11 @@ class IdRelType { addField(field); } void SortAndUnique() { - boost::sort::pdqsort(pos_.begin(), pos_.end()); + 
boost::sort::pdqsort_branchless(pos_.begin(), pos_.end()); auto last = std::unique(pos_.begin(), pos_.end()); pos_.resize(last - pos_.begin()); } - void Clear() { + void Clear() noexcept { usedFieldsMask_ = 0; #ifdef REINDEXER_FT_EXTRA_DEBUG pos_.clear(); @@ -116,7 +116,11 @@ class IdRelType { } size_t Size() const noexcept { return pos_.size(); } size_t size() const noexcept { return pos_.size(); } - void SimpleCommit(); + void SimpleCommit() noexcept { + boost::sort::pdqsort_branchless( + pos_.begin(), pos_.end(), + [](const IdRelType::PosType& lhs, const IdRelType::PosType& rhs) noexcept { return lhs.pos() < rhs.pos(); }); + } const RVector& Pos() const noexcept { return pos_; } uint64_t UsedFieldsMask() const noexcept { return usedFieldsMask_; } size_t HeapSize() const noexcept { return heapSize(pos_); } @@ -141,7 +145,7 @@ class IdRelType { class IdRelSet : public std::vector { public: int Add(VDocIdType id, int pos, int field); - void SimpleCommit() { + void SimpleCommit() noexcept { for (auto& val : *this) val.SimpleCommit(); } diff --git a/cpp_src/core/idset.h b/cpp_src/core/idset.h index 81522fcef..e2e501c00 100644 --- a/cpp_src/core/idset.h +++ b/cpp_src/core/idset.h @@ -108,7 +108,7 @@ class IdSet : public IdSetPlain { return *this; } static Ptr BuildFromUnsorted(base_idset &&ids) { - boost::sort::pdqsort(ids.begin(), ids.end()); + boost::sort::pdqsort_branchless(ids.begin(), ids.end()); ids.erase(std::unique(ids.begin(), ids.end()), ids.end()); // TODO: It would be better to integrate unique into sort return make_intrusive>(std::move(ids)); } diff --git a/cpp_src/core/index/indextext/fastindextext.cc b/cpp_src/core/index/indextext/fastindextext.cc index 12cbdcae9..c56dc44b8 100644 --- a/cpp_src/core/index/indextext/fastindextext.cc +++ b/cpp_src/core/index/indextext/fastindextext.cc @@ -123,7 +123,7 @@ template IndexMemStat FastIndexText::GetMemStat(const RdxContext &ctx) { auto ret = IndexUnordered::GetMemStat(ctx); - contexted_shared_lock 
lck(this->mtx_, &ctx); + contexted_shared_lock lck(this->mtx_, ctx); ret.fulltextSize = this->holder_->GetMemStat(); ret.idsetCache = this->cache_ft_ ? this->cache_ft_->GetMemStat() : LRUCacheMemStat(); return ret; diff --git a/cpp_src/core/index/keyentry.h b/cpp_src/core/index/keyentry.h index 079f543c6..9c500dba1 100644 --- a/cpp_src/core/index/keyentry.h +++ b/cpp_src/core/index/keyentry.h @@ -33,12 +33,14 @@ class KeyEntry { auto idsAsc = Sorted(ctx.getCurSortId()); size_t idx = 0; + const auto& ids2Sorts = ctx.ids2Sorts(); + [[maybe_unused]] const IdType maxRowId = IdType(ids2Sorts.size()); // For all ids of current key for (auto rowid : ids_) { - assertf(rowid < int(ctx.ids2Sorts().size()), "id=%d,ctx.ids2Sorts().size()=%d", rowid, ctx.ids2Sorts().size()); - idsAsc[idx++] = ctx.ids2Sorts()[rowid]; + assertf(rowid < maxRowId, "id=%d,ctx.ids2Sorts().size()=%d", rowid, maxRowId); + idsAsc[idx++] = ids2Sorts[rowid]; } - boost::sort::pdqsort(idsAsc.begin(), idsAsc.end()); + boost::sort::pdqsort_branchless(idsAsc.begin(), idsAsc.end()); } void Dump(std::ostream& os, std::string_view step, std::string_view offset) const { std::string newOffset; diff --git a/cpp_src/core/index/rtree/greenesplitter.h b/cpp_src/core/index/rtree/greenesplitter.h index ba707e781..c8c6a7179 100644 --- a/cpp_src/core/index/rtree/greenesplitter.h +++ b/cpp_src/core/index/rtree/greenesplitter.h @@ -1,6 +1,7 @@ #pragma once #include "splitter.h" +#include "vendor/sort/pdqsort.hpp" namespace reindexer { @@ -53,7 +54,7 @@ class GreeneSplitter : private Splitter secondSeedBoundRect.Left() + secondSeedBoundRect.Right()) { std::swap(seeds.first, seeds.second); } - std::sort(std::begin(indexes), std::end(indexes), [&src, this](size_t lhs, size_t rhs) { + boost::sort::pdqsort(std::begin(indexes), std::end(indexes), [&src, this](size_t lhs, size_t rhs) { return Base::getBoundRect(lhs < MaxEntries ? src[lhs] : this->appendingEntry_).Left() < Base::getBoundRect(rhs < MaxEntries ? 
src[rhs] : this->appendingEntry_).Left(); }); @@ -61,7 +62,7 @@ class GreeneSplitter : private Splitter secondSeedBoundRect.Bottom() + secondSeedBoundRect.Top()) { std::swap(seeds.first, seeds.second); } - std::sort(std::begin(indexes), std::end(indexes), [&src, this](size_t lhs, size_t rhs) { + boost::sort::pdqsort(std::begin(indexes), std::end(indexes), [&src, this](size_t lhs, size_t rhs) { return Base::getBoundRect(lhs < MaxEntries ? src[lhs] : this->appendingEntry_).Bottom() < Base::getBoundRect(rhs < MaxEntries ? src[rhs] : this->appendingEntry_).Bottom(); }); diff --git a/cpp_src/core/index/rtree/rstarsplitter.h b/cpp_src/core/index/rtree/rstarsplitter.h index a44c9748e..427a4add3 100644 --- a/cpp_src/core/index/rtree/rstarsplitter.h +++ b/cpp_src/core/index/rtree/rstarsplitter.h @@ -1,6 +1,7 @@ #pragma once #include "splitter.h" +#include "vendor/sort/pdqsort.hpp" namespace reindexer { @@ -19,12 +20,12 @@ class RStarSplitter : private Splitter diff --git a/cpp_src/core/lsn.cc b/cpp_src/core/lsn.cc index dc88fb059..73c0130e2 100644 --- a/cpp_src/core/lsn.cc +++ b/cpp_src/core/lsn.cc @@ -3,9 +3,11 @@ namespace reindexer { -void lsn_t::GetJSON(JsonBuilder &builder) const { +void lsn_t::GetJSON(JsonBuilder& builder) const { builder.Put("server_id", Server()); builder.Put("counter", Counter()); } -} // namespace reindexer +[[noreturn]] void lsn_t::throwValidation(ErrorCode code, const char* fmt, int64_t value) { throw Error(code, fmt, value); } + +} // namespace reindexer \ No newline at end of file diff --git a/cpp_src/core/lsn.h b/cpp_src/core/lsn.h index eb1f83d15..174540da4 100644 --- a/cpp_src/core/lsn.h +++ b/cpp_src/core/lsn.h @@ -13,58 +13,53 @@ class JsonBuilder; // SSS NNN NNN NNN NNN NNN (18 decimal digits) struct lsn_t { - static const int64_t digitCountLSNMult = 1000000000000000ll; +private: + static constexpr int16_t kMinServerIDValue = 0; + static constexpr int16_t kMaxServerIDValue = 999; - static const int64_t kCounterbitCount = 48; - static 
const int64_t kCounterMask = (1ull << kCounterbitCount) - 1ull; + static constexpr int64_t kMaxCounter = 1000000000000000ll; + static constexpr int64_t kDefaultCounter = kMaxCounter - 1; +public: void GetJSON(JsonBuilder &builder) const; void FromJSON(const gason::JsonNode &root) { - int server = root["server_id"].As(0); - int64_t counter = root["counter"].As(digitCountLSNMult - 1ll); + const int server = root["server_id"].As(0); + const int64_t counter = root["counter"].As(kDefaultCounter); payload_ = int64_t(lsn_t(counter, server)); } - lsn_t() {} - explicit lsn_t(int64_t v) { - if ((v & kCounterMask) == kCounterMask) // init -1 - payload_ = digitCountLSNMult - 1ll; - else { - payload_ = v; - } - } - lsn_t(int64_t counter, uint8_t server) { - if ((counter & kCounterMask) == kCounterMask) counter = digitCountLSNMult - 1ll; - int64_t s = server * digitCountLSNMult; - payload_ = s + counter; + lsn_t() noexcept = default; + lsn_t(const lsn_t &) noexcept = default; + lsn_t(lsn_t &&) noexcept = default; + lsn_t &operator=(const lsn_t &) noexcept = default; + lsn_t &operator=(lsn_t &&) noexcept = default; + explicit lsn_t(int64_t v) : lsn_t(v % kMaxCounter, v / kMaxCounter) {} + lsn_t(int64_t counter, int16_t server) { + validateCounter(counter); + validateServerId(server); + payload_ = server * kMaxCounter + counter; } explicit operator int64_t() const { return payload_; } - bool operator==(lsn_t o) { return payload_ == o.payload_; } - bool operator!=(lsn_t o) { return payload_ != o.payload_; } + bool operator==(lsn_t o) const noexcept { return payload_ == o.payload_; } + bool operator!=(lsn_t o) const noexcept { return payload_ != o.payload_; } - int64_t SetServer(short s) { - if (s > 999) throw Error(errLogic, "Server id > 999"); - int64_t server = s * digitCountLSNMult; - int64_t serverOld = payload_ / digitCountLSNMult; - payload_ = payload_ - serverOld * digitCountLSNMult + server; + int64_t SetServer(short server) { + validateServerId(server); + payload_ = server 
* kMaxCounter + Counter(); return payload_; } - int64_t SetCounter(int64_t c) { - if (c >= digitCountLSNMult) throw Error(errLogic, "LSN Counter > digitCountLSNMult"); - int64_t server = payload_ / digitCountLSNMult; - payload_ = server * digitCountLSNMult + c; + int64_t SetCounter(int64_t counter) { + validateCounter(counter); + payload_ = Server() * kMaxCounter + counter; return payload_; } - int64_t Counter() const { - int64_t server = payload_ / digitCountLSNMult; - return payload_ - server * digitCountLSNMult; - } - short Server() const { return payload_ / digitCountLSNMult; } - bool isEmpty() const { return Counter() == digitCountLSNMult - 1ll; } + int64_t Counter() const noexcept { return payload_ % kMaxCounter; } + int16_t Server() const noexcept { return payload_ / kMaxCounter; } + bool isEmpty() const noexcept { return Counter() == kDefaultCounter; } - int compare(lsn_t o) { + int compare(lsn_t o) const { if (Server() != o.Server()) throw Error(errLogic, "Compare lsn from different server"); if (Counter() < o.Counter()) return -1; @@ -73,13 +68,28 @@ struct lsn_t { return 0; } - bool operator<(lsn_t o) { return compare(o) == -1; } - bool operator<=(lsn_t o) { return compare(o) <= 0; } - bool operator>(lsn_t o) { return compare(o) == 1; } - bool operator>=(lsn_t o) { return compare(o) >= 0; } + bool operator<(lsn_t o) const { return compare(o) == -1; } + bool operator<=(lsn_t o) const { return compare(o) <= 0; } + bool operator>(lsn_t o) const { return compare(o) == 1; } + bool operator>=(lsn_t o) const { return compare(o) >= 0; } + +private: + int64_t payload_ = kDefaultCounter; + static void validateServerId(int16_t server) { + if (server < kMinServerIDValue) { + throwValidation(errLogic, "Server id < %d", kMinServerIDValue); + } + if (server > kMaxServerIDValue) { + throwValidation(errLogic, "Server id > %d", kMaxServerIDValue); + } + } + static void validateCounter(int64_t counter) { + if (counter > kDefaultCounter) { + throwValidation(errLogic, "LSN 
Counter > Default LSN (%d)", kMaxCounter); + } + } -protected: - int64_t payload_ = digitCountLSNMult - 1ll; + [[noreturn]] static void throwValidation(ErrorCode, const char *, int64_t); }; struct LSNPair { diff --git a/cpp_src/core/namespace/namespace.cc b/cpp_src/core/namespace/namespace.cc index 733bb309e..dbf0cc362 100644 --- a/cpp_src/core/namespace/namespace.cc +++ b/cpp_src/core/namespace/namespace.cc @@ -23,7 +23,7 @@ void Namespace::CommitTransaction(Transaction& tx, QueryResults& result, const R if (needNamespaceCopy(nsl, tx)) { PerfStatCalculatorMT calc(nsl->updatePerfCounter_, enablePerfCounters); - auto lck = statCalculator.CreateLock(clonerMtx_, &ctx); + auto lck = statCalculator.CreateLock(clonerMtx_, ctx); nsl = ns_; if (needNamespaceCopy(nsl, tx)) { diff --git a/cpp_src/core/namespace/namespace.h b/cpp_src/core/namespace/namespace.h index 55dc59548..6722c4fb6 100644 --- a/cpp_src/core/namespace/namespace.h +++ b/cpp_src/core/namespace/namespace.h @@ -146,6 +146,9 @@ class Namespace { } StorageOpts GetStorageOpts(const RdxContext &ctx) { return handleInvalidation(NamespaceImpl::GetStorageOpts)(ctx); } void Refill(std::vector &items, const RdxContext &ctx) { handleInvalidation(NamespaceImpl::Refill)(items, ctx); } + void SetTagsMatcher(TagsMatcher &&tm, const RdxContext &ctx) { + return handleInvalidation(NamespaceImpl::SetTagsMatcher)(std::move(tm), ctx); + } void DumpIndex(std::ostream &os, std::string_view index, const RdxContext &ctx) { return handleInvalidation(NamespaceImpl::DumpIndex)(os, index, ctx); @@ -160,7 +163,7 @@ class Namespace { NamespaceImpl::Ptr getMainNs() const { return atomicLoadMainNs(); } NamespaceImpl::Ptr awaitMainNs(const RdxContext &ctx) const { if (hasCopy_.load(std::memory_order_acquire)) { - contexted_unique_lock lck(clonerMtx_, &ctx); + contexted_unique_lock lck(clonerMtx_, ctx); assertrx(!hasCopy_.load(std::memory_order_acquire)); return ns_; } diff --git a/cpp_src/core/namespace/namespaceimpl.cc 
b/cpp_src/core/namespace/namespaceimpl.cc index ef58a7116..7cabc99a8 100644 --- a/cpp_src/core/namespace/namespaceimpl.cc +++ b/cpp_src/core/namespace/namespaceimpl.cc @@ -657,6 +657,7 @@ void NamespaceImpl::SetSchema(std::string_view schema, const RdxContext& ctx) { } } + const auto initTmVer = tagsMatcher_.version(); schema_ = std::make_shared(schema); auto fields = schema_->GetPaths(); for (auto& field : fields) { @@ -667,6 +668,19 @@ void NamespaceImpl::SetSchema(std::string_view schema, const RdxContext& ctx) { saveSchemaToStorage(); addToWAL(schema, WalSetSchema, ctx); + if (initTmVer != tagsMatcher_.version()) { + const lsn_t lsn(wal_.Add(WALRecord(WalEmpty), lsn_t()), serverId_); + if (!ctx.fromReplication_) repl_.lastSelfLSN = lsn; + if (!repl_.temporary) { + WrSerializer wser; + wser.PutVarint(tagsMatcher_.version()); + wser.PutVarint(tagsMatcher_.stateToken()); + tagsMatcher_.serialize(wser); + // This record is matter for the online replication only + observers_->OnWALUpdate(LSNPair(lsn, ctx.fromReplication_ ? 
ctx.LSNs_.originLSN_ : lsn), name_, + WALRecord(WalTagsMatcher, wser.Slice())); + } + } } std::string NamespaceImpl::GetSchema(int format, const RdxContext& ctx) { @@ -1698,20 +1712,41 @@ void NamespaceImpl::CommitTransaction(Transaction& tx, QueryResults& result, NsC storageAdvice = storage_.AdviceBatching(); } - for (auto& step : tx.GetSteps()) { - if (step.query_) { - QueryResults qr; - qr.AddNamespace(this, true); - if (step.query_->type_ == QueryDelete) { - doDelete(*step.query_, qr, ctx); - } else { - doUpdate(*step.query_, qr, ctx); + for (auto&& step : tx.GetSteps()) { + switch (step.type_) { + case TransactionStep::Type::ModifyItem: { + const auto mode = std::get(step.data_).mode; + Item item = tx.GetItem(std::move(step)); + modifyItem(item, mode, ctx); + result.AddItem(item); + break; } - } else { - const auto modifyMode = step.modifyMode_; - Item item = tx.GetItem(std::move(step)); - modifyItem(item, modifyMode, ctx); - result.AddItem(item); + case TransactionStep::Type::Query: { + QueryResults qr; + qr.AddNamespace(this, true); + auto& data = std::get(step.data_); + if (data.query->type_ == QueryDelete) { + doDelete(*data.query, qr, ctx); + } else { + doUpdate(*data.query, qr, ctx); + } + break; + } + case TransactionStep::Type::Nop: + break; + case TransactionStep::Type::PutMeta: { + auto& data = std::get(step.data_); + putMeta(data.key, data.value, ctx.rdxContext); + break; + } + case TransactionStep::Type::SetTM: { + auto& data = std::get(step.data_); + auto tmCopy = data.tm; + setTagsMatcher(std::move(tmCopy), ctx); + break; + } + default: + std::abort(); } } @@ -2567,6 +2602,11 @@ std::shared_ptr NamespaceImpl::GetSchemaPtr(const RdxContext& ctx) return schema_; } +void NamespaceImpl::SetTagsMatcher(TagsMatcher&& tm, const RdxContext& ctx) { + auto wlck = wLock(ctx); + setTagsMatcher(std::move(tm), ctx); +} + void NamespaceImpl::LoadFromStorage(unsigned threadsCount, const RdxContext& ctx) { auto wlck = wLock(ctx); FlagGuardT 
nsLoadingGuard(nsIsLoading_); @@ -2647,6 +2687,31 @@ void NamespaceImpl::removeExpiredStrings(RdxActivityContext* ctx) { } } +void NamespaceImpl::setTagsMatcher(TagsMatcher&& tm, const NsContext& ctx) { + // NOTE: In v4 tm tokens here are always the same, but in v3 those tokens are not synchronized. Probably it should workd anyway + // if (tm.stateToken() != tagsMatcher_.stateToken()) { + // throw Error(errParams, "Tagsmatcher have different statetokens: %08X vs %08X", tagsMatcher_.stateToken(), tm.stateToken()); + // } + if (!ctx.rdxContext.fromReplication_) { + throw Error(errParams, "Tagsmatcher can be set from replication only"); + } + tagsMatcher_ = tm; + tagsMatcher_.UpdatePayloadType(payloadType_, false); + tagsMatcher_.setUpdated(); + + const lsn_t lsn(wal_.Add(WALRecord(WalEmpty, 0, ctx.inTransaction)), serverId_); + if (!repl_.temporary) { + WrSerializer ser; + ser.PutVarint(tagsMatcher_.version()); + ser.PutVarint(tagsMatcher_.stateToken()); + tagsMatcher_.serialize(ser); + observers_->OnWALUpdate(LSNPair(lsn, ctx.rdxContext.LSNs_.originLSN_), name_, + WALRecord(WalTagsMatcher, ser.Slice(), ctx.inTransaction)); + } + + saveTagsMatcherToStorage(true); +} + void NamespaceImpl::BackgroundRoutine(RdxActivityContext* ctx) { const RdxContext rdxCtx(ctx); const NsContext nsCtx(rdxCtx); diff --git a/cpp_src/core/namespace/namespaceimpl.h b/cpp_src/core/namespace/namespaceimpl.h index 31a6e8fb4..a19026ac1 100644 --- a/cpp_src/core/namespace/namespaceimpl.h +++ b/cpp_src/core/namespace/namespaceimpl.h @@ -138,7 +138,7 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance. 
friend class ItemsLoader; friend class IndexInserters; - class NSUpdateSortedContext : public UpdateSortedContext { + class NSUpdateSortedContext final : public UpdateSortedContext { public: NSUpdateSortedContext(const NamespaceImpl &ns, SortType curSortId) : ns_(ns), sorted_indexes_(ns_.getSortedIdxCount()), curSortId_(curSortId) { @@ -282,6 +282,9 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance. std::shared_ptr GetSchemaPtr(const RdxContext &ctx) const; int getNsNumber() const { return schema_ ? schema_->GetProtobufNsNumber() : 0; } IndexesCacheCleaner GetIndexesCacheCleaner() { return IndexesCacheCleaner{*this}; } + // Separate method for the v3/v4 replication compatibility. + // It should not be used outside of this scenario + void SetTagsMatcher(TagsMatcher &&tm, const RdxContext &ctx); void SetDestroyFlag() { dbDestroyed_ = true; } Error FlushStorage(const RdxContext &ctx) { const auto flushOpts = StorageFlushOpts().WithImmediateReopen(); @@ -303,10 +306,10 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance. typedef contexted_shared_lock RLockT; typedef contexted_unique_lock WLockT; - RLockT RLock(const RdxContext &ctx) const { return RLockT(mtx_, &ctx); } + RLockT RLock(const RdxContext &ctx) const { return RLockT(mtx_, ctx); } WLockT WLock(const RdxContext &ctx) const { using namespace std::string_view_literals; - WLockT lck(mtx_, &ctx); + WLockT lck(mtx_, ctx); if (readonly_.load(std::memory_order_acquire)) { throw Error(errNamespaceInvalidated, "NS invalidated"sv); } @@ -373,6 +376,7 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance. 
std::optional &&modifyData); void removeExpiredItems(RdxActivityContext *); void removeExpiredStrings(RdxActivityContext *); + void setTagsMatcher(TagsMatcher &&tm, const NsContext &ctx); Item newItem(); template diff --git a/cpp_src/core/nsselecter/querypreprocessor.cc b/cpp_src/core/nsselecter/querypreprocessor.cc index 6070eb478..6697aad44 100644 --- a/cpp_src/core/nsselecter/querypreprocessor.cc +++ b/cpp_src/core/nsselecter/querypreprocessor.cc @@ -501,7 +501,7 @@ bool QueryPreprocessor::mergeQueryEntries(size_t lhs, size_t rhs) { constexpr size_t kMinArraySizeToUseHashSet = 250; if (second.size() < kMinArraySizeToUseHashSet) { // Intersect via binary search + sort for small vectors - std::sort(first.begin(), first.end()); + boost::sort::pdqsort(first.begin(), first.end()); for (auto &&v : second) { if (std::binary_search(first.begin(), first.end(), v)) { setValues.emplace_back(std::move(v)); diff --git a/cpp_src/core/reindexerimpl.cc b/cpp_src/core/reindexerimpl.cc index 59e1f48bd..5b1040f6c 100644 --- a/cpp_src/core/reindexerimpl.cc +++ b/cpp_src/core/reindexerimpl.cc @@ -96,7 +96,7 @@ ReindexerImpl::StatsLocker::StatsLocker() { ReindexerImpl::StatsLocker::StatsLockT ReindexerImpl::StatsLocker::LockIfRequired(std::string_view sysNsName, const RdxContext& ctx) { auto found = mtxMap_.find(sysNsName); if (found != mtxMap_.end()) { - return StatsLockT(found->second, &ctx); + return StatsLockT(found->second, ctx); } // Do not create any lock, if namespace does not preset in the map return StatsLockT(); @@ -231,8 +231,9 @@ Error ReindexerImpl::Connect(const std::string& dsn, ConnectOpts opts) { if (!err.ok()) return err; if (enableStorage && opts.IsOpenNamespaces()) { - std::sort(foundNs.begin(), foundNs.end(), - [](const fs::DirEntry& ld, const fs::DirEntry& rd) { return ld.internalFilesCount > rd.internalFilesCount; }); + boost::sort::pdqsort_branchless(foundNs.begin(), foundNs.end(), [](const fs::DirEntry& ld, const fs::DirEntry& rd) noexcept { + return 
ld.internalFilesCount > rd.internalFilesCount; + }); const size_t maxLoadWorkers = ConcurrentNamespaceLoaders(); std::unique_ptr thrs(new std::thread[maxLoadWorkers]); std::atomic_flag hasNsErrors = ATOMIC_FLAG_INIT; @@ -303,7 +304,7 @@ Error ReindexerImpl::OpenNamespace(std::string_view nsName, const StorageOpts& o const auto rdxCtx = ctx.CreateRdxContext(ctx.NeedTraceActivity() ? (ser << "OPEN NAMESPACE " << nsName).Slice() : ""sv, activities_); { - SLock lock(mtx_, &rdxCtx); + SLock lock(mtx_, rdxCtx); auto it = namespaces_.find(nsName); if (it == namespaces_.end()) { // create new namespace if (!validateUserNsName(nsName)) { @@ -335,7 +336,7 @@ Error ReindexerImpl::addNamespace(const NamespaceDef& nsDef, const RdxContext& r Namespace::Ptr ns; try { { - SLock lock(mtx_, &rdxCtx); + SLock lock(mtx_, rdxCtx); if (namespaces_.find(nsDef.name) != namespaces_.end()) { return Error(errParams, "Namespace '%s' already exists", nsDef.name); } @@ -356,7 +357,7 @@ Error ReindexerImpl::addNamespace(const NamespaceDef& nsDef, const RdxContext& r ns->LoadFromStorage(kStorageLoadingThreads, rdxCtx); } { - ULock lock(mtx_, &rdxCtx); + ULock lock(mtx_, rdxCtx); namespaces_.insert({nsDef.name, ns}); } if (!nsDef.isTemporary) observers_.OnWALUpdate(LSNPair(), nsDef.name, WALRecord(WalNamespaceAdd)); @@ -376,7 +377,7 @@ Error ReindexerImpl::addNamespace(const NamespaceDef& nsDef, const RdxContext& r Error ReindexerImpl::openNamespace(std::string_view name, const StorageOpts& storageOpts, const RdxContext& rdxCtx) { try { { - SLock lock(mtx_, &rdxCtx); + SLock lock(mtx_, rdxCtx); auto nsIt = namespaces_.find(name); if (nsIt != namespaces_.end() && nsIt->second) { if (storageOpts.IsSlaveMode()) nsIt->second->setSlaveMode(rdxCtx); @@ -426,7 +427,7 @@ Error ReindexerImpl::closeNamespace(std::string_view nsName, const RdxContext& c Namespace::Ptr ns; Error err; try { - ULock lock(mtx_, &ctx); + ULock lock(mtx_, ctx); auto nsIt = namespaces_.find(nsName); if (nsIt == namespaces_.end()) { 
return Error(errNotFound, "Namespace '%s' does not exist", nsName); @@ -498,7 +499,7 @@ Error ReindexerImpl::RenameNamespace(std::string_view srcNsName, const std::stri const auto rdxCtx = ctx.CreateRdxContext( ctx.NeedTraceActivity() ? (ser << "RENAME " << srcNsName << " to " << dstNsName).Slice() : ""sv, activities_); { - SLock lock(mtx_, &rdxCtx); + SLock lock(mtx_, rdxCtx); auto srcIt = namespaces_.find(srcNsName); if (srcIt == namespaces_.end()) { return Error(errParams, "Namespace '%s' doesn't exist", srcNsName); @@ -540,7 +541,7 @@ Error ReindexerImpl::renameNamespace(std::string_view srcNsName, const std::stri { // Perform namespace flushes to minimize chances of the flush under lock - SLock lock(mtx_, &rdxCtx); + SLock lock(mtx_, rdxCtx); auto srcIt = namespaces_.find(srcNsName); srcNs = (srcIt != namespaces_.end()) ? srcIt->second : Namespace::Ptr(); lock.unlock(); @@ -554,7 +555,7 @@ Error ReindexerImpl::renameNamespace(std::string_view srcNsName, const std::stri } } - ULock lock(mtx_, &rdxCtx); + ULock lock(mtx_, rdxCtx); auto srcIt = namespaces_.find(srcNsName); if (srcIt == namespaces_.end()) { return Error(errParams, "Namespace '%s' doesn't exist", srcNsName); @@ -837,7 +838,7 @@ Error ReindexerImpl::Select(std::string_view query, QueryResults& result, const } struct ItemRefLess { - bool operator()(const ItemRef& lhs, const ItemRef& rhs) const { + bool operator()(const ItemRef& lhs, const ItemRef& rhs) const noexcept { if (lhs.Proc() == rhs.Proc()) { if (lhs.Nsid() == rhs.Nsid()) { return lhs.Id() < rhs.Id(); @@ -1200,7 +1201,7 @@ void ReindexerImpl::doSelect(const Query& q, QueryResults& result, NsLocker& result.Erase(itemRefVec.begin(), itemRefVec.end()); return; } - std::sort(itemRefVec.begin(), itemRefVec.end(), ItemRefLess()); + boost::sort::pdqsort(itemRefVec.begin(), itemRefVec.end(), ItemRefLess()); if (q.HasOffset()) { result.Erase(itemRefVec.begin(), itemRefVec.begin() + q.Offset()); } @@ -1224,7 +1225,7 @@ Error 
ReindexerImpl::Commit(std::string_view /*_namespace*/) { } Namespace::Ptr ReindexerImpl::getNamespace(std::string_view nsName, const RdxContext& ctx) { - SLock lock(mtx_, &ctx); + SLock lock(mtx_, ctx); auto nsIt = namespaces_.find(nsName); if (nsIt == namespaces_.end()) { throw Error(errParams, "Namespace '%s' does not exist", nsName); @@ -1235,7 +1236,7 @@ Namespace::Ptr ReindexerImpl::getNamespace(std::string_view nsName, const RdxCon } Namespace::Ptr ReindexerImpl::getNamespaceNoThrow(std::string_view nsName, const RdxContext& ctx) { - SLock lock(mtx_, &ctx); + SLock lock(mtx_, ctx); auto nsIt = namespaces_.find(nsName); return (nsIt == namespaces_.end()) ? nullptr : nsIt->second; } @@ -1279,7 +1280,7 @@ Error ReindexerImpl::DropIndex(std::string_view nsName, const IndexDef& indexDef } std::vector> ReindexerImpl::getNamespaces(const RdxContext& ctx) { - SLock lock(mtx_, &ctx); + SLock lock(mtx_, ctx); std::vector> ret; ret.reserve(namespaces_.size()); for (auto& ns : namespaces_) { @@ -1290,7 +1291,7 @@ std::vector> ReindexerImpl::getNamespaces std::vector ReindexerImpl::getNamespacesNames(const RdxContext& ctx) { std::vector ret; - SLock lock(mtx_, &ctx); + SLock lock(mtx_, ctx); ret.reserve(namespaces_.size()); for (auto& ns : namespaces_) { ret.emplace_back(); @@ -1322,7 +1323,7 @@ Error ReindexerImpl::EnumNamespaces(std::vector& defs, EnumNamespa for (auto& d : dirs) { if (d.isDir && d.name != "." && d.name != ".." 
&& opts.MatchFilter(d.name)) { { - SLock lock(mtx_, &rdxCtx); + SLock lock(mtx_, rdxCtx); if (namespaces_.find(d.name) != namespaces_.end()) continue; } std::unique_ptr tmpNs{new NamespaceImpl(d.name, observers_)}; diff --git a/cpp_src/core/reindexerimpl.h b/cpp_src/core/reindexerimpl.h index f8ef5736d..d5ff74fbd 100644 --- a/cpp_src/core/reindexerimpl.h +++ b/cpp_src/core/reindexerimpl.h @@ -31,7 +31,7 @@ class ReindexerImpl { using Mutex = MarkedMutex; using StatsSelectMutex = MarkedMutex; struct NsLockerItem { - NsLockerItem(NamespaceImpl::Ptr ins = {}) : ns(std::move(ins)), count(1) {} + NsLockerItem(NamespaceImpl::Ptr ins = {}) noexcept : ns(std::move(ins)), count(1) {} NamespaceImpl::Ptr ns; NamespaceImpl::Locker::RLockT nsLck; unsigned count = 1; @@ -144,8 +144,9 @@ class ReindexerImpl { assertrx(0); } void Lock() { - std::sort(begin(), end(), [](const NsLockerItem &lhs, const NsLockerItem &rhs) { return lhs.ns.get() < rhs.ns.get(); }); - for (auto it = begin(); it != end(); ++it) { + boost::sort::pdqsort_branchless( + begin(), end(), [](const NsLockerItem &lhs, const NsLockerItem &rhs) noexcept { return lhs.ns.get() < rhs.ns.get(); }); + for (auto it = begin(), e = end(); it != e; ++it) { it->nsLck = it->ns->rLock(context_); } locked_ = true; diff --git a/cpp_src/core/transaction.cc b/cpp_src/core/transaction.cc index fc39bad7f..83f881da4 100644 --- a/cpp_src/core/transaction.cc +++ b/cpp_src/core/transaction.cc @@ -40,6 +40,14 @@ void Transaction::Modify(Query &&query) { if (impl_) impl_->Modify(std::move(query)); } +void Transaction::PutMeta(std::string_view key, std::string_view value) { + if (impl_) impl_->PutMeta(key, value); +} + +void Transaction::SetTagsMatcher(TagsMatcher &&tm) { + if (impl_) impl_->SetTagsMatcher(std::move(tm)); +} + Item Transaction::NewItem() { return impl_->NewItem(); } std::vector &Transaction::GetSteps() { diff --git a/cpp_src/core/transaction.h b/cpp_src/core/transaction.h index 0ab5c8b43..27709df4f 100644 --- 
a/cpp_src/core/transaction.h +++ b/cpp_src/core/transaction.h @@ -29,6 +29,8 @@ class Transaction { void Delete(Item &&item); void Modify(Item &&item, ItemModifyMode mode); void Modify(Query &&query); + void PutMeta(std::string_view key, std::string_view value); + void SetTagsMatcher(TagsMatcher &&tm); bool IsFree() { return impl_ == nullptr; } Item NewItem(); Item GetItem(TransactionStep &&st); diff --git a/cpp_src/core/transactionimpl.cc b/cpp_src/core/transactionimpl.cc index a7ccb4f16..e5c8e8853 100644 --- a/cpp_src/core/transactionimpl.cc +++ b/cpp_src/core/transactionimpl.cc @@ -7,18 +7,41 @@ namespace reindexer { void TransactionImpl::checkTagsMatcher(Item &item) { if (item.IsTagsUpdated()) { ItemImpl *ritem = item.impl_; - UpdateTagsMatcherFromItem(ritem); + if (ritem->Type().get() != payloadType_.get() || !tagsMatcher_.try_merge(ritem->tagsMatcher())) { + std::string jsonSliceBuf(ritem->GetJSON()); + + ItemImpl tmpItem(payloadType_, tagsMatcher_); + tmpItem.Value().SetLSN(ritem->Value().GetLSN()); + *ritem = std::move(tmpItem); + + auto err = ritem->FromJSON(jsonSliceBuf, nullptr); + if (!err.ok()) throw err; + + if (ritem->tagsMatcher().isUpdated() && !tagsMatcher_.try_merge(ritem->tagsMatcher())) + throw Error(errLogic, "Could not insert item. 
TagsMatcher was not merged."); + ritem->tagsMatcher() = tagsMatcher_; + ritem->tagsMatcher().setUpdated(); + } else { + ritem->tagsMatcher() = tagsMatcher_; + ritem->tagsMatcher().setUpdated(); + } tagsUpdated_ = true; } } Item TransactionImpl::NewItem() { std::unique_lock lock(mtx_); - return Item(new ItemImpl(payloadType_, tagsMatcher_, pkFields_)); + Item item(new ItemImpl(payloadType_, tagsMatcher_, pkFields_)); + item.impl_->tagsMatcher().clearUpdated(); + return item; } + Item TransactionImpl::GetItem(TransactionStep &&st) { std::unique_lock lock(mtx_); - return Item(new ItemImpl(payloadType_, tagsMatcher_, pkFields_, schema_, std::move(st.itemData_))); + auto &data = std::get(st.data_); + auto item = Item(new ItemImpl(payloadType_, tagsMatcher_, pkFields_, schema_, std::move(data.data))); + data.hadTmUpdate ? item.impl_->tagsMatcher().setUpdated() : item.impl_->tagsMatcher().clearUpdated(); + return item; } void TransactionImpl::ValidatePK(const FieldsSet &pkFields) { @@ -30,58 +53,57 @@ void TransactionImpl::ValidatePK(const FieldsSet &pkFields) { } } -void TransactionImpl::UpdateTagsMatcherFromItem(ItemImpl *ritem) { - if (ritem->Type().get() != payloadType_.get() || (ritem->tagsMatcher().isUpdated() && !tagsMatcher_.try_merge(ritem->tagsMatcher()))) { - std::string jsonSliceBuf(ritem->GetJSON()); - - ItemImpl tmpItem(payloadType_, tagsMatcher_); - tmpItem.Value().SetLSN(ritem->Value().GetLSN()); - *ritem = std::move(tmpItem); - - auto err = ritem->FromJSON(jsonSliceBuf, nullptr); - if (!err.ok()) throw err; - - if (ritem->tagsMatcher().isUpdated() && !tagsMatcher_.try_merge(ritem->tagsMatcher())) - throw Error(errLogic, "Could not insert item. 
TagsMatcher was not merged."); - ritem->tagsMatcher() = tagsMatcher_; - ritem->tagsMatcher().setUpdated(); - } - if (ritem->tagsMatcher().isUpdated()) { - ritem->tagsMatcher() = tagsMatcher_; - ritem->tagsMatcher().setUpdated(); - } -} - void TransactionImpl::Insert(Item &&item) { std::unique_lock lock(mtx_); checkTagsMatcher(item); - steps_.emplace_back(TransactionStep{std::move(item), ModeInsert}); + steps_.emplace_back(std::move(item), ModeInsert); } void TransactionImpl::Update(Item &&item) { std::unique_lock lock(mtx_); checkTagsMatcher(item); - steps_.emplace_back(TransactionStep{std::move(item), ModeUpdate}); + steps_.emplace_back(std::move(item), ModeUpdate); } void TransactionImpl::Upsert(Item &&item) { std::unique_lock lock(mtx_); checkTagsMatcher(item); - steps_.emplace_back(TransactionStep{std::move(item), ModeUpsert}); + steps_.emplace_back(std::move(item), ModeUpsert); } void TransactionImpl::Delete(Item &&item) { std::unique_lock lock(mtx_); checkTagsMatcher(item); - steps_.emplace_back(TransactionStep{std::move(item), ModeDelete}); + steps_.emplace_back(std::move(item), ModeDelete); } void TransactionImpl::Modify(Item &&item, ItemModifyMode mode) { std::unique_lock lock(mtx_); checkTagsMatcher(item); hasDeleteItemSteps_ = hasDeleteItemSteps_ || (mode == ModeDelete); - steps_.emplace_back(TransactionStep{std::move(item), mode}); + steps_.emplace_back(std::move(item), mode); } void TransactionImpl::Modify(Query &&query) { std::unique_lock lock(mtx_); - steps_.emplace_back(TransactionStep(std::move(query))); + steps_.emplace_back(std::move(query)); +} + +void TransactionImpl::PutMeta(std::string_view key, std::string_view value) { + if (key.empty()) { + throw Error(errLogic, "Empty meta key is not allowed in tx"); + } + + std::lock_guard lock(mtx_); + steps_.emplace_back(key, value); +} + +void TransactionImpl::SetTagsMatcher(TagsMatcher &&tm) { + std::lock_guard lock(mtx_); + // NOTE: In v4 tm tokens here are always the same, but in v3 those tokens 
are not synchronized. Probably it should workd anyway + // if (tm.stateToken() != tagsMatcher_.stateToken()) { + // throw Error(errParams, "Tx tm statetoken missmatch: %08X vs %08X", tagsMatcher_.stateToken(), tm.stateToken()); + // } + tagsMatcher_ = tm; + tagsMatcher_.UpdatePayloadType(payloadType_, false); + tagsUpdated_ = true; + steps_.emplace_back(std::move(tm)); } } // namespace reindexer diff --git a/cpp_src/core/transactionimpl.h b/cpp_src/core/transactionimpl.h index 89baa10d9..36403c28e 100644 --- a/cpp_src/core/transactionimpl.h +++ b/cpp_src/core/transactionimpl.h @@ -5,22 +5,46 @@ namespace reindexer { +struct TransactionItemStep { + ItemModifyMode mode; + bool hadTmUpdate; + ItemImplRawData data; +}; + +struct TransactionQueryStep { + std::unique_ptr query; +}; + +struct TransactionMetaStep { + std::string key; + std::string value; +}; + +struct TransactionTmStep { + TagsMatcher tm; +}; + class TransactionStep { public: - TransactionStep(Item &&item, ItemModifyMode modifyMode) : itemData_(std::move(*item.impl_)), modifyMode_(modifyMode), query_(nullptr) { + enum class Type : uint8_t { Nop, ModifyItem, Query, PutMeta, SetTM }; + + TransactionStep(Item &&item, ItemModifyMode modifyMode) + : data_(TransactionItemStep{modifyMode, item.IsTagsUpdated(), std::move(*item.impl_)}), type_(Type::ModifyItem) { delete item.impl_; item.impl_ = nullptr; } - TransactionStep(Query &&query) : modifyMode_(ModeUpdate), query_(new Query(std::move(query))) {} + TransactionStep(TagsMatcher &&tm) : data_(TransactionTmStep{std::move(tm)}), type_(Type::SetTM) {} + TransactionStep(Query &&query) : data_(TransactionQueryStep{std::make_unique(std::move(query))}), type_(Type::Query) {} + TransactionStep(std::string_view key, std::string_view value) + : data_(TransactionMetaStep{std::string(key), std::string(value)}), type_(Type::PutMeta) {} TransactionStep(const TransactionStep &) = delete; TransactionStep &operator=(const TransactionStep &) = delete; - 
TransactionStep(TransactionStep && /*rhs*/) noexcept = default; - TransactionStep &operator=(TransactionStep && /*rhs*/) = default; + TransactionStep(TransactionStep && /*rhs*/) = default; + TransactionStep &operator=(TransactionStep && /*rhs*/) = delete; - ItemImplRawData itemData_; - ItemModifyMode modifyMode_; - std::unique_ptr query_; + std::variant data_; + Type type_; }; class TransactionImpl { @@ -34,7 +58,9 @@ class TransactionImpl { nsName_(nsName), tagsUpdated_(false), hasDeleteItemSteps_(false), - startTime_(std::chrono::high_resolution_clock::now()) {} + startTime_(std::chrono::high_resolution_clock::now()) { + tagsMatcher_.clearUpdated(); + } void Insert(Item &&item); void Update(Item &&item); @@ -42,8 +68,9 @@ class TransactionImpl { void Delete(Item &&item); void Modify(Item &&item, ItemModifyMode mode); void Modify(Query &&item); + void PutMeta(std::string_view key, std::string_view value); + void SetTagsMatcher(TagsMatcher &&tm); - void UpdateTagsMatcherFromItem(ItemImpl *ritem); Item NewItem(); Item GetItem(TransactionStep &&st); void ValidatePK(const FieldsSet &pkFields); diff --git a/cpp_src/estl/contexted_locks.h b/cpp_src/estl/contexted_locks.h index ba8dc9045..2e60b0ec8 100644 --- a/cpp_src/estl/contexted_locks.h +++ b/cpp_src/estl/contexted_locks.h @@ -13,32 +13,27 @@ using std::adopt_lock_t; namespace reindexer { -const milliseconds kDefaultCondChkTime = milliseconds(20); +constexpr milliseconds kDefaultCondChkTime = milliseconds(20); template class contexted_unique_lock { public: using MutexType = _Mutex; - explicit contexted_unique_lock() : _M_mtx(nullptr), _M_owns(false), _M_context(nullptr), _M_chkTimeout(kDefaultCondChkTime) {} - explicit contexted_unique_lock(MutexType& __mtx, Context* __context, milliseconds __chkTimeout = kDefaultCondChkTime) - : _M_mtx(&__mtx), _M_owns(false), _M_context(__context), _M_chkTimeout(__chkTimeout) { - assertrx(_M_context); + explicit contexted_unique_lock() noexcept : _M_mtx(nullptr), _M_owns(false), 
_M_context(nullptr), _M_chkTimeout(kDefaultCondChkTime) {} + explicit contexted_unique_lock(MutexType& __mtx, Context& __context, milliseconds __chkTimeout = kDefaultCondChkTime) + : _M_mtx(&__mtx), _M_owns(false), _M_context(&__context), _M_chkTimeout(__chkTimeout) { lock(); } - explicit contexted_unique_lock(MutexType& __mtx, defer_lock_t, Context* __context, milliseconds __chkTimeout = kDefaultCondChkTime) - : _M_mtx(&__mtx), _M_owns(false), _M_context(__context), _M_chkTimeout(__chkTimeout) { - assertrx(_M_context); - } - explicit contexted_unique_lock(MutexType& __mtx, adopt_lock_t, Context* __context, milliseconds __chkTimeout = kDefaultCondChkTime) - : _M_mtx(&__mtx), _M_owns(true), _M_context(__context), _M_chkTimeout(__chkTimeout) { - assertrx(_M_context); - } - explicit contexted_unique_lock(MutexType& __mtx, try_to_lock_t, Context* __context, milliseconds __chkTimeout = kDefaultCondChkTime) - : _M_mtx(&__mtx), _M_owns(__mtx.try_lock()), _M_context(__context), _M_chkTimeout(__chkTimeout) { - assertrx(_M_context); - } - contexted_unique_lock(contexted_unique_lock&& lck) + explicit contexted_unique_lock(MutexType& __mtx, defer_lock_t, Context& __context, + milliseconds __chkTimeout = kDefaultCondChkTime) noexcept + : _M_mtx(&__mtx), _M_owns(false), _M_context(&__context), _M_chkTimeout(__chkTimeout) {} + explicit contexted_unique_lock(MutexType& __mtx, adopt_lock_t, Context& __context, + milliseconds __chkTimeout = kDefaultCondChkTime) noexcept + : _M_mtx(&__mtx), _M_owns(true), _M_context(&__context), _M_chkTimeout(__chkTimeout) {} + explicit contexted_unique_lock(MutexType& __mtx, try_to_lock_t, Context& __context, milliseconds __chkTimeout = kDefaultCondChkTime) + : _M_mtx(&__mtx), _M_owns(__mtx.try_lock()), _M_context(&__context), _M_chkTimeout(__chkTimeout) {} + contexted_unique_lock(contexted_unique_lock&& lck) noexcept : _M_mtx(lck._M_mtx), _M_owns(lck._M_owns), _M_context(lck._M_context), _M_chkTimeout(lck._M_chkTimeout) { lck._M_owns = false; 
lck._M_mtx = nullptr; @@ -50,7 +45,7 @@ class contexted_unique_lock { contexted_unique_lock(const contexted_unique_lock&) = delete; contexted_unique_lock& operator=(const contexted_unique_lock&) = delete; - contexted_unique_lock& operator=(contexted_unique_lock&& lck) { + contexted_unique_lock& operator=(contexted_unique_lock&& lck) noexcept { if (this != &lck) { if (_M_owns) unlock(); _M_mtx = lck._M_mtx; @@ -102,7 +97,7 @@ class contexted_unique_lock { MutexType* mutex() const noexcept { return _M_mtx; } private: - void _M_lockable() const { + void _M_lockable() const noexcept { if (_M_mtx == nullptr) assertrx(0); if (_M_owns) assertrx(0); } @@ -118,21 +113,17 @@ class contexted_shared_lock { public: using MutexType = _Mutex; - explicit contexted_shared_lock() : _M_mtx(nullptr), _M_owns(false), _M_context(nullptr), _M_chkTimeout(kDefaultCondChkTime) {} - explicit contexted_shared_lock(MutexType& __mtx, Context* __context, milliseconds __chkTimeout = kDefaultCondChkTime) - : _M_mtx(&__mtx), _M_owns(false), _M_context(__context), _M_chkTimeout(__chkTimeout) { - assertrx(_M_context); + explicit contexted_shared_lock() noexcept : _M_mtx(nullptr), _M_owns(false), _M_context(nullptr), _M_chkTimeout(kDefaultCondChkTime) {} + explicit contexted_shared_lock(MutexType& __mtx, Context& __context, milliseconds __chkTimeout = kDefaultCondChkTime) + : _M_mtx(&__mtx), _M_owns(false), _M_context(&__context), _M_chkTimeout(__chkTimeout) { lock(); } - explicit contexted_shared_lock(MutexType& __mtx, adopt_lock_t, Context* __context, milliseconds __chkTimeout = kDefaultCondChkTime) - : _M_mtx(&__mtx), _M_owns(true), _M_context(__context), _M_chkTimeout(__chkTimeout) { - assertrx(_M_context); - } - explicit contexted_shared_lock(MutexType& __mtx, try_to_lock_t, Context* __context, milliseconds __chkTimeout = kDefaultCondChkTime) - : _M_mtx(&__mtx), _M_owns(__mtx.try_lock()), _M_context(__context), _M_chkTimeout(__chkTimeout) { - assertrx(_M_context); - } - 
contexted_shared_lock(contexted_shared_lock&& lck) + explicit contexted_shared_lock(MutexType& __mtx, adopt_lock_t, Context& __context, + milliseconds __chkTimeout = kDefaultCondChkTime) noexcept + : _M_mtx(&__mtx), _M_owns(true), _M_context(&__context), _M_chkTimeout(__chkTimeout) {} + explicit contexted_shared_lock(MutexType& __mtx, try_to_lock_t, Context& __context, milliseconds __chkTimeout = kDefaultCondChkTime) + : _M_mtx(&__mtx), _M_owns(__mtx.try_lock()), _M_context(&__context), _M_chkTimeout(__chkTimeout) {} + contexted_shared_lock(contexted_shared_lock&& lck) noexcept : _M_mtx(lck._M_mtx), _M_owns(lck._M_owns), _M_context(lck._M_context), _M_chkTimeout(lck._M_chkTimeout) { lck._M_owns = false; lck._M_mtx = nullptr; @@ -144,7 +135,7 @@ class contexted_shared_lock { contexted_shared_lock(const contexted_shared_lock&) = delete; contexted_shared_lock& operator=(const contexted_shared_lock&) = delete; - contexted_shared_lock& operator=(contexted_shared_lock&& lck) { + contexted_shared_lock& operator=(contexted_shared_lock&& lck) noexcept { if (this != &lck) { if (_M_owns) unlock(); _M_mtx = lck._M_mtx; @@ -196,7 +187,7 @@ class contexted_shared_lock { MutexType* mutex() const noexcept { return _M_mtx; } private: - void _M_lockable() const { + void _M_lockable() const noexcept { if (_M_mtx == nullptr) assertrx(0); if (_M_owns) assertrx(0); } diff --git a/cpp_src/estl/h_vector.h b/cpp_src/estl/h_vector.h index 1f26945d3..bcba9a7b0 100644 --- a/cpp_src/estl/h_vector.h +++ b/cpp_src/estl/h_vector.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "debug_macros.h" #include "trivial_reverse_iterator.h" diff --git a/cpp_src/gtests/tests/API/api.cc b/cpp_src/gtests/tests/API/api.cc index bdb97049f..cf14719c5 100644 --- a/cpp_src/gtests/tests/API/api.cc +++ b/cpp_src/gtests/tests/API/api.cc @@ -1,9 +1,19 @@ #include #include "gtest/gtest.h" +#include "tools/assertrx.h" +#include "tools/fsops.h" -int main(int argc, char* argv[]) { +int main(int argc, char 
*argv[]) { srand(time(NULL)); ::testing::InitGoogleTest(&argc, argv); + +#ifndef _WIN32 + const char *tmpDir = getenv("REINDEXER_TEST_DB_ROOT"); + if (tmpDir && *tmpDir) { + reindexer::fs::SetTempDir(std::string(tmpDir)); + } +#endif // _WIN32 + return RUN_ALL_TESTS(); } diff --git a/cpp_src/gtests/tests/fixtures/fuzzing/index.cc b/cpp_src/gtests/tests/fixtures/fuzzing/index.cc index cb9f50d4e..17f7f9c59 100644 --- a/cpp_src/gtests/tests/fixtures/fuzzing/index.cc +++ b/cpp_src/gtests/tests/fixtures/fuzzing/index.cc @@ -13,7 +13,7 @@ reindexer::IndexDef Index::IndexDef(RandomGenerator& rnd, const NsScheme& scheme IndexOpts opts; const bool pk = rnd.PkIndex(isPk_); opts.PK(pk); - opts.Array(rnd.RndArrayField(isArray_) == IsArray::Yes); + opts.Array(rnd.RndArrayField(isArray_) == IsArrayT::Yes); opts.Sparse(rnd.RndSparseIndex(isSparse_)); opts.Dense(rnd.DenseIndex()); opts.RTreeType(static_cast(rnd.RndInt(IndexOpts::Linear, IndexOpts::RStar))); @@ -58,7 +58,7 @@ void Index::Dump(std::ostream& os, const NsScheme& scheme, size_t offset) const for (size_t i = 0; i <= offset; ++i) os << " "; os << "array: " << std::boolalpha << IsArray() << '\n'; for (size_t i = 0; i <= offset; ++i) os << " "; - os << "sparse: " << std::boolalpha << (IsSparse() == IsSparse::Yes) << '\n'; + os << "sparse: " << std::boolalpha << (IsSparse() == IsSparseT::Yes) << '\n'; for (size_t i = 0; i <= offset; ++i) os << " "; std::visit(reindexer::overloaded{[&](const Child& child) { os << "composite: false\n"; diff --git a/cpp_src/gtests/tests/fixtures/fuzzing/index.h b/cpp_src/gtests/tests/fixtures/fuzzing/index.h index 80407211c..4fe90a47b 100644 --- a/cpp_src/gtests/tests/fixtures/fuzzing/index.h +++ b/cpp_src/gtests/tests/fixtures/fuzzing/index.h @@ -22,9 +22,9 @@ class Index { }; using Children = std::vector; - Index(std::string name, IndexType type, IsArray isArray, IsSparse isSparse, Children content) noexcept + Index(std::string name, IndexType type, IsArrayT isArray, IsSparseT isSparse, 
Children content) noexcept : name_{std::move(name)}, type_{type}, content_{std::move(content)}, isArray_{isArray}, isSparse_{isSparse} {} - Index(std::string name, IndexType type, IsArray isArray, IsSparse isSparse, Child content) noexcept + Index(std::string name, IndexType type, IsArrayT isArray, IsSparseT isSparse, Child content) noexcept : name_{std::move(name)}, type_{type}, content_{std::move(content)}, isArray_{isArray}, isSparse_{isSparse} {} const std::string& Name() const& noexcept { return name_; } @@ -34,7 +34,7 @@ class Index { const auto& Content() const&& = delete; bool IsPk() const noexcept { return isPk_; } void SetPk() noexcept { isPk_ = true; } - bool IsArray() const noexcept { return isArray_ == IsArray::Yes; } + bool IsArray() const noexcept { return isArray_ == IsArrayT::Yes; } auto IsSparse() const noexcept { return isSparse_; } reindexer::IndexDef IndexDef(RandomGenerator&, const NsScheme&, const std::vector&) const; @@ -46,8 +46,8 @@ class Index { IndexType type_; std::variant content_; bool isPk_{false}; - enum IsArray isArray_ { IsArray::No }; - enum IsSparse isSparse_ { IsSparse::No }; + IsArrayT isArray_{IsArrayT::No}; + IsSparseT isSparse_{IsSparseT::No}; }; } // namespace fuzzing diff --git a/cpp_src/gtests/tests/fixtures/fuzzing/ns.cc b/cpp_src/gtests/tests/fixtures/fuzzing/ns.cc index 5a520d4a0..831653a84 100644 --- a/cpp_src/gtests/tests/fixtures/fuzzing/ns.cc +++ b/cpp_src/gtests/tests/fixtures/fuzzing/ns.cc @@ -99,7 +99,7 @@ Ns::Ns(std::string name, RandomGenerator::ErrFactorType errorFactor) usedIndexNames.insert(name); } - indexes_.emplace_back(std::move(name), indexType, rndGen_.RndArrayField(array ? IsArray::Yes : IsArray::No), IsSparse::No, + indexes_.emplace_back(std::move(name), indexType, rndGen_.RndArrayField(array ? 
IsArrayT::Yes : IsArrayT::No), IsSparseT::No, std::move(children)); } else { FieldPath fldPath; @@ -113,8 +113,8 @@ Ns::Ns(std::string name, RandomGenerator::ErrFactorType errorFactor) if (!rndGen_.RndErr()) continue; const auto fldType = rndGen_.RndFieldType(); indexes_.emplace_back(rndGen_.IndexName(usedIndexNames), rndGen_.RndIndexType({fldType}), - rndGen_.RndBool(0.5) ? IsArray::Yes : IsArray::No, - rndGen_.RndBool(0.5) ? IsSparse::Yes : IsSparse::No, Index::Child{fldType, std::move(fldPath)}); + rndGen_.RndBool(0.5) ? IsArrayT::Yes : IsArrayT::No, + rndGen_.RndBool(0.5) ? IsSparseT::Yes : IsSparseT::No, Index::Child{fldType, std::move(fldPath)}); } else { const auto fldType = scheme_.GetFieldType(fldPath); const auto isArray = scheme_.IsArray(fldPath); @@ -132,7 +132,7 @@ Ns::Ns(std::string name, RandomGenerator::ErrFactorType errorFactor) rndGen_.RndSparseIndex(fldType), Index::Child{fldType, std::move(fldPath)}); } if (const auto& idx = indexes_.back(); - !idx.IsArray() && idx.IsSparse() == IsSparse::No && + !idx.IsArray() && idx.IsSparse() == IsSparseT::No && std::get(idx.Content()).type != FieldType::Point) { // TODO remove point check after #1352 scalarIndexes.push_back(indexes_.size() - 1); } @@ -145,7 +145,7 @@ Ns::Ns(std::string name, RandomGenerator::ErrFactorType errorFactor) std::vector ii; for (size_t i = 0, s = indexes_.size(); i < s; ++i) { const auto& idx = indexes_[i]; - if (!idx.IsArray() && idx.IsSparse() == IsSparse::No && availablePkIndexType(idx.Type()) && + if (!idx.IsArray() && idx.IsSparse() == IsSparseT::No && availablePkIndexType(idx.Type()) && (std::holds_alternative(idx.Content()) || availablePkFieldType(std::get(idx.Content()).type))) { ii.push_back(i); } @@ -163,7 +163,7 @@ Ns::Ns(std::string name, RandomGenerator::ErrFactorType errorFactor) usedIndexNames.insert(name); } } - indexes_.emplace_back(std::move(name), rndGen_.RndPkIndexType({fldType}), IsArray::No, IsSparse::No, + indexes_.emplace_back(std::move(name), 
rndGen_.RndPkIndexType({fldType}), IsArrayT::No, IsSparseT::No, Index::Child{fldType, std::move(path)}); indexes_.back().SetPk(); } else { diff --git a/cpp_src/gtests/tests/fixtures/fuzzing/ns_scheme.cc b/cpp_src/gtests/tests/fixtures/fuzzing/ns_scheme.cc index 8c027dbc6..0a1c92829 100644 --- a/cpp_src/gtests/tests/fixtures/fuzzing/ns_scheme.cc +++ b/cpp_src/gtests/tests/fixtures/fuzzing/ns_scheme.cc @@ -77,13 +77,13 @@ size_t NsScheme::FieldsCount(const FieldPath& path) const noexcept { ref[path.back()].content); } -IsArray NsScheme::IsArray(const FieldPath& path) const noexcept { +IsArrayT NsScheme::IsArray(const FieldPath& path) const noexcept { if (path.empty()) return ns_.array; const Node::Children* ptr = &std::get(ns_.content); for (size_t i = 0, s = path.size() - 1; i < s; ++i) { assertrx(ptr->size() > path[i]); const auto& idx = (*ptr)[path[i]]; - if (idx.array == IsArray::Yes) return IsArray::Yes; + if (idx.array == IsArrayT::Yes) return IsArrayT::Yes; std::visit( reindexer::overloaded{[&ptr](const Node::Children& c) noexcept { ptr = &c; }, [](const Node::Child&) noexcept { assertrx(0); }}, idx.content); @@ -127,16 +127,16 @@ std::string NsScheme::GetJsonPath(const FieldPath& path) const noexcept { return res; } -void NsScheme::AddIndex(const FieldPath& path, size_t index, IsSparse isSparse) { +void NsScheme::AddIndex(const FieldPath& path, size_t index, IsSparseT isSparse) { assertrx(!path.empty()); - if (isSparse == IsSparse::No) { - ns_.sparse = IsSparse::No; + if (isSparse == IsSparseT::No) { + ns_.sparse = IsSparseT::No; } Node::Children* ptr = &std::get(ns_.content); for (size_t i = 0, s = path.size() - 1; i < s; ++i) { assertrx(ptr->size() > path[i]); - if (isSparse == IsSparse::No) { - (*ptr)[path[i]].sparse = IsSparse::No; + if (isSparse == IsSparseT::No) { + (*ptr)[path[i]].sparse = IsSparseT::No; } std::visit(reindexer::overloaded{[&ptr](Node::Children& c) noexcept { ptr = &c; }, [](Node::Child&) noexcept { assertrx(0); }}, 
(*ptr)[path[i]].content); @@ -148,14 +148,14 @@ void NsScheme::AddIndex(const FieldPath& path, size_t index, IsSparse isSparse) FieldPath NsScheme::AddRndPkField(RandomGenerator& rnd) { auto& children = std::get(ns_.content); children.emplace_back(Node{rnd.FieldName(generatedNames_), Node::Child{rnd.RndPkIndexFieldType()}}); - children.back().array = IsArray::No; - children.back().sparse = IsSparse::No; + children.back().array = IsArrayT::No; + children.back().sparse = IsSparseT::No; return {children.size() - 1}; } -void NsScheme::addIndex(Node& node, size_t index, IsSparse isSparse) { - if (isSparse == IsSparse::No) { - node.sparse = IsSparse::No; +void NsScheme::addIndex(Node& node, size_t index, IsSparseT isSparse) { + if (isSparse == IsSparseT::No) { + node.sparse = IsSparseT::No; } std::visit(reindexer::overloaded{[index](Node::Child& c) noexcept { c.indexes.push_back(index); }, [](Node::Children&) noexcept { assertrx(0); }}, @@ -175,14 +175,14 @@ void NsScheme::fillChildren(Node::Children& children, RandomGenerator& rnd, unsi children.back().array = rnd.RndArrayField(); } if (!canBeSparse && !rnd.RndErr()) { - children.back().sparse = IsSparse::No; + children.back().sparse = IsSparseT::No; } } else { children.emplace_back(Node{std::move(fName), Node::Child{type}}); if (type == FieldType::Point) { canBeSparse = false; canBeArray = false; - children.back().sparse = IsSparse::No; + children.back().sparse = IsSparseT::No; } if (canBeArray || rnd.RndErr()) { children.back().array = rnd.RndArrayField(); @@ -256,7 +256,7 @@ void NsScheme::toJson(reindexer::JsonBuilder& builder, const Node::Children& chi const std::vector& indexes) { for (const Node& n : children) { if (!rnd.NeedThisNode(n.sparse)) continue; - if (rnd.RndArrayField(n.array) == IsArray::Yes) { + if (rnd.RndArrayField(n.array) == IsArrayT::Yes) { auto arr = builder.Array(n.name); const size_t arrSize = rnd.ArraySize(); for (size_t i = 0; i < arrSize; ++i) { @@ -294,9 +294,9 @@ void 
NsScheme::Node::Dump(std::ostream& os, size_t offset) const { for (size_t i = 0; i <= offset; ++i) os << " "; os << "name: " << name << '\n'; for (size_t i = 0; i <= offset; ++i) os << " "; - os << "sparse: " << std::boolalpha << (sparse == IsSparse::Yes) << '\n'; + os << "sparse: " << std::boolalpha << (sparse == IsSparseT::Yes) << '\n'; for (size_t i = 0; i <= offset; ++i) os << " "; - os << "array: " << std::boolalpha << (array == IsArray::Yes) << '\n'; + os << "array: " << std::boolalpha << (array == IsArrayT::Yes) << '\n'; std::visit(reindexer::overloaded{[&](const Child& child) { for (size_t i = 0; i <= offset; ++i) os << " "; os << "type: " << child.type << '\n'; diff --git a/cpp_src/gtests/tests/fixtures/fuzzing/ns_scheme.h b/cpp_src/gtests/tests/fixtures/fuzzing/ns_scheme.h index a63a76e38..8123d726f 100644 --- a/cpp_src/gtests/tests/fixtures/fuzzing/ns_scheme.h +++ b/cpp_src/gtests/tests/fixtures/fuzzing/ns_scheme.h @@ -29,8 +29,8 @@ class NsScheme { std::string name; std::variant content; - IsSparse sparse{IsSparse::Yes}; - IsArray array{IsArray::No}; + IsSparseT sparse{IsSparseT::Yes}; + IsArrayT array{IsArrayT::No}; void Dump(std::ostream&, size_t offset) const; }; @@ -43,17 +43,17 @@ class NsScheme { bool IsStruct(const FieldPath&) const noexcept; bool IsPoint(const FieldPath&) const noexcept; bool IsTtl(const FieldPath&, const std::vector&) const noexcept; - enum IsArray IsArray(const FieldPath&) const noexcept; + IsArrayT IsArray(const FieldPath&) const noexcept; FieldType GetFieldType(const FieldPath&) const noexcept; void SetFieldType(const FieldPath&, FieldType) noexcept; std::string GetJsonPath(const FieldPath&) const noexcept; - void AddIndex(const FieldPath&, size_t index, IsSparse); + void AddIndex(const FieldPath&, size_t index, IsSparseT); void NewItem(reindexer::WrSerializer&, RandomGenerator&, const std::vector&); void Dump(std::ostream& os, size_t offset) const { ns_.Dump(os, offset); } FieldPath AddRndPkField(RandomGenerator&); private: 
- static void addIndex(Node&, size_t index, IsSparse); + static void addIndex(Node&, size_t index, IsSparseT); void fillChildren(Node::Children&, RandomGenerator&, unsigned level, bool& canBeArray, bool& canBeSparse); const Node::Children& findLastContainer(const FieldPath&) const noexcept; Node::Children& findLastContainer(const FieldPath&) noexcept; diff --git a/cpp_src/gtests/tests/fixtures/fuzzing/random_generator.cc b/cpp_src/gtests/tests/fixtures/fuzzing/random_generator.cc index e207a6667..82d85a68e 100644 --- a/cpp_src/gtests/tests/fixtures/fuzzing/random_generator.cc +++ b/cpp_src/gtests/tests/fixtures/fuzzing/random_generator.cc @@ -228,7 +228,7 @@ FieldPath RandomGenerator::RndScalarField(const NsScheme& nsScheme) { const int end = idx + size; while (idx < end) { res.back() = idx % size; - if (nsScheme.IsArray(res) == IsArray::No && !nsScheme.IsPoint(res)) break; + if (nsScheme.IsArray(res) == IsArrayT::No && !nsScheme.IsPoint(res)) break; ++idx; } if (idx == end) return {}; diff --git a/cpp_src/gtests/tests/fixtures/fuzzing/random_generator.h b/cpp_src/gtests/tests/fixtures/fuzzing/random_generator.h index 4960f28c3..bebb70490 100644 --- a/cpp_src/gtests/tests/fixtures/fuzzing/random_generator.h +++ b/cpp_src/gtests/tests/fixtures/fuzzing/random_generator.h @@ -60,20 +60,20 @@ class RandomGenerator { IndexType RndIndexType(const std::vector&); IndexType RndPkIndexType(const std::vector&); IndexType RndIndexType(IndexType); - IsArray RndArrayField() { return RndBool(0.2) ? IsArray::Yes : IsArray::No; } - IsArray RndArrayField(IsArray array) { + IsArrayT RndArrayField() { return RndBool(0.2) ? IsArrayT::Yes : IsArrayT::No; } + IsArrayT RndArrayField(IsArrayT array) { if (RndErr()) { - return array == IsArray::Yes ? IsArray::No : IsArray::Yes; + return array == IsArrayT::Yes ? IsArrayT::No : IsArrayT::Yes; } return array; } size_t ArraySize(); bool PkIndex(bool pk) { return RndErr() ? 
RndBool(0.5) : pk; } - IsSparse RndSparseIndex(FieldType fldType) { + IsSparseT RndSparseIndex(FieldType fldType) { const bool couldBeSparse = fldType != FieldType::Struct && fldType != FieldType::Uuid; // TODO remove uuid #1470 - return (couldBeSparse ? RndBool(0.2) : RndErr()) ? IsSparse::Yes : IsSparse::No; + return (couldBeSparse ? RndBool(0.2) : RndErr()) ? IsSparseT::Yes : IsSparseT::No; } - bool RndSparseIndex(IsSparse isSparse) { return (isSparse == IsSparse::Yes) != RndErr(); } + bool RndSparseIndex(IsSparseT isSparse) { return (isSparse == IsSparseT::Yes) != RndErr(); } bool DenseIndex() { return RndBool(0.2); } int64_t ExpiredIndex() { return RndInt(0, 100'000); } // TODO size_t IndexesCount(); @@ -137,7 +137,7 @@ class RandomGenerator { return err; } char RndChar() { return rndChar_(gen_); } - bool NeedThisNode(IsSparse sparse) { return sparse == IsSparse::Yes ? RndBool(0.5) : !RndErr(); } + bool NeedThisNode(IsSparseT sparse) { return sparse == IsSparseT::Yes ? RndBool(0.5) : !RndErr(); } int RndIntValue() { enum Size : uint8_t { Short, Long, END = Long }; switch (RndWhich()) { diff --git a/cpp_src/gtests/tests/fixtures/fuzzing/types.h b/cpp_src/gtests/tests/fixtures/fuzzing/types.h index db97087c6..98ec500a8 100644 --- a/cpp_src/gtests/tests/fixtures/fuzzing/types.h +++ b/cpp_src/gtests/tests/fixtures/fuzzing/types.h @@ -23,7 +23,7 @@ enum class IndexType { Store, Hash, Tree, Ttl, FastFT, FuzzyFT, RTree, END = RTr std::string_view ToText(IndexType); std::ostream& operator<<(std::ostream&, IndexType); -enum class IsArray : bool { Yes = true, No = false }; -enum class IsSparse : bool { Yes = true, No = false }; +enum class IsArrayT : bool { Yes = true, No = false }; +enum class IsSparseT : bool { Yes = true, No = false }; } // namespace fuzzing diff --git a/cpp_src/readme.md b/cpp_src/readme.md index 05c6e0cfb..b496cca9e 100644 --- a/cpp_src/readme.md +++ b/cpp_src/readme.md @@ -46,7 +46,7 @@ yum update yum install reindexer-server ``` -Available 
distros: `centos-7`, `fedora-36`, `fedora-37`, `redos-7` +Available distros: `centos-7`, `fedora-38`, `fedora-39`, `redos-7` ### Ubuntu/Debian diff --git a/cpp_src/replicator/replicator.cc b/cpp_src/replicator/replicator.cc index b44862777..93e8bdaa4 100644 --- a/cpp_src/replicator/replicator.cc +++ b/cpp_src/replicator/replicator.cc @@ -705,6 +705,20 @@ Error Replicator::applyTxWALRecord(LSNPair LSNs, std::string_view nsName, Namesp slaveNs->CommitTransaction(tx, res, rdxContext); tx = Transaction{}; } break; + case WalTagsMatcher: { + std::lock_guard lck(syncMtx_); + Transaction &tx = transactions_[slaveNs.get()]; + if (tx.IsFree()) return Error(errLogic, "[repl:%s]:%d Transaction was not initiated.", nsName, config_.serverId); + + TagsMatcher tm; + Serializer ser(rec.data.data(), rec.data.size()); + const auto version = ser.GetVarint(); + const auto stateToken = ser.GetVarint(); + tm.deserialize(ser, version, stateToken); + logPrintf(LogInfo, "[repl:%s]:%d Got new tagsmatcher replicated via tx: { state_token: %08X, version: %d }", nsName, + config_.serverId, stateToken, version); + tx.SetTagsMatcher(std::move(tm)); + } break; case WalEmpty: case WalReplState: case WalItemUpdate: @@ -718,6 +732,9 @@ Error Replicator::applyTxWALRecord(LSNPair LSNs, std::string_view nsName, Namesp case WalForceSync: case WalSetSchema: case WalWALSync: + case WalResetLocalWal: + case WalRawItem: + case WalShallowItem: return Error(errLogic, "Unexpected for transaction WAL rec type %d\n", int(rec.type)); } return {}; @@ -850,10 +867,24 @@ Error Replicator::applyWALRecord(LSNPair LSNs, std::string_view nsName, Namespac slaveNs->SetSchema(rec.data, rdxContext); stat.schemasSet++; break; + case WalTagsMatcher: { + checkNoOpenedTransaction(nsName, slaveNs); + TagsMatcher tm; + Serializer ser(rec.data.data(), rec.data.size()); + const auto version = ser.GetVarint(); + const auto stateToken = ser.GetVarint(); + tm.deserialize(ser, version, stateToken); + logPrintf(LogInfo, "[repl:%s]:%d 
Got new tagsmatcher replicated via single record: { state_token: %08X, version: %d }", nsName, + config_.serverId, stateToken, version); + slaveNs->SetTagsMatcher(std::move(tm), rdxContext); + } break; case WalEmpty: case WalItemUpdate: case WalInitTransaction: case WalCommitTransaction: + case WalResetLocalWal: + case WalRawItem: + case WalShallowItem: return Error(errLogic, "Unexpected WAL rec type %d\n", int(rec.type)); } return err; @@ -982,6 +1013,24 @@ Error Replicator::syncMetaForced(Namespace::Ptr &slaveNs, std::string_view nsNam // Callback from WAL updates pusher void Replicator::OnWALUpdate(LSNPair LSNs, std::string_view nsName, const WALRecord &wrec) { + try { + onWALUpdateImpl(LSNs, nsName, wrec); + } catch (Error &e) { + logPrintf(LogError, "[repl:%s]:%d Exception on WAL update: %s", nsName, config_.serverId, e.what()); + assertrx_dbg(false); + resync_.send(); + } catch (std::exception &e) { + logPrintf(LogError, "[repl:%s]:%d Exception on WAL update (std::exception): %s", nsName, config_.serverId, e.what()); + assertrx_dbg(false); + resync_.send(); + } catch (...) 
{ + logPrintf(LogError, "[repl:%s]:%d Exception on WAL update: ", nsName, config_.serverId); + assertrx_dbg(false); + resync_.send(); + } +} + +void Replicator::onWALUpdateImpl(LSNPair LSNs, std::string_view nsName, const WALRecord &wrec) { auto sId = LSNs.originLSN_.Server(); if (sId != 0) { // sId = 0 for configurations without specifying a server id if (sId == config_.serverId) { diff --git a/cpp_src/replicator/replicator.h b/cpp_src/replicator/replicator.h index 0a2c9b4bf..5a3949a9d 100644 --- a/cpp_src/replicator/replicator.h +++ b/cpp_src/replicator/replicator.h @@ -94,7 +94,8 @@ class Replicator : public IUpdatesObserver { // Push update to the queue to apply it later void pushPendingUpdate(LSNPair LSNs, std::string_view nsName, const WALRecord &wrec); - void OnWALUpdate(LSNPair LSNs, std::string_view nsName, const WALRecord &walRec) override final; + void OnWALUpdate(LSNPair LSNs, std::string_view nsName, const WALRecord &wrec) override final; + void onWALUpdateImpl(LSNPair LSNs, std::string_view nsName, const WALRecord &wrec); void OnUpdatesLost(std::string_view nsName) override final; void OnConnectionState(const Error &err) override final; diff --git a/cpp_src/replicator/walrecord.cc b/cpp_src/replicator/walrecord.cc index e97d56757..b2222f6ec 100644 --- a/cpp_src/replicator/walrecord.cc +++ b/cpp_src/replicator/walrecord.cc @@ -19,8 +19,9 @@ void WALRecord::Pack(WrSerializer &ser) const { ser.PutVarUint(inTransaction ? 
(type | TxBit) : type); switch (type) { case WalItemUpdate: + case WalShallowItem: ser.PutUInt32(id); - break; + return; case WalUpdateQuery: case WalIndexAdd: case WalIndexDrop: @@ -30,29 +31,32 @@ void WALRecord::Pack(WrSerializer &ser) const { case WalForceSync: case WalWALSync: case WalSetSchema: + case WalTagsMatcher: ser.PutVString(data); - break; + return; case WalPutMeta: ser.PutVString(putMeta.key); ser.PutVString(putMeta.value); - break; + return; case WalItemModify: ser.PutVString(itemModify.itemCJson); ser.PutVarUint(itemModify.modifyMode); ser.PutVarUint(itemModify.tmVersion); - break; + return; + case WalRawItem: + ser.PutUInt32(rawItem.id); + ser.PutVString(rawItem.itemCJson); + return; case WalEmpty: - ser.Reset(); - break; case WalNamespaceAdd: case WalNamespaceDrop: case WalInitTransaction: case WalCommitTransaction: - break; - default: - fprintf(stderr, "Unexpected WAL rec type %d\n", int(type)); - std::abort(); + case WalResetLocalWal: + return; } + fprintf(stderr, "Unexpected WAL rec type %d\n", int(type)); + std::abort(); } WALRecord::WALRecord(span packed) { @@ -71,8 +75,9 @@ WALRecord::WALRecord(span packed) { } switch (type) { case WalItemUpdate: + case WalShallowItem: id = ser.GetUInt32(); - break; + return; case WalUpdateQuery: case WalIndexAdd: case WalIndexDrop: @@ -82,27 +87,31 @@ WALRecord::WALRecord(span packed) { case WalForceSync: case WalWALSync: case WalSetSchema: + case WalTagsMatcher: data = ser.GetVString(); - break; + return; case WalPutMeta: putMeta.key = ser.GetVString(); putMeta.value = ser.GetVString(); - break; + return; case WalItemModify: itemModify.itemCJson = ser.GetVString(); itemModify.modifyMode = ser.GetVarUint(); itemModify.tmVersion = ser.GetVarUint(); - break; + return; + case WalRawItem: + rawItem.id = ser.GetUInt32(); + rawItem.itemCJson = ser.GetVString(); + return; case WalEmpty: case WalNamespaceAdd: case WalNamespaceDrop: case WalInitTransaction: case WalCommitTransaction: - break; - default: - 
logPrintf(LogWarning, "Unexpected WAL rec type %d\n", int(type)); - break; + case WalResetLocalWal: + return; } + logPrintf(LogError, "Unexpected WAL rec type %d\n", int(type)); } static std::string_view wrecType2Str(WALRecType t) { @@ -142,9 +151,16 @@ static std::string_view wrecType2Str(WALRecType t) { return "WalWALSync"sv; case WalSetSchema: return "WalSetSchema"sv; - default: - return ""sv; + case WalRawItem: + return "WalRawItem"sv; + case WalShallowItem: + return "WalShallowItem"sv; + case WalTagsMatcher: + return "WalTagsMatcher"sv; + case WalResetLocalWal: + return "WalResetLocalWal"sv; } + return ""sv; } WrSerializer &WALRecord::Dump(WrSerializer &ser, const std::function &cjsonViewer) const { @@ -156,10 +172,13 @@ WrSerializer &WALRecord::Dump(WrSerializer &ser, const std::function &cjsonViewer) const { @@ -191,8 +210,10 @@ void WALRecord::GetJSON(JsonBuilder &jb, const std::function(reinterpret_cast(data.data()), data.size())) {} diff --git a/cpp_src/replicator/walrecord.h b/cpp_src/replicator/walrecord.h index 1b6a0e9e0..bb25452eb 100644 --- a/cpp_src/replicator/walrecord.h +++ b/cpp_src/replicator/walrecord.h @@ -30,6 +30,10 @@ enum WALRecType { WalForceSync = 14, WalSetSchema = 15, WalWALSync = 16, + WalTagsMatcher = 17, + WalResetLocalWal = 18, + WalRawItem = 19, + WalShallowItem = 20, }; class WrSerializer; @@ -57,8 +61,8 @@ struct WALRecord { explicit WALRecord(WALRecType _type, std::string_view key, std::string_view value) : type(_type), putMeta{key, value} {} explicit WALRecord(WALRecType _type, std::string_view cjson, int tmVersion, int modifyMode, bool inTx = false) : type(_type), itemModify{cjson, tmVersion, modifyMode}, inTransaction(inTx) {} - WrSerializer &Dump(WrSerializer &ser, const std::function& cjsonViewer) const; - void GetJSON(JsonBuilder &jb, const std::function& cjsonViewer) const; + WrSerializer &Dump(WrSerializer &ser, const std::function &cjsonViewer) const; + void GetJSON(JsonBuilder &jb, const std::function &cjsonViewer) 
const; void Pack(WrSerializer &ser) const; SharedWALRecord GetShared(int64_t lsn, int64_t upstreamLSN, std::string_view nsName) const; @@ -75,6 +79,10 @@ struct WALRecord { std::string_view key; std::string_view value; } putMeta; + struct { + IdType id; + std::string_view itemCJson; + } rawItem; }; bool inTransaction = false; mutable SharedWALRecord shared_; diff --git a/cpp_src/replicator/walselecter.cc b/cpp_src/replicator/walselecter.cc index 23da38c5c..12eec12d5 100644 --- a/cpp_src/replicator/walselecter.cc +++ b/cpp_src/replicator/walselecter.cc @@ -106,6 +106,10 @@ void WALSelecter::operator()(QueryResults &result, SelectCtx &params) { case WalNamespaceRename: case WalForceSync: case WalWALSync: + case WalRawItem: + case WalShallowItem: + case WalTagsMatcher: + case WalResetLocalWal: std::abort(); } } diff --git a/cpp_src/server/contrib/server.yml b/cpp_src/server/contrib/server.yml index 2df0c33cd..bdc4fe6bc 100644 --- a/cpp_src/server/contrib/server.yml +++ b/cpp_src/server/contrib/server.yml @@ -2738,8 +2738,8 @@ definitions: type: integer default: 20000 minimum: 0 - maximum: 65535 - description: "Maximum documents count which will be processed in merge query results. Increasing this value may refine ranking of queries with high frequency words, but will decrease search speed" + maximum: 65000 + description: "Maximum documents count which will be processed in merge query results. Increasing this value may refine ranking of queries with high frequency words, but will decrease search speed" extra_word_symbols: type: string default: "-/+" @@ -3984,7 +3984,14 @@ definitions: description: "Enable network traffic compression" cluster_id: type: integer + default: 2 description: "Cluser ID - must be same for client and for master" + server_id: + type: integer + default: 0 + minimun: 0 + maximum: 999 + description: "Node identifier. 
Should be unique for each node in the replicated cluster (non-unique IDs are also allowed, but may lead to the inconsistency in some cases" force_sync_on_logic_error: type: boolean description: "force resync on logic error conditions" diff --git a/cpp_src/server/httpserver.cc b/cpp_src/server/httpserver.cc index 65bd6dc2f..27e2adc13 100644 --- a/cpp_src/server/httpserver.cc +++ b/cpp_src/server/httpserver.cc @@ -226,11 +226,9 @@ int HTTPServer::GetDatabases(http::Context &ctx) { } if (sortDirection) { - std::sort(dbs.begin(), dbs.end(), [sortDirection](const std::string &lhs, const std::string &rhs) { - if (sortDirection > 0) - return collateCompare(lhs, rhs, SortingPrioritiesTable()) < 0; - else - return collateCompare(lhs, rhs, SortingPrioritiesTable()) > 0; + boost::sort::pdqsort(dbs.begin(), dbs.end(), [sortDirection](const std::string &lhs, const std::string &rhs) { + return (sortDirection > 0) ? (collateCompare(lhs, rhs, SortingPrioritiesTable()) < 0) + : (collateCompare(lhs, rhs, SortingPrioritiesTable()) > 0); }); } @@ -320,11 +318,9 @@ int HTTPServer::GetNamespaces(http::Context &ctx) { } if (sortDirection) { - std::sort(nsDefs.begin(), nsDefs.end(), [sortDirection](const NamespaceDef &lhs, const NamespaceDef &rhs) { - if (sortDirection > 0) - return collateCompare(lhs.name, rhs.name, SortingPrioritiesTable()) < 0; - else - return collateCompare(lhs.name, rhs.name, SortingPrioritiesTable()) > 0; + boost::sort::pdqsort(nsDefs.begin(), nsDefs.end(), [sortDirection](const NamespaceDef &lhs, const NamespaceDef &rhs) { + return (sortDirection > 0) ? 
(collateCompare(lhs.name, rhs.name, SortingPrioritiesTable()) < 0) + : (collateCompare(lhs.name, rhs.name, SortingPrioritiesTable()) > 0); }); } @@ -543,9 +539,9 @@ int HTTPServer::GetMetaList(http::Context &ctx) { return jsonStatus(ctx, http::HttpStatus(err)); } if (sortDirection == Asc) { - std::sort(keys.begin(), keys.end()); + boost::sort::pdqsort(keys.begin(), keys.end()); } else if (sortDirection == Desc) { - std::sort(keys.begin(), keys.end(), std::greater()); + boost::sort::pdqsort(keys.begin(), keys.end(), std::greater()); } auto keysIt = keys.begin(); auto keysEnd = keys.end(); diff --git a/cpp_src/tools/fsops.cc b/cpp_src/tools/fsops.cc index e7d8db748..5fbfcae71 100644 --- a/cpp_src/tools/fsops.cc +++ b/cpp_src/tools/fsops.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include "errors.h" #include "tools/oscompat.h" @@ -177,7 +178,16 @@ std::string GetCwd() { return std::string(getcwd(buff, FILENAME_MAX)); } +static std::string tmpDir; +static std::mutex tmpDirMtx; + std::string GetTempDir() { + { + std::lock_guard lck(tmpDirMtx); + if (!tmpDir.empty()) { + return tmpDir; + } + } #ifdef _WIN32 char tmpBuf[512]; *tmpBuf = 0; @@ -190,6 +200,11 @@ std::string GetTempDir() { #endif } +void SetTempDir(std::string &&dir) noexcept { + std::lock_guard lck(tmpDirMtx); + tmpDir = std::move(dir); +} + std::string GetHomeDir() { const char *homeDir = getenv("HOME"); if (homeDir && *homeDir) return homeDir; @@ -355,5 +370,6 @@ std::string GetRelativePath(const std::string &path, unsigned maxUp) { rpath.append(path.begin() + same, path.end()); return rpath; } + } // namespace fs } // namespace reindexer diff --git a/cpp_src/tools/fsops.h b/cpp_src/tools/fsops.h index 46d56db4c..015ef0a66 100644 --- a/cpp_src/tools/fsops.h +++ b/cpp_src/tools/fsops.h @@ -37,6 +37,7 @@ TimeStats StatTime(const std::string &path); std::string GetCwd(); std::string GetDirPath(const std::string &path); std::string GetTempDir(); +void SetTempDir(std::string &&dir) noexcept; 
std::string GetHomeDir(); std::string GetRelativePath(const std::string &path, unsigned maxUp = 1024); inline int Rename(const std::string &from, const std::string &to) { return rename(from.c_str(), to.c_str()); } diff --git a/cpp_src/tools/jsonstring.h b/cpp_src/tools/jsonstring.h index 34c6eeb56..36601af0a 100644 --- a/cpp_src/tools/jsonstring.h +++ b/cpp_src/tools/jsonstring.h @@ -71,12 +71,15 @@ inline void encode(uint8_t *p, uint64_t l, std::vector> p[-3] = (uptr >> 8) & 0xFF; p[-4] = (uptr >> 16) & 0xFF; p[-5] = (uptr >> 24) & 0xFF; - if constexpr (sizeof(uintptr_t) == 8) { - p[-6] = (uptr >> 32) & 0xFF; - p[-7] = (uptr >> 40) & 0xFF; - p[-8] = (uptr >> 48) & 0xFF; - p[-9] = (uptr >> 56) & 0xFF; - } +#if UINTPTR_MAX == 0xFFFFFFFF +#elif UINTPTR_MAX == 0xFFFFFFFFFFFFFFFF + p[-6] = (uptr >> 32) & 0xFF; + p[-7] = (uptr >> 40) & 0xFF; + p[-8] = (uptr >> 48) & 0xFF; + p[-9] = (uptr >> 56) & 0xFF; +#else + static_assert(false, "Unexpected uintptr_t size"); +#endif } else { // Put length p[0] = l & 0xFF; diff --git a/cpp_src/vendor/gason/gason.h b/cpp_src/vendor/gason/gason.h index 6e227f7a8..76c218d37 100644 --- a/cpp_src/vendor/gason/gason.h +++ b/cpp_src/vendor/gason/gason.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "estl/span.h" #include "tools/jsonstring.h" diff --git a/describer.go b/describer.go index ed494bed8..b356b3cba 100644 --- a/describer.go +++ b/describer.go @@ -85,7 +85,7 @@ type CacheMemStat struct { type LsnT struct { // Operation counter Counter int64 `json:"counter"` - // Node identifyer + // Node identifier ServerId int `json:"server_id"` } diff --git a/fulltext.md b/fulltext.md index e741225a2..672ab7e52 100644 --- a/fulltext.md +++ b/fulltext.md @@ -378,10 +378,10 @@ FtTyposDetailedConfig: config for more precise typos algorithm tuning. 
| | Parameter name | Type | Description | Default value | |---|:----------------------------:|:--------:|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|:-------------:| -| | MaxTypoDistance | int | Maximum distance between symbols in initial and target words to perform substitution. Check [typos handling](#typos-handling-details) section for detailed description. | 0 | -| | MaxSymbolPermutationDistance | int | aximum distance between same symbols in initial and target words to perform substitution (to handle cases, when two symbolws were switched with each other). Check [typos handling](#typos-handling-details) section for detailed description. | 1 | -| | MaxMissingLetters | int | Maximum number of symbols, which may be removed from the initial term to transform it into the result word. Check [typos handling](#typos-handling-details) section for detailed description. | 2 | -| | MaxExtraLetters | int | Maximum number of symbols, which may be added to the initial term to transform it into the result word. Check [typos handling](#typos-handling-details) section for detailed description. | 2 | +| | MaxTypoDistance | int | Maximum distance between symbols in initial and target words to perform substitution. Check [typos handling](#typos-handling-details) section for detailed description. | 0 | +| | MaxSymbolPermutationDistance | int | Maximum distance between same symbols in initial and target words to perform substitution (to handle cases, when two symbols were switched with each other). Check [typos handling](#typos-handling-details) section for detailed description. | 1 | +| | MaxMissingLetters | int | Maximum number of symbols, which may be removed from the initial term to transform it into the result word. 
Check [typos handling](#typos-handling-details) section for detailed description. | 2 | +| | MaxExtraLetters | int | Maximum number of symbols, which may be added to the initial term to transform it into the result word. Check [typos handling](#typos-handling-details) section for detailed description. | 2 | ### Base ranking config diff --git a/test/builtinserver_test.go b/test/builtinserver_test.go index 978e23c6d..2b96c59b1 100644 --- a/test/builtinserver_test.go +++ b/test/builtinserver_test.go @@ -19,7 +19,7 @@ func TestBuiltinServer(t *testing.T) { cfg1 := config.DefaultServerConfig() cfg1.Net.HTTPAddr = "0:29088" cfg1.Net.RPCAddr = "0:26534" - cfg1.Storage.Path = "/tmp/rx_builtinserver_test1" + cfg1.Storage.Path = "/tmp/reindex_builtinserver_test1" os.RemoveAll(cfg1.Storage.Path) rx1 := reindexer.NewReindex("builtinserver://xxx", reindexer.WithServerConfig(time.Second*100, cfg1)) @@ -30,7 +30,7 @@ func TestBuiltinServer(t *testing.T) { cfg2 := config.DefaultServerConfig() cfg2.Net.HTTPAddr = "0:29089" cfg2.Net.RPCAddr = "0:26535" - cfg2.Storage.Path = "/tmp/rx_builtinserver_test2" + cfg2.Storage.Path = "/tmp/reindex_builtinserver_test2" os.RemoveAll(cfg2.Storage.Path) rx2 := reindexer.NewReindex("builtinserver://xxx", reindexer.WithServerConfig(time.Second*100, cfg2)) @@ -46,7 +46,7 @@ func TestBuiltinServer(t *testing.T) { cfg4 := config.DefaultServerConfig() cfg4.Net.HTTPAddr = "0:29090" cfg4.Net.RPCAddr = "0:26536" - cfg4.Storage.Path = "/tmp/rx_builtinserver_test4" + cfg4.Storage.Path = "/tmp/reindex_builtinserver_test4" cfg4.Net.UnixRPCAddr = "/tmp/reindexer_builtinserver_test.sock" os.RemoveAll(cfg4.Storage.Path) diff --git a/test/compatibility_test/compatibility_test.sh b/test/compatibility_test/compatibility_test.sh index 9300f0e77..31b893ff3 100755 --- a/test/compatibility_test/compatibility_test.sh +++ b/test/compatibility_test/compatibility_test.sh @@ -39,19 +39,19 @@ test_outdated_instance() { echo "====Master: ${master_cmd}" echo "====Slave: 
${slave_cmd}" init_storages - ${master_cmd} --db "${master_db_path}" -l0 --serverlog=\"\" --corelog=\"\" --httplog=\"\" --rpclog=\"\" & + ${master_cmd} --db "${master_db_path}" -l0 --serverlog=\"reindexer_master_$3.1.log\" --corelog=\"reindexer_master_$3.1.log\" --httplog=\"\" --rpclog=\"\" & master_pid=$! sleep 4 go run ${script_dir}/filler.go --dsn "${master_dsn}/${db_name}" --offset 0 echo "====Force sync" - ${slave_cmd} --db "${slave_db_path}" -p 9089 -r 6535 -l0 --serverlog=\"\" --corelog=\"\" --httplog=\"\" --rpclog=\"\" & + ${slave_cmd} --db "${slave_db_path}" -p 9089 -r 6535 -l0 --serverlog=\"reindexer_slave_$3.1.log\" --corelog=\"reindexer_slave_$3.1.log\" --httplog=\"\" --rpclog=\"\" & slave_pid=$! sleep 5 kill $slave_pid wait $slave_pid go run ${script_dir}/filler.go --dsn "${master_dsn}/${db_name}" --offset 100 echo "====Sync by WAL" - ${slave_cmd} --db "${slave_db_path}" -p 9089 -r 6535 -l0 --serverlog=\"\" --corelog=\"\" --httplog=\"\" --rpclog=\"\" & + ${slave_cmd} --db "${slave_db_path}" -p 9089 -r 6535 -l0 --serverlog=\"reindexer_slave_$3.2.log\" --corelog=\"reindexer_slave_$3.2.log\" --httplog=\"\" --rpclog=\"\" & slave_pid=$! 
sleep 5 echo "====Online sync" @@ -71,8 +71,8 @@ echo "====Installing reindexer package====" echo "====URL: ${rpm_url}" yum install -y ${rpm_url} > /dev/null || true echo "====Checking outdated slave====" -test_outdated_instance "build/cpp_src/cmd/reindexer_server/reindexer_server" "reindexer_server" +test_outdated_instance "build/cpp_src/cmd/reindexer_server/reindexer_server" "reindexer_server" "1" echo "====Checking outdated master====" -test_outdated_instance "reindexer_server" "build/cpp_src/cmd/reindexer_server/reindexer_server" +test_outdated_instance "reindexer_server" "build/cpp_src/cmd/reindexer_server/reindexer_server" "2" clear_artifacts diff --git a/test/fields_autogen_test.go b/test/fields_autogen_test.go index 0813cb0af..2f25b6176 100644 --- a/test/fields_autogen_test.go +++ b/test/fields_autogen_test.go @@ -26,6 +26,9 @@ func init() { } func TestAutogen(t *testing.T) { + + currentSerial := 0 + t.Run("field should be updated with current timestamp using NOW() function", func(t *testing.T) { precepts := []string{"updated_time=NOW()"} item := TestItemAutogen{} @@ -51,9 +54,10 @@ func TestAutogen(t *testing.T) { item := TestItemAutogen{} err := DB.Upsert(ns, &item, precepts...) require.NoError(t, err) + currentSerial += 1 - assert.Equal(t, 1, item.Age) - assert.Equal(t, int64(1), item.Genre) + assert.Equal(t, currentSerial, item.Age) + assert.Equal(t, int64(currentSerial), item.Genre) t.Run("serial field should be increased by 5 after 5 iterations (must be equal 6 after previous test)", func(t *testing.T) { precepts := []string{"genre=SERIAL()", "age=serial()"} @@ -61,70 +65,84 @@ func TestAutogen(t *testing.T) { for i := 0; i < 5; i++ { err := DB.Upsert(ns, &item, precepts...) 
require.NoError(t, err) + currentSerial += 1 } - assert.Equal(t, 6, item.Age) - assert.Equal(t, int64(6), item.Genre) + assert.Equal(t, currentSerial, item.Age) + assert.Equal(t, int64(currentSerial), item.Genre) }) }) t.Run("fill on insert, update, upsert", func(t *testing.T) { - precepts := []string{"updated_time=NOW()"} + precepts := []string{"updated_time=NOW()", "age=SERIAL()"} item := TestItemAutogen{ID: rand.Intn(100000000)} _, err := DB.Insert(ns, &item, precepts...) require.NoError(t, err) + currentSerial += 1 assert.GreaterOrEqual(t, item.UpdatedTime, time.Now().Unix()-1) assert.LessOrEqual(t, item.UpdatedTime, time.Now().Unix()) + assert.Equal(t, currentSerial, item.Age) item = TestItemAutogen{} err = DB.Upsert(ns, &item, precepts...) require.NoError(t, err) + currentSerial += 1 assert.GreaterOrEqual(t, item.UpdatedTime, time.Now().Unix()-1) assert.LessOrEqual(t, item.UpdatedTime, time.Now().Unix()) + assert.Equal(t, currentSerial, item.Age) item = TestItemAutogen{} _, err = DB.Update(ns, &item, precepts...) require.NoError(t, err) + currentSerial += 1 assert.GreaterOrEqual(t, item.UpdatedTime, time.Now().Unix()-1) assert.LessOrEqual(t, item.UpdatedTime, time.Now().Unix()) + assert.Equal(t, currentSerial, item.Age) }) - t.Run("fill on upsert not exist item", func(t *testing.T) { - precepts := []string{"updated_time=NOW()"} + t.Run("fill on upsert nonexist item", func(t *testing.T) { + precepts := []string{"updated_time=NOW()", "age=SERIAL()"} item := TestItemAutogen{ID: rand.Intn(100000000)} err := DB.Upsert(ns, &item, precepts...) 
require.NoError(t, err) + currentSerial += 1 assert.GreaterOrEqual(t, item.UpdatedTime, time.Now().Unix()-1) assert.LessOrEqual(t, item.UpdatedTime, time.Now().Unix()) + assert.Equal(t, currentSerial, item.Age) }) - t.Run("not fill on update not exist item", func(t *testing.T) { - precepts := []string{"updated_time=NOW()"} + t.Run("doesn't fill on update nonexist item", func(t *testing.T) { + precepts := []string{"updated_time=NOW()", "age=SERIAL()"} item := TestItemAutogen{ID: rand.Intn(100000000)} count, err := DB.Update(ns, &item, precepts...) require.NoError(t, err) + currentSerial += 1 // remove after 1602 assert.Equal(t, 0, count) assert.Equal(t, int64(0), item.UpdatedTime) + assert.Equal(t, 0, item.Age) }) - t.Run("not fill on insert exist item", func(t *testing.T) { - precepts := []string{"updated_time=NOW()"} + t.Run("doesn't fill on insert exist item", func(t *testing.T) { + precepts := []string{"updated_time=NOW()", "age=SERIAL()"} id := rand.Intn(100000000) item := TestItemAutogen{ID: id} count, err := DB.Insert(ns, &item, precepts...) require.NoError(t, err) + currentSerial += 1 assert.Equal(t, 1, count) assert.GreaterOrEqual(t, item.UpdatedTime, time.Now().Unix()-1) assert.LessOrEqual(t, item.UpdatedTime, time.Now().Unix()) + assert.Equal(t, currentSerial, item.Age) item = TestItemAutogen{ID: id} count, err = DB.Insert(ns, &item, precepts...) 
require.NoError(t, err) assert.Equal(t, 0, count) assert.Equal(t, int64(0), item.UpdatedTime) + assert.Equal(t, 0, item.Age) }) } diff --git a/test/slave_empty_storage_test.go b/test/slave_empty_storage_test.go index 3c05a0bad..7f4d889ef 100644 --- a/test/slave_empty_storage_test.go +++ b/test/slave_empty_storage_test.go @@ -21,7 +21,7 @@ func TestSlaveEmptyStorage(t *testing.T) { cfgMaster := config.DefaultServerConfig() cfgMaster.Net.HTTPAddr = "0:29088" cfgMaster.Net.RPCAddr = "0:26534" - cfgMaster.Storage.Path = "/tmp/rx_master1" + cfgMaster.Storage.Path = "/tmp/reindex_master1" os.RemoveAll(cfgMaster.Storage.Path) rxMaster := reindexer.NewReindex("builtinserver://xxx", reindexer.WithServerConfig(time.Second*100, cfgMaster)) { @@ -45,7 +45,7 @@ namespaces: []` cfgSlave := config.DefaultServerConfig() cfgSlave.Net.HTTPAddr = "0:29089" cfgSlave.Net.RPCAddr = "0:26535" - cfgSlave.Storage.Path = "/tmp/rx_slave2" + cfgSlave.Storage.Path = "/tmp/reindex_slave2" os.RemoveAll(cfgSlave.Storage.Path) rxSlave := reindexer.NewReindex("builtinserver://xxx", reindexer.WithServerConfig(time.Second*100, cfgSlave)) { diff --git a/test/uuid_test.go b/test/uuid_test.go index 699cbab73..4316030e3 100644 --- a/test/uuid_test.go +++ b/test/uuid_test.go @@ -415,17 +415,17 @@ func TestUuidClientBuiltinserver(t *testing.T) { ns := "test_uuid_builtinserver_connect" t.Run("test add uuid index on non-indexed string", func(t *testing.T) { - path1 := "/tmp/rx_uuid11" + path1 := "/tmp/reindex_uuid11" rx1 := configureAndStartServer("0:29188", "0:26634", path1) defer rx1.Close() { - f, err := os.OpenFile("/tmp/rx_uuid11"+"/uudb/replication.conf", os.O_RDWR|os.O_CREATE, 0644) + f, err := os.OpenFile("/tmp/reindex_uuid11"+"/uudb/replication.conf", os.O_RDWR|os.O_CREATE, 0644) assert.NoError(t, err) _, err = f.Write([]byte(masterConfig)) assert.NoError(t, err) } - path2 := "/tmp/rx_uuid12" + path2 := "/tmp/reindex_uuid12" rx2 := configureAndStartServer("0:29189", "0:26635", path2) defer 
rx2.Close() { @@ -486,7 +486,7 @@ func TestUuidClientBuiltinserver(t *testing.T) { }) t.Run("test update index from string to uuid", func(t *testing.T) { - path1 := "/tmp/rx_uuid11" + path1 := "/tmp/reindex_uuid11" rx1 := configureAndStartServer("0:29188", "0:26634", path1) defer rx1.Close() { @@ -496,7 +496,7 @@ func TestUuidClientBuiltinserver(t *testing.T) { assert.NoError(t, err) } - path2 := "/tmp/rx_uuid12" + path2 := "/tmp/reindex_uuid12" rx2 := configureAndStartServer("0:29189", "0:26635", path2) defer rx2.Close() {