Skip to content

Commit

Permalink
Update to version v3.20.0
Browse files Browse the repository at this point in the history
  • Loading branch information
IISannikov committed Oct 30, 2023
1 parent b0303b6 commit 8fe18b8
Show file tree
Hide file tree
Showing 304 changed files with 8,567 additions and 4,472 deletions.
12 changes: 6 additions & 6 deletions bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (db *reindexerImpl) modifyItem(ctx context.Context, namespace string, ns *r

rdSer := newSerializer(out.GetBuf())
rawQueryParams := rdSer.readRawQueryParams(func(nsid int) {
ns.cjsonState.ReadPayloadType(&rdSer.Serializer)
ns.cjsonState.ReadPayloadType(&rdSer.Serializer, db.binding, ns.name)
})

if rawQueryParams.count == 0 {
Expand All @@ -71,7 +71,7 @@ func (db *reindexerImpl) modifyItem(ctx context.Context, namespace string, ns *r

if len(precepts) > 0 && (resultp.cptr != 0 || resultp.data != nil) && reflect.TypeOf(item).Kind() == reflect.Ptr {
nsArrEntry := nsArrayEntry{ns, ns.cjsonState.Copy()}
if _, err := unpackItem(&nsArrEntry, &resultp, false, true, item); err != nil {
if _, err := unpackItem(db.binding, &nsArrEntry, &resultp, false, true, item); err != nil {
return 0, err
}
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func (db *reindexerImpl) getNS(namespace string) (*reindexerNamespace, error) {
return ns, nil
}

func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool, nonCacheableData bool, item interface{}) (interface{}, error) {
func unpackItem(bin bindings.RawBinding, ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool, nonCacheableData bool, item interface{}) (interface{}, error) {
useCache := item == nil && (ns.deepCopyIface || allowUnsafe) && !nonCacheableData
needCopy := ns.deepCopyIface && !allowUnsafe
var err error
Expand All @@ -133,7 +133,7 @@ func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool,
item = citem.item
} else {
item = reflect.New(ns.rtype).Interface()
dec := ns.localCjsonState.NewDecoder(item, logger)
dec := ns.localCjsonState.NewDecoder(item, bin)
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
Expand All @@ -159,7 +159,7 @@ func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool,
if item == nil {
item = reflect.New(ns.rtype).Interface()
}
dec := ns.localCjsonState.NewDecoder(item, logger)
dec := ns.localCjsonState.NewDecoder(item, bin)
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
Expand Down Expand Up @@ -425,7 +425,7 @@ func (db *reindexerImpl) updateQuery(ctx context.Context, q *Query) *Iterator {
ser := newSerializer(result.GetBuf())
// skip total count
rawQueryParams := ser.readRawQueryParams(func(nsid int) {
ns.cjsonState.ReadPayloadType(&ser.Serializer)
ns.cjsonState.ReadPayloadType(&ser.Serializer, db.binding, ns.name)
})

for i := 0; i < rawQueryParams.count; i++ {
Expand Down
32 changes: 27 additions & 5 deletions bindings/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ type Logger interface {
Printf(level int, fmt string, msg ...interface{})
}

var logger Logger
// Separate mutexes for logger object itself and for reindexer_enable_logger call:
// logMtx provides safe access to the logger
// logEnableMtx provides atomic logic for (enable + set) and (disable + reset) procedures
// This logger is global to easily export it into CGO (however it may lead to some confusion if there are multiple builtin instances in the app)
var logMtx sync.RWMutex
var logEnableMtx sync.Mutex
var logger Logger
var emptyLogger bindings.NullLogger

var enableDebug bool

var bufPool sync.Pool
Expand Down Expand Up @@ -600,18 +607,33 @@ func CGoLogger(level int, msg string) {
}
}

func (binding *Builtin) EnableLogger(log bindings.Logger) {
func (binding *Builtin) setLogger(log bindings.Logger) {
logMtx.Lock()
defer logMtx.Unlock()
logger = log
}

func (binding *Builtin) EnableLogger(log bindings.Logger) {
logEnableMtx.Lock()
defer logEnableMtx.Unlock()
binding.setLogger(log)
C.reindexer_enable_go_logger()
}

func (binding *Builtin) DisableLogger() {
logMtx.Lock()
defer logMtx.Unlock()
logEnableMtx.Lock()
defer logEnableMtx.Unlock()
C.reindexer_disable_go_logger()
logger = nil
binding.setLogger(nil)
}

func (binding *Builtin) GetLogger() bindings.Logger {
logMtx.RLock()
defer logMtx.RUnlock()
if logger != nil {
return logger
}
return &emptyLogger
}

func (binding *Builtin) ReopenLogFiles() error {
Expand Down
4 changes: 4 additions & 0 deletions bindings/builtinserver/builtinserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ func (server *BuiltinServer) DisableLogger() {
server.builtin.DisableLogger()
}

func (server *BuiltinServer) GetLogger() bindings.Logger {
return server.builtin.GetLogger()
}

func (server *BuiltinServer) ReopenLogFiles() error {
return err2go(C.reopen_log_files(server.svc))
}
Expand Down
25 changes: 18 additions & 7 deletions bindings/builtinserver/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ type StorageConf struct {
Autorepair bool `yaml:"autorepair"`
}

const ServerThreadingDedicated = "dedicated"
const ServerThreadingShared = "shared"

type NetConf struct {
HTTPAddr string `yaml:"httpaddr"`
RPCAddr string `yaml:"rpcaddr"`
WebRoot string `yaml:"webroot"`
Security bool `yaml:"security"`
HTTPAddr string `yaml:"httpaddr"`
HTTPThreading string `yaml:"http_threading"` // "dedicated" or "shared"
RPCAddr string `yaml:"rpcaddr"`
RPCThreading string `yaml:"rpc_threading"` // "dedicated" or "shared"
UnixRPCAddr string `yaml:"urpcaddr"`
UnixRPCThreading string `yaml:"urpc_threading"` // "dedicated" or "shared"
WebRoot string `yaml:"webroot"`
Security bool `yaml:"security"`
HttpReadTimeoutSec int `yaml:"http_read_timeout,omitempty"`
HttpWriteTimeoutSec int `yaml:"http_write_timeout,omitempty"`
}

type LoggerConf struct {
Expand Down Expand Up @@ -69,9 +78,11 @@ func DefaultServerConfig() *ServerConfig {
Autorepair: false,
},
Net: NetConf{
HTTPAddr: "0.0.0.0:9088",
RPCAddr: "0.0.0.0:6534",
Security: false,
HTTPAddr: "0.0.0.0:9088",
HTTPThreading: "shared",
RPCAddr: "0.0.0.0:6534",
RPCThreading: "shared",
Security: false,
},
Logger: LoggerConf{
ServerLog: "stdout",
Expand Down
2 changes: 1 addition & 1 deletion bindings/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package bindings

const CInt32Max = int(^uint32(0) >> 1)

const ReindexerVersion = "v3.19.0"
const ReindexerVersion = "v3.20.0"

// public go consts from type_consts.h and reindexer_ctypes.h
const (
Expand Down
16 changes: 12 additions & 4 deletions bindings/cproto/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,20 @@ func (c *connection) deadlineTicker() {
}

func (c *connection) connect(ctx context.Context) (err error) {
dsn := c.owner.getActiveDSN()
var d net.Dialer
c.conn, err = d.DialContext(ctx, "tcp", c.owner.getActiveDSN().Host)
if err != nil {
return err
if dsn.Scheme == "cproto" {
if c.conn, err = d.DialContext(ctx, "tcp", dsn.Host); err != nil {
return err
}
c.conn.(*net.TCPConn).SetNoDelay(true)
} else {
d.LocalAddr = nil
if c.conn, err = d.DialContext(ctx, "unix", dsn.Host); err != nil {
return err
}
}
c.conn.(*net.TCPConn).SetNoDelay(true)

c.rdBuf = bufio.NewReaderSize(c.conn, bufsCap)

go c.writeLoop()
Expand Down
50 changes: 38 additions & 12 deletions bindings/cproto/cproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"math"
"net"
"net/url"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -27,11 +29,13 @@ const (
opWr = 1
)

var logger Logger
var logMtx sync.RWMutex
var emptyLogger bindings.NullLogger

func init() {
bindings.RegisterBinding("cproto", new(NetCProto))
if runtime.GOOS != "windows" {
bindings.RegisterBinding("ucproto", new(NetCProto))
}
}

type Logger interface {
Expand All @@ -52,6 +56,8 @@ type NetCProto struct {
appName string
termCh chan struct{}
lock sync.RWMutex
logger Logger
logMtx sync.RWMutex
}

type dsn struct {
Expand Down Expand Up @@ -122,6 +128,16 @@ func (binding *NetCProto) Init(u []url.URL, options ...interface{}) (err error)
}

binding.dsn.url = u
for i := 0; i < len(binding.dsn.url); i++ {
if binding.dsn.url[i].Scheme == "ucproto" {
addrs := strings.Split(binding.dsn.url[i].Path, ":")
if len(addrs) != 2 {
return fmt.Errorf("rq: unexpected URL format for ucproto: '%s'. Expecting '<unix socket>:/<db name>", binding.dsn.url[i].Path)
}
binding.dsn.url[i].Host = addrs[0]
binding.dsn.url[i].Path = addrs[1]
}
}
binding.connectDSN(context.Background(), connPoolSize, connPoolLBAlgorithm)
binding.termCh = make(chan struct{})
go binding.pinger()
Expand Down Expand Up @@ -391,16 +407,26 @@ func (binding *NetCProto) OnChangeCallback(f func()) {
}

func (binding *NetCProto) EnableLogger(log bindings.Logger) {
logMtx.Lock()
defer logMtx.Unlock()
logger = log
binding.logMtx.Lock()
defer binding.logMtx.Unlock()
binding.logger = log
}

func (binding *NetCProto) DisableLogger() {
logMtx.Lock()
defer logMtx.Unlock()
logger = nil
binding.logMtx.Lock()
defer binding.logMtx.Unlock()
binding.logger = nil
}

func (binding *NetCProto) GetLogger() bindings.Logger {
binding.logMtx.RLock()
defer binding.logMtx.RUnlock()
if binding.logger != nil {
return binding.logger
}
return &emptyLogger
}

func (binding *NetCProto) ReopenLogFiles() error {
fmt.Println("cproto binding ReopenLogFiles method is dummy")
return nil
Expand Down Expand Up @@ -465,10 +491,10 @@ func (binding *NetCProto) getAllConns() []*connection {
}

func (binding *NetCProto) logMsg(level int, fmt string, msg ...interface{}) {
logMtx.RLock()
defer logMtx.RUnlock()
if logger != nil {
logger.Printf(level, fmt, msg)
binding.logMtx.RLock()
defer binding.logMtx.RUnlock()
if binding.logger != nil {
binding.logger.Printf(level, fmt, msg)
}
}

Expand Down
11 changes: 9 additions & 2 deletions bindings/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type TxCtx struct {
UserCtx context.Context
}

// FetchMore interface for partial loading results (used in cproto)
// FetchMore interface for partial loading results (used in cproto/ucproto)
type FetchMore interface {
Fetch(ctx context.Context, offset, limit int, asJson bool) (err error)
}
Expand All @@ -130,6 +130,12 @@ type Logger interface {
Printf(level int, fmt string, msg ...interface{})
}

type NullLogger struct {
}

func (NullLogger) Printf(level int, fmt string, msg ...interface{}) {
}

func NewError(text string, code int) error {
return Error{text, code}
}
Expand Down Expand Up @@ -196,6 +202,7 @@ type RawBinding interface {
Commit(ctx context.Context, namespace string) error
EnableLogger(logger Logger)
DisableLogger()
GetLogger() Logger
ReopenLogFiles() error
Ping(ctx context.Context) error
Finalize() error
Expand Down Expand Up @@ -248,7 +255,7 @@ const (
LBPowerOfTwoChoices
)

// OptionConnPoolLoadBalancing sets algorithm, which will be used to choose connection for cproto requests' balancing
// OptionConnPoolLoadBalancing sets algorithm, which will be used to choose connection for cproto/ucproto requests' balancing
type OptionConnPoolLoadBalancing struct {
Algorithm LoadBalancingAlgorithm
}
Expand Down
58 changes: 56 additions & 2 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,59 @@
# Version 3.20.0 (30.10.2023)
## Core
- [fea] Added crash query report for Update/Delete queries
- [fea] Added configs for LRU index cache size (check section `cache` in the `namespaces` entity of the `#config`-namespace)
- [fea] Optimized CJSON deserialization
- [fea] Optimized WAL size (unchaged indexes and schemas will not be added into the WAL anymore)
- [fea] Added atomic rollback for multiple field UPDATE-queries in case of errors during query execution (currently it is atomic on the level of the individual document)
- [fix] Fixed column indexes optimization (some of the comparators have become noticeably more effective)
- [fix] Added PK check for UPDATE-queries
- [fix] Fixed JOINs on composite indexes
- [fix] Fixed select functions with MERGE queries for cases when the same namespace is merged multiple times
- [fix] Fixed non-indexed string/int fields convertion on index creation in cases when index type is not equal to the field type
- [fix] Disabled leveldb's log files (in rare cases those logs could lead to the problems with storage reopenning)
- [fix] Disable background indexes optimization for the temporary namespaces
- [fix] Removed attempts to reopen storage with flush errors befors it's destruction
- [fix] Some of the storage flushes were moved outside of the unique namespaces lock
- [fix] Fixed directories deletion for Windows

## Replication
- [fea] Reduced memory consumptio for online-updates
- [fix] Fixed updates size calculation during max allowed updates size check (now it checks actual allocated memory, not the data size)
- [fix] Namespaces config applying after FORCE-sync replication
- [fix] Fixed some rare tagsmatcher conflicts in replicated namespaces in case when namespace was not replicted previously and had some data in it
- [fix] Fixed some unnecessary force syncs after go clients connection
- [fix] Fixed documents duplication after PK change in UPDATE-query
- [fix] Fixed logical race in cascade replication resync

## Reindexer server
- [fea] Added support for UNIX domain sockets for RPC-server. Disabled by default (check `urpcaddr`-flag in server's config)
- [fea] Added support for default timeouts in HTTP-server (support for `http-write-timeout` and `http-read-timeout` options in server's config)
- [fea] Added support for `Request-Timeout` header in HTTP-requests to setup individual timeout for each request

## Reindexer tool
- [fea] Added support for UNIX domain sockets (dsn format: `ucproto://<socket_path>:/<dbname>`, example: `ucproto:///tmp/reindexer.sock:/my_db`)

## Go connector
- [fea] Added support for UNIX domain sockets in cproto-binding (dsn format: `ucproto://<socket_path>:/<dbname>`, example: `ucproto:///tmp/reindexer.sock:/my_db`)
- [fix] Fixed deadlock in `EnableLogger` method betweenn Go and C++ mutexes

## Face
- [fea] Added the ability to use hot keys to navigate over the UI
- [fea] Added the link to the documentation to the left bar menu
- [fea] Changed the column filter with the case sensitive one
- [fea] Added validation of the JSON paths field to the Index Config page
- [fea] Added the wal_size field to the Namespace config
- [fea] Added the preselect_us and field_type sections fto the Explain page
- [fix] Fixed the horizontal scroll for the Grid view
- [fix] Fixed the data sorting on the Items page during the namespace changing
- [fix] Fixed the "undefined items" error during the db changing
- [fix] Fixed the console issues for the ttl index
- [fix] Made the Namespace refactoring
- [fix] Fixed the operations with the last column for the Grid view

## Build
- [fea] Added support for almalinux-9 builds in [dependencies.sh](dependencies.sh)

# Version 3.19.0 (16.09.2023)
## Core
- [fea] Added background namespaces deleteion. Previously in some cases atomic transactions could cause the namespace's deletion in the querie's execution thread, which lead to the latency spike
Expand Down Expand Up @@ -1905,5 +1961,3 @@
- [ref] EnableStorage method was deprecated
- [fix] Query builder did not reset opOR after InnerJoin

## Misc

Loading

0 comments on commit 8fe18b8

Please sign in to comment.