Skip to content

Commit

Permalink
[dbg] Print update type in the online replication
Browse files Browse the repository at this point in the history
  • Loading branch information
reindexer-bot committed Oct 14, 2023
1 parent b0303b6 commit e35e757
Show file tree
Hide file tree
Showing 252 changed files with 6,911 additions and 3,858 deletions.
21 changes: 16 additions & 5 deletions bindings/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ type Logger interface {
Printf(level int, fmt string, msg ...interface{})
}

var logger Logger
// Separate mutexes for logger object itself and for reindexer_enable_logger call:
// logMtx provides safe access to the logger
// logEnableMtx provides atomic logic for (enable + set) and (disable + reset) procedures
var logMtx sync.RWMutex
var logEnableMtx sync.Mutex
var logger Logger

var enableDebug bool

var bufPool sync.Pool
Expand Down Expand Up @@ -600,18 +605,24 @@ func CGoLogger(level int, msg string) {
}
}

func (binding *Builtin) EnableLogger(log bindings.Logger) {
func (binding *Builtin) setLogger(log bindings.Logger) {
logMtx.Lock()
defer logMtx.Unlock()
logger = log
}

func (binding *Builtin) EnableLogger(log bindings.Logger) {
logEnableMtx.Lock()
defer logEnableMtx.Unlock()
binding.setLogger(log)
C.reindexer_enable_go_logger()
}

func (binding *Builtin) DisableLogger() {
logMtx.Lock()
defer logMtx.Unlock()
logEnableMtx.Lock()
defer logEnableMtx.Unlock()
C.reindexer_disable_go_logger()
logger = nil
binding.setLogger(nil)
}

func (binding *Builtin) ReopenLogFiles() error {
Expand Down
25 changes: 18 additions & 7 deletions bindings/builtinserver/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ type StorageConf struct {
Autorepair bool `yaml:"autorepair"`
}

const ServerThreadingDedicated = "dedicated"
const ServerThreadingShared = "shared"

type NetConf struct {
HTTPAddr string `yaml:"httpaddr"`
RPCAddr string `yaml:"rpcaddr"`
WebRoot string `yaml:"webroot"`
Security bool `yaml:"security"`
HTTPAddr string `yaml:"httpaddr"`
HTTPThreading string `yaml:"http_threading"` // "dedicated" or "shared"
RPCAddr string `yaml:"rpcaddr"`
RPCThreading string `yaml:"rpc_threading"` // "dedicated" or "shared"
UnixRPCAddr string `yaml:"urpcaddr"`
UnixRPCThreading string `yaml:"urpc_threading"` // "dedicated" or "shared"
WebRoot string `yaml:"webroot"`
Security bool `yaml:"security"`
HttpReadTimeoutSec int `yaml:"http_read_timeout,omitempty"`
HttpWriteTimeoutSec int `yaml:"http_write_timeout,omitempty"`
}

type LoggerConf struct {
Expand Down Expand Up @@ -69,9 +78,11 @@ func DefaultServerConfig() *ServerConfig {
Autorepair: false,
},
Net: NetConf{
HTTPAddr: "0.0.0.0:9088",
RPCAddr: "0.0.0.0:6534",
Security: false,
HTTPAddr: "0.0.0.0:9088",
HTTPThreading: "shared",
RPCAddr: "0.0.0.0:6534",
RPCThreading: "shared",
Security: false,
},
Logger: LoggerConf{
ServerLog: "stdout",
Expand Down
16 changes: 12 additions & 4 deletions bindings/cproto/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,20 @@ func (c *connection) deadlineTicker() {
}

func (c *connection) connect(ctx context.Context) (err error) {
dsn := c.owner.getActiveDSN()
var d net.Dialer
c.conn, err = d.DialContext(ctx, "tcp", c.owner.getActiveDSN().Host)
if err != nil {
return err
if dsn.Scheme == "cproto" {
if c.conn, err = d.DialContext(ctx, "tcp", dsn.Host); err != nil {
return err
}
c.conn.(*net.TCPConn).SetNoDelay(true)
} else {
d.LocalAddr = nil
if c.conn, err = d.DialContext(ctx, "unix", dsn.Host); err != nil {
return err
}
}
c.conn.(*net.TCPConn).SetNoDelay(true)

c.rdBuf = bufio.NewReaderSize(c.conn, bufsCap)

go c.writeLoop()
Expand Down
15 changes: 15 additions & 0 deletions bindings/cproto/cproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"math"
"net"
"net/url"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -32,6 +34,9 @@ var logMtx sync.RWMutex

func init() {
bindings.RegisterBinding("cproto", new(NetCProto))
if runtime.GOOS != "windows" {
bindings.RegisterBinding("ucproto", new(NetCProto))
}
}

type Logger interface {
Expand Down Expand Up @@ -122,6 +127,16 @@ func (binding *NetCProto) Init(u []url.URL, options ...interface{}) (err error)
}

binding.dsn.url = u
for i := 0; i < len(binding.dsn.url); i++ {
if binding.dsn.url[i].Scheme == "ucproto" {
addrs := strings.Split(binding.dsn.url[i].Path, ":")
if len(addrs) != 2 {
return fmt.Errorf("rq: unexpected URL format for ucproto: '%s'. Expecting '<unix socket>:/<db name>", binding.dsn.url[i].Path)
}
binding.dsn.url[i].Host = addrs[0]
binding.dsn.url[i].Path = addrs[1]
}
}
binding.connectDSN(context.Background(), connPoolSize, connPoolLBAlgorithm)
binding.termCh = make(chan struct{})
go binding.pinger()
Expand Down
4 changes: 2 additions & 2 deletions bindings/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type TxCtx struct {
UserCtx context.Context
}

// FetchMore interface for partial loading results (used in cproto)
// FetchMore interface for partial loading results (used in cproto/ucproto)
type FetchMore interface {
Fetch(ctx context.Context, offset, limit int, asJson bool) (err error)
}
Expand Down Expand Up @@ -248,7 +248,7 @@ const (
LBPowerOfTwoChoices
)

// OptionConnPoolLoadBalancing sets algorithm, which will be used to choose connection for cproto requests' balancing
// OptionConnPoolLoadBalancing sets algorithm, which will be used to choose connection for cproto/ucproto requests' balancing
type OptionConnPoolLoadBalancing struct {
Algorithm LoadBalancingAlgorithm
}
Expand Down
2 changes: 0 additions & 2 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -1905,5 +1905,3 @@
- [ref] EnableStorage method was deprecated
- [fix] Query builder did not reset opOR after InnerJoin

## Misc

2 changes: 1 addition & 1 deletion cjson/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (dec *Decoder) Decode(cjson []byte, dest interface{}) (err error) {
}
}()

fieldsoutcnt := make([]int, 64, 64)
fieldsoutcnt := make([]int, MaxIndexes)
ctagsPath := make([]int, 0, 8)

dec.decodeValue(nil, ser, reflect.ValueOf(dest), fieldsoutcnt, ctagsPath)
Expand Down
12 changes: 10 additions & 2 deletions cpp_src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ endif ()

set (EXTRA_FLAGS "")

if (WITH_ASAN AND WITH_TSAN)
message(FATAL_ERROR "You cannot use the ASAN and TSAN options at the same time, CMake will exit.")
endif()

if (WITH_ASAN)
set (EXTRA_FLAGS "-fsanitize=address")
add_definitions(-DREINDEX_WITH_ASAN)
Expand Down Expand Up @@ -272,6 +276,9 @@ else()
list(APPEND SRCS ${KOISHI_PATH}/fcontext/fcontext.c ${KOISHI_PATH}/fcontext/fcontext.hpp)
endif()

# Static LevelDB v1.23 is built with -fno-rtti by default. To inherit our logger from leveldb's logger, this file must be built with -fno-rtti to
set_source_files_properties(${REINDEXER_SOURCE_PATH}/core/storage/leveldblogger.cc PROPERTIES COMPILE_FLAGS "-fno-rtti")

list(APPEND REINDEXER_LIBRARIES reindexer)
add_library(${TARGET} STATIC ${HDRS} ${SRCS} ${VENDORS})
add_definitions(-DREINDEX_CORE_BUILD=1)
Expand Down Expand Up @@ -695,15 +702,16 @@ if (NOT WIN32)
SET(CMAKE_INSTALL_DEFAULT_COMPONENT_NAME "server")
SET(DIST_INCLUDE_FILES
"tools/errors.h" "tools/serializer.h" "tools/varint.h" "tools/stringstools.h" "tools/customhash.h" "tools/assertrx.h" "tools/jsonstring.h"
"tools/verifying_updater.h"
"core/reindexer.h" "core/type_consts.h" "core/item.h" "core/payload/payloadvalue.h" "core/payload/payloadiface.h" "core/indexopts.h"
"core/namespacedef.h" "core/keyvalue/variant.h" "core/keyvalue/geometry.h" "core/sortingprioritiestable.h"
"core/rdxcontext.h" "core/activity_context.h" "core/type_consts_helpers.h" "core/payload/fieldsset.h" "core/payload/payloadtype.h"
"core/cbinding/reindexer_c.h" "core/cbinding/reindexer_ctypes.h" "core/transaction.h" "core/payload/payloadfieldtype.h" "core/reindexerconfig.h"
"core/query/query.h" "core/query/queryentry.h" "core/queryresults/queryresults.h" "core/indexdef.h" "core/queryresults/aggregationresult.h"
"core/queryresults/itemref.h" "core/namespace/stringsholder.h" "core/keyvalue/key_string.h" "core/key_value_type.h" "core/keyvalue/uuid.h"
"core/expressiontree.h" "core/lsn.h" "core/cjson/tagspath.h" "core/cjson/ctag.h"
"estl/cow.h" "estl/overloaded.h" "estl/one_of.h" "estl/h_vector.h" "estl/mutex.h" "estl/intrusive_ptr.h" "estl/trivial_reverse_iterator.h"
"estl/span.h" "estl/chunk.h" "estl/fast_hash_traits.h" "estl/debug_macros.h" "estl/defines.h"
"estl/cow.h" "estl/overloaded.h" "estl/one_of.h" "estl/h_vector.h" "estl/mutex.h" "estl/intrusive_ptr.h" "estl/trivial_reverse_iterator.h"
"estl/span.h" "estl/chunk.h" "estl/fast_hash_traits.h" "estl/debug_macros.h" "estl/defines.h"
"client/reindexer.h" "client/item.h" "client/reindexerconfig.h" "client/queryresults.h" "client/resultserializer.h"
"client/internalrdxcontext.h" "client/transaction.h"
"client/cororeindexer.h" "client/coroqueryresults.h" "client/corotransaction.h"
Expand Down
2 changes: 1 addition & 1 deletion cpp_src/client/cororeindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ CoroReindexer& CoroReindexer::operator=(CoroReindexer&& rdx) noexcept {
Error CoroReindexer::Connect(const std::string& dsn, dynamic_loop& loop, const client::ConnectOpts& opts) {
return impl_->Connect(dsn, loop, opts);
}
Error CoroReindexer::Stop() { return impl_->Stop(); }
void CoroReindexer::Stop() { impl_->Stop(); }
Error CoroReindexer::AddNamespace(const NamespaceDef& nsDef) { return impl_->AddNamespace(nsDef, ctx_); }
Error CoroReindexer::OpenNamespace(std::string_view nsName, const StorageOpts& storage) {
return impl_->OpenNamespace(nsName, ctx_, storage);
Expand Down
5 changes: 3 additions & 2 deletions cpp_src/client/cororeindexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ class CoroReindexer {
CoroReindexer &operator=(CoroReindexer &&) noexcept;

/// Connect - connect to reindexer server
/// @param dsn - uri of server and database, like: `cproto://user@password:127.0.0.1:6534/dbname`
/// @param dsn - uri of server and database, like: `cproto://user@password:127.0.0.1:6534/dbname` or
/// `ucproto://user@password:/tmp/reindexer.sock:/dbname`
/// @param loop - event loop for connections and coroutines handling
/// @param opts - Connect options. May contaion any of <br>
Error Connect(const std::string &dsn, dynamic_loop &loop, const client::ConnectOpts &opts = client::ConnectOpts());
/// Stop - shutdown connector
Error Stop();
void Stop();
/// Open or create namespace
/// @param nsName - Name of namespace
/// @param opts - Storage options. Can be one of <br>
Expand Down
18 changes: 12 additions & 6 deletions cpp_src/client/cororpcclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ CoroRPCClient::CoroRPCClient(const ReindexerConfig& config) : config_(config) {
CoroRPCClient::~CoroRPCClient() { Stop(); }

Error CoroRPCClient::Connect(const std::string& dsn, ev::dynamic_loop& loop, const client::ConnectOpts& opts) {
using namespace std::string_view_literals;
if (conn_.IsRunning()) {
return Error(errLogic, "Client is already started");
}
Expand All @@ -34,9 +35,15 @@ Error CoroRPCClient::Connect(const std::string& dsn, ev::dynamic_loop& loop, con
if (!connectData.uri.parse(dsn)) {
return Error(errParams, "%s is not valid uri", dsn);
}
if (connectData.uri.scheme() != "cproto") {
#ifdef _WIN32
if (connectData.uri.scheme() != "cproto"sv) {
return Error(errParams, "Scheme must be cproto");
}
#else
if (connectData.uri.scheme() != "cproto"sv && connectData.uri.scheme() != "ucproto"sv) {
return Error(errParams, "Scheme must be either cproto or ucproto");
}
#endif
connectData.opts = cproto::CoroClientConnection::Options(
config_.ConnectTimeout, config_.RequestTimeout, opts.IsCreateDBIfMissing(), opts.HasExpectedClusterID(), opts.ExpectedClusterID(),
config_.ReconnectAttempts, config_.EnableCompression, config_.RequestDedicatedThread, config_.AppName);
Expand All @@ -46,13 +53,12 @@ Error CoroRPCClient::Connect(const std::string& dsn, ev::dynamic_loop& loop, con
return errOK;
}

Error CoroRPCClient::Stop() {
void CoroRPCClient::Stop() {
terminate_ = true;
conn_.Stop();
resubWg_.wait();
loop_ = nullptr;
terminate_ = false;
return errOK;
}

Error CoroRPCClient::AddNamespace(const NamespaceDef& nsDef, const InternalRdxContext& ctx) {
Expand Down Expand Up @@ -236,7 +242,7 @@ Error CoroRPCClient::Delete(const Query& query, CoroQueryResults& result, const
query.Serialize(ser);

NsArray nsArray;
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q._namespace)); });
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });

result = CoroQueryResults(&conn_, std::move(nsArray), 0, config_.FetchAmount, config_.RequestTimeout);

Expand All @@ -257,7 +263,7 @@ Error CoroRPCClient::Update(const Query& query, CoroQueryResults& result, const
query.Serialize(ser);

NsArray nsArray;
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q._namespace)); });
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });

result = CoroQueryResults(&conn_, std::move(nsArray), 0, config_.FetchAmount, config_.RequestTimeout);

Expand Down Expand Up @@ -322,7 +328,7 @@ Error CoroRPCClient::selectImpl(const Query& query, CoroQueryResults& result, se
}
NsArray nsArray;
query.Serialize(qser);
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q._namespace)); });
query.WalkNested(true, true, [this, &nsArray](const Query& q) { nsArray.push_back(getNamespace(q.NsName())); });
h_vector<int32_t, 4> vers;
for (auto& ns : nsArray) {
vers.push_back(ns->tagsMatcher_.version() ^ ns->tagsMatcher_.stateToken());
Expand Down
2 changes: 1 addition & 1 deletion cpp_src/client/cororpcclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CoroRPCClient {
~CoroRPCClient();

Error Connect(const std::string &dsn, ev::dynamic_loop &loop, const client::ConnectOpts &opts);
Error Stop();
void Stop();

Error OpenNamespace(std::string_view nsName, const InternalRdxContext &ctx,
const StorageOpts &opts = StorageOpts().Enabled().CreateIfMissing());
Expand Down
8 changes: 4 additions & 4 deletions cpp_src/client/item.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ class Item {
/// Get status of item
/// @return data slice with JSON. Returned slice is allocated in temporary Item's buffer, and can be invalidated by any next operation
/// with Item
Error Status() { return status_; }
Error Status() const noexcept { return status_; }
/// Get internal ID of item
/// @return ID of item
int GetID() const noexcept { return id_; }
/// Get internal version of item
/// @return version of item
int NumFields();
/// Get count of indexed fields
/// @return count of indexed fields
int NumFields() const noexcept;
/// Set additional percepts for modify operation
/// @param precepts - strings in format "fieldName=Func()"
void SetPrecepts(const std::vector<std::string> &precepts);
Expand Down
2 changes: 1 addition & 1 deletion cpp_src/client/itemimpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void ItemImpl::FromCJSON(std::string_view slice) {
Payload pl = GetPayload();
CJsonDecoder decoder(tagsMatcher_);
ser_.Reset();
decoder.Decode(pl, rdser, ser_);
decoder.Decode<>(pl, rdser, ser_);

if (!rdser.Eof() && rdser.Pos() != tmOffset) {
throw Error(errParseJson, "Internal error - left unparsed data %d", rdser.Pos());
Expand Down
5 changes: 0 additions & 5 deletions cpp_src/client/queryresults.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ void QueryResults::fetchNextResults() {
rawResult_.assign(rawResult.begin() + ser.Pos(), rawResult.end());
}

QueryResults::~QueryResults() {}

h_vector<std::string_view, 1> QueryResults::GetNamespaces() const {
h_vector<std::string_view, 1> ret;
ret.reserve(nsArray_.size());
Expand Down Expand Up @@ -310,8 +308,5 @@ QueryResults::Iterator &QueryResults::Iterator::operator++() {
return *this;
}

bool QueryResults::Iterator::operator!=(const Iterator &other) const { return idx_ != other.idx_; }
bool QueryResults::Iterator::operator==(const Iterator &other) const { return idx_ == other.idx_; }

} // namespace client
} // namespace reindexer
Loading

0 comments on commit e35e757

Please sign in to comment.