diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 79ecb736d..1c1a23316 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -73,12 +73,6 @@ jobs: workload_build_context4: ../.. workload_build_options4: -f Dockerfile --build-arg SRC_PATH=native/query --build-arg JOB_NAME=workload-native-query - language_id5: 'native-query-with-pool' - workload_path5: 'tests/slo' - language5: 'Native ydb-go-sdk/v3 over query-service with session pool' - workload_build_context5: ../.. - workload_build_options5: -f Dockerfile --build-arg SRC_PATH=native/query/with/pool --build-arg JOB_NAME=workload-native-query-with-pool - - uses: actions/upload-artifact@v4 if: always() with: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 79e81bf1d..6793180db 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -66,6 +66,7 @@ jobs: YDB_LOCAL_SURVIVE_RESTART: true YDB_USE_IN_MEMORY_PDISKS: true YDB_TABLE_ENABLE_PREPARED_DDL: true + YDB_ENABLE_COLUMN_TABLES: true options: '-h localhost' env: OS: ubuntu-latest @@ -119,6 +120,7 @@ jobs: YDB_USE_IN_MEMORY_PDISKS: true YDB_TABLE_ENABLE_PREPARED_DDL: true YDB_FEATURE_FLAGS: enable_topic_service_tx + YDB_ENABLE_COLUMN_TABLES: true options: '-h localhost' env: OS: ubuntu-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index c8106c84d..062288078 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ -* Changed `trace.Table` and `trace.Query` traces +* Changed `trace.Table` and `trace.Query` traces +* Implemented `internal/pool` the same as table client pool from `internal/table.Client` ## v3.79.0 * Added commit messages for topic listener diff --git a/internal/pool/defaults.go b/internal/pool/defaults.go index a6701222d..a56470e5f 100644 --- a/internal/pool/defaults.go +++ b/internal/pool/defaults.go @@ -1,3 +1,11 @@ package pool -const DefaultLimit = 50 +import ( + "time" +) + +const ( + DefaultLimit = 50 + defaultCreateTimeout = 5 * time.Second + defaultCloseTimeout = time.Second +) diff --git a/internal/pool/errors.go b/internal/pool/errors.go index 1fc05fc4a..2e1129e52 100644 --- a/internal/pool/errors.go +++ b/internal/pool/errors.go @@ -2,6 +2,11 @@ package pool import ( "errors" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + grpcCodes "google.golang.org/grpc/codes" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) var ( @@ -10,3 +15,25 @@ var ( errPoolIsOverflow = errors.New("pool is overflow") errNoProgress = errors.New("no progress") ) + +func isRetriable(err error) bool { + if err == nil { + panic(err) + } + + switch { + case + xerrors.Is(err, errPoolIsOverflow), + xerrors.IsRetryableError(err), + xerrors.IsOperationError(err, Ydb.StatusIds_OVERLOADED), + xerrors.IsTransportError( + err, + grpcCodes.ResourceExhausted, + grpcCodes.DeadlineExceeded, + grpcCodes.Unavailable, + ): + return true + default: + return false + } +} diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 45f82fbb1..7fcb4cf5e 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -2,15 +2,18 @@ package pool import ( "context" + "fmt" "sync" "time" + "github.com/jonboulle/clockwork" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xlist" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/retry" - "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) 
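For context on the pool rework below, a minimal usage sketch under the options introduced in this change; it assumes package pool scope (context and time are already imported above), and the item type is an illustrative stand-in modeled on testItem from pool_test.go, not part of the diff:

// Illustrative sketch only, not part of this change set.
type exampleItem struct{ closed bool }

func (s *exampleItem) IsAlive() bool { return !s.closed }

func (s *exampleItem) ID() string { return "example" }

func (s *exampleItem) Close(context.Context) error { s.closed = true; return nil }

func examplePoolUsage(ctx context.Context) error {
	p := New[*exampleItem, exampleItem](ctx,
		WithLimit[*exampleItem, exampleItem](10),
		WithCreateItemTimeout[*exampleItem, exampleItem](5*time.Second),
		WithCloseItemTimeout[*exampleItem, exampleItem](time.Second),
		WithIdleThreshold[*exampleItem, exampleItem](time.Minute),
		WithCreateItemFunc[*exampleItem, exampleItem](func(ctx context.Context) (*exampleItem, error) {
			return &exampleItem{}, nil
		}),
	)
	defer func() { _ = p.Close(ctx) }()

	// With acquires an item (an idle one when available, otherwise a newly
	// created one, or it waits on the wait queue when the pool is full) and
	// retries retriable errors before invoking the callback.
	return p.With(ctx, func(ctx context.Context, item *exampleItem) error {
		return nil // work with the item here
	})
}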
type ( @@ -21,32 +24,52 @@ type ( } Config[PT Item[T], T any] struct { trace *Trace + clock clockwork.Clock limit int - createItem func(ctx context.Context) (PT, error) createTimeout time.Duration + createItem func(ctx context.Context) (PT, error) closeTimeout time.Duration + closeItem func(ctx context.Context, item PT) + idleThreshold time.Duration + } + itemInfo[PT Item[T], T any] struct { + idle *xlist.Element[PT] + touched time.Time + } + waitChPool[PT Item[T], T any] interface { + GetOrNew() *chan PT + Put(t *chan PT) } Pool[PT Item[T], T any] struct { config Config[PT, T] createItem func(ctx context.Context) (PT, error) closeItem func(ctx context.Context, item PT) - sema chan struct{} - mu xsync.RWMutex - idle []PT + mu xsync.RWMutex + createInProgress int // KIKIMR-9163: in-create-process counter + index map[PT]itemInfo[PT, T] + idle xlist.List[PT] + waitQ xlist.List[*chan PT] + waitChPool waitChPool[PT, T] done chan struct{} } - option[PT Item[T], T any] func(p *Config[PT, T]) + option[PT Item[T], T any] func(c *Config[PT, T]) ) -func WithCreateFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error)) option[PT, T] { +func WithCreateItemFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error)) option[PT, T] { return func(c *Config[PT, T]) { c.createItem = f } } +func withCloseItemFunc[PT Item[T], T any](f func(ctx context.Context, item PT)) option[PT, T] { + return func(c *Config[PT, T]) { + c.closeItem = f + } +} + func WithCreateItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] { return func(c *Config[PT, T]) { c.createTimeout = t @@ -71,15 +94,40 @@ func WithTrace[PT Item[T], T any](t *Trace) option[PT, T] { } } +func WithIdleThreshold[PT Item[T], T any](idleThreshold time.Duration) option[PT, T] { + return func(c *Config[PT, T]) { + c.idleThreshold = idleThreshold + } +} + +func WithClock[PT Item[T], T any](clock clockwork.Clock) option[PT, T] { + return func(c *Config[PT, T]) { + c.clock = clock + } +} + func New[PT Item[T], T any]( ctx context.Context, opts ...option[PT, T], ) *Pool[PT, T] { p := &Pool[PT, T]{ config: Config[PT, T]{ - trace: &Trace{}, - limit: DefaultLimit, - createItem: defaultCreateItem[T, PT], + trace: &Trace{}, + clock: clockwork.NewRealClock(), + limit: DefaultLimit, + createItem: defaultCreateItem[T, PT], + createTimeout: defaultCreateTimeout, + closeTimeout: defaultCloseTimeout, + }, + index: make(map[PT]itemInfo[PT, T]), + idle: xlist.New[PT](), + waitQ: xlist.New[*chan PT](), + waitChPool: &xsync.Pool[chan PT]{ + New: func() *chan PT { + ch := make(chan PT) + + return &ch + }, }, done: make(chan struct{}), } @@ -101,22 +149,12 @@ func New[PT Item[T], T any]( } } - p.createItem = makeCreateItemFunc(p.config, p.done, func(item PT) error { - return xsync.WithLock(&p.mu, func() error { - if len(p.idle) >= p.config.limit { - return xerrors.WithStackTrace(errPoolIsOverflow) - } - - p.appendItemToIdle(item) - - return nil - }) - }) - p.closeItem = makeAsyncCloseItemFunc[PT, T]( - p.config.closeTimeout, p.done, - ) - p.sema = make(chan struct{}, p.config.limit) - p.idle = make([]PT, 0, p.config.limit) + p.createItem = makeAsyncCreateItemFunc(p) + if p.config.closeItem != nil { + p.closeItem = p.config.closeItem + } else { + p.closeItem = makeAsyncCloseItemFunc[PT, T](p) + } return p } @@ -128,30 +166,57 @@ func defaultCreateItem[T any, PT Item[T]](context.Context) (PT, error) { return &item, nil } -// makeCreateItemFunc wraps the createItem function with timeout handling -func makeCreateItemFunc[PT Item[T], T any]( //nolint:funlen - 
config Config[PT, T], - done <-chan struct{}, - appendToIdle func(item PT) error, +// makeAsyncCreateItemFunc wraps the createItem function with timeout handling +func makeAsyncCreateItemFunc[PT Item[T], T any]( //nolint:funlen + p *Pool[PT, T], ) func(ctx context.Context) (PT, error) { return func(ctx context.Context) (PT, error) { - ch := make(chan struct { - item PT - err error - }) + if !xsync.WithLock(&p.mu, func() bool { + if len(p.index)+p.createInProgress < p.config.limit { + p.createInProgress++ + + return true + } + + return false + }) { + return nil, xerrors.WithStackTrace(errPoolIsOverflow) + } + defer func() { + p.mu.WithLock(func() { + p.createInProgress-- + }) + }() + + var ( + ch = make(chan struct { + item PT + err error + }) + done = make(chan struct{}) + ) + + defer close(done) go func() { defer close(ch) - createCtx, cancelCreate := xcontext.WithDone(xcontext.ValueOnly(ctx), done) + createCtx, cancelCreate := xcontext.WithDone(xcontext.ValueOnly(ctx), p.done) defer cancelCreate() - if d := config.createTimeout; d > 0 { + if d := p.config.createTimeout; d > 0 { createCtx, cancelCreate = xcontext.WithTimeout(createCtx, d) defer cancelCreate() } - newItem, err := config.createItem(createCtx) + newItem, err := p.config.createItem(createCtx) + if newItem != nil { + p.mu.WithLock(func() { + p.index[newItem] = itemInfo[PT, T]{ + touched: p.config.clock.Now(), + } + }) + } select { case ch <- struct { @@ -161,25 +226,23 @@ func makeCreateItemFunc[PT Item[T], T any]( //nolint:funlen item: newItem, err: xerrors.WithStackTrace(err), }: - default: + case <-done: if newItem == nil { return } - if appendToIdleErr := appendToIdle(newItem); appendToIdleErr != nil { - _ = newItem.Close(ctx) - } + _ = p.putItem(createCtx, newItem) } }() select { - case <-done: + case <-p.done: return nil, xerrors.WithStackTrace(errClosedPool) case <-ctx.Done(): return nil, xerrors.WithStackTrace(ctx.Err()) case result, has := <-ch: if !has { - return nil, xerrors.WithStackTrace(errNoProgress) + return nil, xerrors.WithStackTrace(xerrors.Retryable(errNoProgress)) } if result.err != nil { @@ -198,17 +261,10 @@ func makeCreateItemFunc[PT Item[T], T any]( //nolint:funlen func (p *Pool[PT, T]) stats() Stats { return Stats{ Limit: p.config.limit, - Idle: len(p.idle), - Wait: 0, - CreateInProgress: 0, - } -} - -func (p *Pool[PT, T]) onChangeStats() { - if onChange := p.config.trace.OnChange; onChange != nil { - onChange(xsync.WithRLock(&p.mu, func() Stats { - return p.stats() - })) + Index: len(p.index), + Idle: p.idle.Len(), + Wait: p.waitQ.Len(), + CreateInProgress: p.createInProgress, } } @@ -216,112 +272,17 @@ func (p *Pool[PT, T]) Stats() Stats { p.mu.RLock() defer p.mu.RUnlock() - return Stats{ - Limit: p.config.limit, - Idle: len(p.idle), - } -} - -func (p *Pool[PT, T]) getItemFromIdle() (item PT) { - p.mu.Lock() - defer p.mu.Unlock() - - if len(p.idle) == 0 { - return nil - } - - item, p.idle = p.idle[0], p.idle[1:] - go p.onChangeStats() - - return item -} - -func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { - if onGet := p.config.trace.OnGet; onGet != nil { - onDone := onGet(&ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).getItem"), - ) - if onDone != nil { - defer func() { - onDone(item, 0, finalErr) - }() - } - } - - item = p.getItemFromIdle() - - if item != nil { - if item.IsAlive() { - return item, nil - } - - p.closeItem(ctx, item) - - return nil, xerrors.WithStackTrace(xerrors.Retryable(errItemIsNotAlive)) - } - - newItem, err := 
p.createItem(ctx) - if err != nil { - return nil, xerrors.WithStackTrace(xerrors.Retryable(err)) - } - - return newItem, nil -} - -// p.mu must be locked -func (p *Pool[PT, T]) appendItemToIdle(item PT) { - p.idle = append(p.idle, item) - go p.onChangeStats() -} - -func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) { - if onPut := p.config.trace.OnPut; onPut != nil { - onDone := onPut(&ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).putItem"), - item, - ) - if onDone != nil { - defer func() { - onDone(finalErr) - }() - } - } - if !item.IsAlive() { - p.closeItem(ctx, item) - - return xerrors.WithStackTrace(errItemIsNotAlive) - } - - p.mu.Lock() - defer p.mu.Unlock() - - select { - case <-p.done: - _ = item.Close(ctx) - - return xerrors.WithStackTrace(errClosedPool) - default: - if len(p.idle) >= p.config.limit { - p.closeItem(ctx, item) - - return xerrors.WithStackTrace(errPoolIsOverflow) - } - - p.appendItemToIdle(item) - - return nil - } + return p.stats() } func makeAsyncCloseItemFunc[PT Item[T], T any]( - closeTimeout time.Duration, - done <-chan struct{}, + p *Pool[PT, T], ) func(ctx context.Context, item PT) { return func(ctx context.Context, item PT) { - closeItemCtx, closeItemCancel := xcontext.WithDone(xcontext.ValueOnly(ctx), done) + closeItemCtx, closeItemCancel := xcontext.WithDone(xcontext.ValueOnly(ctx), p.done) defer closeItemCancel() - if d := closeTimeout; d > 0 { + if d := p.config.closeTimeout; d > 0 { closeItemCtx, closeItemCancel = xcontext.WithTimeout(ctx, d) defer closeItemCancel() } @@ -332,6 +293,12 @@ func makeAsyncCloseItemFunc[PT Item[T], T any]( } } +func (p *Pool[PT, T]) changeState(changeState func() Stats) { + if stats, onChange := changeState(), p.config.trace.OnChange; onChange != nil { + onChange(stats) + } +} + func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) { if onTry := p.config.trace.OnTry; onTry != nil { onDone := onTry(&ctx, @@ -349,12 +316,7 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item return xerrors.WithStackTrace(errClosedPool) case <-ctx.Done(): return xerrors.WithStackTrace(ctx.Err()) - case p.sema <- struct{}{}: - go p.onChangeStats() - defer func() { - <-p.sema - go p.onChangeStats() - }() + default: } item, err := p.getItem(ctx) @@ -384,7 +346,6 @@ func (p *Pool[PT, T]) With( opts ...retry.Option, ) (finalErr error) { var attempts int - if onWith := p.config.trace.OnWith; onWith != nil { onDone := onWith(&ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).With"), @@ -397,21 +358,16 @@ func (p *Pool[PT, T]) With( } err := retry.Retry(ctx, func(ctx context.Context) error { + attempts++ err := p.try(ctx, f) if err != nil { return xerrors.WithStackTrace(err) } return nil - }, append(opts, retry.WithTrace(&trace.Retry{ - OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { - return func(info trace.RetryLoopDoneInfo) { - attempts = info.Attempts - } - }, - }))...) + }, opts...) 
if err != nil { - return xerrors.WithStackTrace(err) + return xerrors.WithStackTrace(fmt.Errorf("pool.With failed with %d attempts: %w", attempts, err)) } return nil @@ -429,32 +385,399 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) { } } - close(p.done) + select { + case <-p.done: + return xerrors.WithStackTrace(errClosedPool) - p.mu.Lock() - defer p.mu.Unlock() + default: + close(p.done) - errs := xsync.Set[error]{} + p.mu.Lock() + defer p.mu.Unlock() - var wg sync.WaitGroup - wg.Add(len(p.idle)) + p.changeState(func() Stats { + p.config.limit = 0 - for _, item := range p.idle { - go func(item PT) { - defer wg.Done() + for el := p.waitQ.Front(); el != nil; el = el.Next() { + close(*el.Value) + } - if err := item.Close(ctx); err != nil { - errs.Add(err) + p.waitQ.Clear() + + var wg sync.WaitGroup + wg.Add(p.idle.Len()) + + for el := p.idle.Front(); el != nil; el = el.Next() { + go func(item PT) { + defer wg.Done() + p.closeItem(ctx, item) + }(el.Value) + delete(p.index, el.Value) } - }(item) + + wg.Wait() + + p.idle.Clear() + + return p.stats() + }) + + return nil } - wg.Wait() +} + +// getWaitCh returns pointer to a channel of sessions. +// +// Note that returning a pointer reduces allocations on sync.Pool usage – +// sync.Client.Get() returns empty interface, which leads to allocation for +// non-pointer values. +func (p *Pool[PT, T]) getWaitCh() *chan PT { //nolint:gocritic + return p.waitChPool.GetOrNew() +} - p.idle = nil +// putWaitCh receives pointer to a channel and makes it available for further +// use. +// Note that ch MUST NOT be owned by any goroutine at the call moment and ch +// MUST NOT contain any value. +func (p *Pool[PT, T]) putWaitCh(ch *chan PT) { //nolint:gocritic + p.waitChPool.Put(ch) +} - if errs.Size() > 0 { - return xerrors.WithStackTrace(xerrors.Join(errs.Values()...)) +// p.mu must be held. +func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) { + el := p.idle.Front() + if el == nil { + return + } + item = el.Value + info, has := p.index[item] + if !has || el != info.idle { + panic(fmt.Sprintf("inconsistent index: (%v, %+v, %+v)", has, el, info.idle)) } - return nil + return item, info.touched +} + +// removes first session from idle and resets the keepAliveCount +// to prevent session from dying in the internalPoolGC after it was returned +// to be used only in outgoing functions that make session busy. +// p.mu must be held. +func (p *Pool[PT, T]) removeFirstIdle() PT { + idle, _ := p.peekFirstIdle() + if idle != nil { + info := p.removeIdle(idle) + p.index[idle] = info + } + + return idle +} + +// p.mu must be held. +func (p *Pool[PT, T]) notifyAboutIdle(idle PT) (notified bool) { + for el := p.waitQ.Front(); el != nil; el = p.waitQ.Front() { + // Some goroutine is waiting for a session. + // + // It could be in this states: + // 1) Reached the select code and awaiting for a value in channel. + // 2) Reached the select code but already in branch of deadline + // cancellation. In this case it is locked on p.mu.Lock(). + // 3) Not reached the select code and thus not reading yet from the + // channel. + // + // For cases (2) and (3) we close the channel to signal that goroutine + // missed something and may want to retry (especially for case (3)). + // + // After that we taking a next waiter and repeat the same. + var ch *chan PT + p.changeState(func() Stats { + ch = p.waitQ.Remove(el) //nolint:scopelint + + return p.stats() + }) + select { + case *ch <- idle: + // Case (1). 
+ return true + + case <-p.done: + // Case (2) or (3). + close(*ch) + + default: + // Case (2) or (3). + close(*ch) + } + } + + return false +} + +// p.mu must be held. +func (p *Pool[PT, T]) removeIdle(item PT) itemInfo[PT, T] { + info, has := p.index[item] + if !has || info.idle == nil { + panic("inconsistent session client index") + } + + p.changeState(func() Stats { + p.idle.Remove(info.idle) + info.idle = nil + p.index[item] = info + + return p.stats() + }) + + return info +} + +// p.mu must be held. +func (p *Pool[PT, T]) pushIdle(item PT, now time.Time) { + info, has := p.index[item] + if !has { + panic("trying to store item created outside of the client") + } + if info.idle != nil { + panic("inconsistent item client index") + } + + p.changeState(func() Stats { + info.touched = now + info.idle = p.idle.PushBack(item) + p.index[item] = info + + return p.stats() + }) +} + +const maxAttempts = 100 + +func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { //nolint:funlen + var ( + start = p.config.clock.Now() + attempt int + lastErr error + ) + + if onGet := p.config.trace.OnGet; onGet != nil { + onDone := onGet(&ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).getItem"), + ) + if onDone != nil { + defer func() { + onDone(item, attempt, finalErr) + }() + } + } + + for ; attempt < maxAttempts; attempt++ { + select { + case <-p.done: + return nil, xerrors.WithStackTrace(errClosedPool) + default: + } + + if item := xsync.WithLock(&p.mu, func() PT { //nolint:nestif + return p.removeFirstIdle() + }); item != nil { + if item.IsAlive() { + info := xsync.WithLock(&p.mu, func() itemInfo[PT, T] { + info, has := p.index[item] + if !has { + panic("no index for item") + } + + return info + }) + + if p.config.idleThreshold > 0 && p.config.clock.Since(info.touched) > p.config.idleThreshold { + p.closeItem(ctx, item) + p.mu.WithLock(func() { + p.changeState(func() Stats { + delete(p.index, item) + + return p.stats() + }) + }) + + continue + } + + return item, nil + } + } + + item, createItemErr := p.createItem(ctx) + if item != nil { + return item, nil + } + + if !isRetriable(createItemErr) { + return nil, xerrors.WithStackTrace(createItemErr) + } + + item, waitFromChErr := p.waitFromCh(ctx) + if item != nil { + return item, nil + } + + if waitFromChErr != nil && !isRetriable(waitFromChErr) { + return nil, xerrors.WithStackTrace(waitFromChErr) + } + + lastErr = xerrors.WithStackTrace(xerrors.Join(createItemErr, waitFromChErr)) + } + + p.mu.RLock() + defer p.mu.RUnlock() + + return nil, xerrors.WithStackTrace( + fmt.Errorf("failed to get item from pool after %d attempts and %v, pool has %d items (%d busy, "+ + "%d idle, %d create_in_progress): %w", attempt, p.config.clock.Since(start), len(p.index), + len(p.index)-p.idle.Len(), p.idle.Len(), p.createInProgress, lastErr, + ), + ) +} + +//nolint:funlen +func (p *Pool[PT, T]) waitFromCh(ctx context.Context) (item PT, finalErr error) { + var ( + ch *chan PT + el *xlist.Element[*chan PT] + ) + + p.mu.WithLock(func() { + p.changeState(func() Stats { + ch = p.getWaitCh() + el = p.waitQ.PushBack(ch) + + return p.stats() + }) + }) + + if onWait := p.config.trace.onWait; onWait != nil { + onDone := onWait() + if onDone != nil { + defer func() { + onDone(item, finalErr) + }() + } + } + + var deadliine <-chan time.Time + if timeout := p.config.createTimeout; timeout > 0 { + t := p.config.clock.NewTimer(timeout) + defer t.Stop() + + deadliine = t.Chan() + } + + select { + case <-p.done: + 
p.mu.WithLock(func() { + p.changeState(func() Stats { + p.waitQ.Remove(el) + + return p.stats() + }) + }) + + return nil, xerrors.WithStackTrace(errClosedPool) + + case item, ok := <-*ch: + // Note that race may occur and some goroutine may try to write + // session into channel after it was enqueued but before it being + // read here. In that case we will receive nil here and will retry. + // + // The same way will work when some session become deleted - the + // nil value will be sent into the channel. + if ok { + // Put only filled and not closed channel back to the Client. + // That is, we need to avoid races on filling reused channel + // for the next waiter – session could be lost for a long time. + p.putWaitCh(ch) + } + + return item, nil + + case <-deadliine: + p.mu.WithLock(func() { + p.changeState(func() Stats { + p.waitQ.Remove(el) + + return p.stats() + }) + }) + + return nil, nil + + case <-ctx.Done(): + p.mu.WithLock(func() { + p.changeState(func() Stats { + p.waitQ.Remove(el) + + return p.stats() + }) + }) + + return nil, xerrors.WithStackTrace(ctx.Err()) + } +} + +// p.mu must be free. +func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) { + if onPut := p.config.trace.OnPut; onPut != nil { + onDone := onPut(&ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).putItem"), + item, + ) + if onDone != nil { + defer func() { + onDone(finalErr) + }() + } + } + select { + case <-p.done: + p.closeItem(ctx, item) + p.mu.WithLock(func() { + p.changeState(func() Stats { + delete(p.index, item) + + return p.stats() + }) + }) + + return xerrors.WithStackTrace(errClosedPool) + default: + p.mu.Lock() + defer p.mu.Unlock() + + if !item.IsAlive() { + p.closeItem(ctx, item) + p.changeState(func() Stats { + delete(p.index, item) + + return p.stats() + }) + + return xerrors.WithStackTrace(errItemIsNotAlive) + } + + if p.idle.Len() >= p.config.limit { + p.closeItem(ctx, item) + p.changeState(func() Stats { + delete(p.index, item) + + return p.stats() + }) + + return xerrors.WithStackTrace(errPoolIsOverflow) + } + + if !p.notifyAboutIdle(item) { + p.pushIdle(item, p.config.clock.Now()) + } + + return nil + } } diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index a0d9d40fc..14504f5c3 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -1,30 +1,105 @@ package pool import ( + "bytes" "context" "errors" + "fmt" + "path" + "runtime" + "runtime/debug" "sync" "sync/atomic" "testing" "time" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" grpcCodes "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" ) -type testItem struct { - v uint32 +type ( + testItem struct { + v uint32 - onClose func() error - onIsAlive func() bool + closed bytes.Buffer + + onClose func() error + onIsAlive func() bool + } + testWaitChPool struct { + xsync.Pool[chan *testItem] + testHookGetWaitCh func() + } +) + +var defaultTrace = &Trace{ + OnNew: func(ctx *context.Context, call stack.Caller) func(limit int) { + return func(limit int) { + } 
+ }, + OnClose: func(ctx *context.Context, call stack.Caller) func(err error) { + return func(err error) { + } + }, + OnTry: func(ctx *context.Context, call stack.Caller) func(err error) { + return func(err error) { + } + }, + OnWith: func(ctx *context.Context, call stack.Caller) func(attempts int, err error) { + return func(attempts int, err error) { + } + }, + OnPut: func(ctx *context.Context, call stack.Caller, item any) func(err error) { + return func(err error) { + } + }, + OnGet: func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) { + return func(item any, attempts int, err error) { + } + }, + onWait: func() func(item any, err error) { + return func(item any, err error) { + } + }, + OnChange: func(stats Stats) { + }, +} + +func (p *testWaitChPool) GetOrNew() *chan *testItem { + if p.testHookGetWaitCh != nil { + p.testHookGetWaitCh() + } + + return p.Pool.GetOrNew() +} + +func (p *testWaitChPool) whenWantWaitCh() <-chan struct{} { + var ( + prev = p.testHookGetWaitCh + ch = make(chan struct{}) + ) + p.testHookGetWaitCh = func() { + p.testHookGetWaitCh = prev + close(ch) + } + + return ch } -func (t testItem) IsAlive() bool { +func (p *testWaitChPool) Put(ch *chan *testItem) {} + +func (t *testItem) IsAlive() bool { if t.onIsAlive != nil { return t.onIsAlive() } @@ -32,11 +107,19 @@ func (t testItem) IsAlive() bool { return true } -func (t testItem) ID() string { +func (t *testItem) ID() string { return "" } -func (t testItem) Close(context.Context) error { +func (t *testItem) Close(context.Context) error { + if t.closed.Len() > 0 { + debug.PrintStack() + fmt.Println(t.closed.String()) + panic("item already closed") + } + + t.closed.Write(debug.Stack()) + if t.onClose != nil { return t.onClose() } @@ -44,30 +127,65 @@ func (t testItem) Close(context.Context) error { return nil } +func caller() string { + _, file, line, _ := runtime.Caller(2) + + return fmt.Sprintf("%s:%d", path.Base(file), line) +} + +func mustGetItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T]) PT { + s, err := p.getItem(context.Background()) + if err != nil { + t.Helper() + t.Fatalf("%s: %v", caller(), err) + } + + return s +} + +func mustPutItem[PT Item[T], T any](t testing.TB, p *Pool[PT, T], item PT) { + if err := p.putItem(context.Background(), item); err != nil { + t.Helper() + t.Fatalf("%s: %v", caller(), err) + } +} + +func mustClose(t testing.TB, pool closer.Closer) { + if err := pool.Close(context.Background()); err != nil { + t.Helper() + t.Fatalf("%s: %v", caller(), err) + } +} + func TestPool(t *testing.T) { rootCtx := xtest.Context(t) t.Run("New", func(t *testing.T) { t.Run("Default", func(t *testing.T) { - p := New[*testItem, testItem](rootCtx) + p := New[*testItem, testItem](rootCtx, + WithTrace[*testItem, testItem](defaultTrace), + ) err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { return nil }) require.NoError(t, err) }) t.Run("WithLimit", func(t *testing.T) { - p := New[*testItem, testItem](rootCtx, WithLimit[*testItem, testItem](1)) + p := New[*testItem, testItem](rootCtx, WithLimit[*testItem, testItem](1), + WithTrace[*testItem, testItem](defaultTrace), + ) require.EqualValues(t, 1, p.config.limit) }) - t.Run("WithCreateFunc", func(t *testing.T) { + t.Run("WithCreateItemFunc", func(t *testing.T) { var newCounter int64 p := New(rootCtx, WithLimit[*testItem, testItem](1), - WithCreateFunc(func(context.Context) (*testItem, error) { + WithCreateItemFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&newCounter, 1) var v 
testItem return &v, nil }), + WithTrace[*testItem, testItem](defaultTrace), ) err := p.With(rootCtx, func(ctx context.Context, item *testItem) error { return nil @@ -76,13 +194,275 @@ func TestPool(t *testing.T) { require.EqualValues(t, p.config.limit, atomic.LoadInt64(&newCounter)) }) }) + t.Run("Close", func(t *testing.T) { + counter := 0 + xtest.TestManyTimes(t, func(t testing.TB) { + counter++ + defer func() { + if counter%1000 == 0 { + t.Logf("%d times test passed", counter) + } + }() + + var ( + created atomic.Int32 + closed = [...]bool{false, false, false} + ) + + p := New[*testItem, testItem](rootCtx, + WithLimit[*testItem, testItem](3), + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCreateItemFunc(func(context.Context) (*testItem, error) { + var ( + idx = created.Add(1) - 1 + v = testItem{ + v: 0, + onClose: func() error { + closed[idx] = true + + return nil + }, + } + ) + + return &v, nil + }), + // replace default async closer for sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), + WithTrace[*testItem, testItem](defaultTrace), + ) + + defer func() { + _ = p.Close(context.Background()) + }() + + require.Empty(t, p.index) + require.Zero(t, p.idle.Len()) + + var ( + s1 = mustGetItem(t, p) + s2 = mustGetItem(t, p) + s3 = mustGetItem(t, p) + ) + + require.Len(t, p.index, 3) + require.Zero(t, p.idle.Len()) + + mustPutItem(t, p, s1) + mustPutItem(t, p, s2) + + require.Len(t, p.index, 3) + require.Equal(t, 2, p.idle.Len()) + + mustClose(t, p) + + require.Len(t, p.index, 1) + require.Zero(t, p.idle.Len()) + + require.True(t, closed[0]) // idle item in pool + require.True(t, closed[1]) // idle item in pool + require.False(t, closed[2]) // item extracted from idle but closed later on putItem + + require.ErrorIs(t, p.putItem(context.Background(), s3), errClosedPool) + + require.True(t, closed[2]) // after putItem s3 must be closed + }) + t.Run("WhenWaiting", func(t *testing.T) { + for _, test := range []struct { + name string + racy bool + }{ + { + name: "normal", + racy: false, + }, + { + name: "racy", + racy: true, + }, + } { + t.Run(test.name, func(t *testing.T) { + var ( + get = make(chan struct{}) + wait = make(chan struct{}) + got = make(chan error) + ) + waitChPool := &testWaitChPool{ + Pool: xsync.Pool[chan *testItem]{ + New: func() *chan *testItem { + ch := make(chan *testItem) + + return &ch + }, + }, + } + p := New[*testItem, testItem](rootCtx, + // replace default async closer for sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), + WithLimit[*testItem, testItem](1), + WithTrace[*testItem, testItem](&Trace{ + onWait: func() func(item any, err error) { + wait <- struct{}{} + + return nil + }, + }), + ) + p.waitChPool = waitChPool + defer func() { + _ = p.Close(context.Background()) + }() + + // first call getItem creates an item and store in index + // second call getItem from pool with limit === 1 will skip + // create item step (because pool have not enough space for + // creating new items) and will freeze until wait free item from pool + mustGetItem(t, p) + + go func() { + p.config.trace.OnGet = func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) { + get <- struct{}{} + + return nil + } + + _, err := p.getItem(context.Background()) + got <- err + }() + + regWait := waitChPool.whenWantWaitCh() + <-get // Await for getter blocked on awaiting 
session. + <-regWait // Let the getter register itself in the wait queue. + + if test.racy { + // We are testing the case, when session consumer registered + // himself in the wait queue, but not ready to receive the + // session when session arrives (that is, stuck between + // pushing channel in the list and reading from the channel). + _ = p.Close(context.Background()) + <-wait + } else { + // We are testing the normal case, when session consumer registered + // himself in the wait queue and successfully blocked on + // reading from signaling channel. + <-wait + // Let the waiting goroutine to block on reading from channel. + _ = p.Close(context.Background()) + } + + const timeout = time.Second + select { + case err := <-got: + if !xerrors.Is(err, errClosedPool) { + t.Fatalf( + "unexpected error: %q; want %q'", + err, errClosedPool, + ) + } + case <-p.config.clock.After(timeout): + t.Fatalf("no result after %s", timeout) + } + }) + } + }) + t.Run("IdleSessions", func(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + var ( + idleThreshold = 4 * time.Second + closedCount atomic.Int64 + fakeClock = clockwork.NewFakeClock() + ) + p := New[*testItem, testItem](rootCtx, + WithLimit[*testItem, testItem](2), + WithCreateItemTimeout[*testItem, testItem](0), + WithCreateItemFunc[*testItem, testItem](func(ctx context.Context) (*testItem, error) { + v := testItem{ + v: 0, + onClose: func() error { + closedCount.Add(1) + + return nil + }, + } + + return &v, nil + }), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + // replace default async closer for sync testing + withCloseItemFunc[*testItem, testItem](func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), + WithClock[*testItem, testItem](fakeClock), + WithIdleThreshold[*testItem, testItem](idleThreshold), + WithTrace[*testItem, testItem](defaultTrace), + ) + + s1 := mustGetItem(t, p) + s2 := mustGetItem(t, p) + + // Put both items at the absolutely same time. + // That is, both items must be updated their touched timestamp. + mustPutItem(t, p, s1) + mustPutItem(t, p, s2) + + require.Len(t, p.index, 2) + require.Equal(t, 2, p.idle.Len()) + + // Move clock to longer than idleThreshold + fakeClock.Advance(idleThreshold + time.Nanosecond) + + // on get item from idle list the pool must check the item idle timestamp + // both existing items must be closed + // getItem must create a new item and return it from getItem + s3 := mustGetItem(t, p) + + require.Len(t, p.index, 1) + + if !closedCount.CompareAndSwap(2, 0) { + t.Fatal("unexpected number of closed items") + } + + // Move time to idleThreshold / 2 - this emulate a "spent" some time working within item. 
+ fakeClock.Advance(idleThreshold / 2) + + // Now put that item back + // pool must update a touched timestamp of item + mustPutItem(t, p, s3) + + // Move time to idleThreshold / 2 + // Total time since last updating touched timestampe is more than idleThreshold + fakeClock.Advance(idleThreshold/2 + time.Nanosecond) + + require.Len(t, p.index, 1) + require.Equal(t, 1, p.idle.Len()) + + s4 := mustGetItem(t, p) + require.Equal(t, s3, s4) + require.Len(t, p.index, 1) + require.Equal(t, 0, p.idle.Len()) + mustPutItem(t, p, s4) + + _ = p.Close(context.Background()) + + require.Empty(t, p.index) + require.Equal(t, 0, p.idle.Len()) + }, xtest.StopAfter(3*time.Second)) + }) + }) t.Run("Retry", func(t *testing.T) { t.Run("CreateItem", func(t *testing.T) { t.Run("context", func(t *testing.T) { t.Run("Cancelled", func(t *testing.T) { var counter int64 p := New(rootCtx, - WithCreateFunc(func(context.Context) (*testItem, error) { + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCreateItemFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&counter, 1) if atomic.LoadInt64(&counter) < 10 { @@ -103,7 +483,9 @@ func TestPool(t *testing.T) { t.Run("DeadlineExceeded", func(t *testing.T) { var counter int64 p := New(rootCtx, - WithCreateFunc(func(context.Context) (*testItem, error) { + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCreateItemFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&counter, 1) if atomic.LoadInt64(&counter) < 10 { @@ -125,7 +507,9 @@ func TestPool(t *testing.T) { t.Run("OnTransportError", func(t *testing.T) { var counter int64 p := New(rootCtx, - WithCreateFunc(func(context.Context) (*testItem, error) { + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCreateItemFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&counter, 1) if atomic.LoadInt64(&counter) < 10 { @@ -146,7 +530,9 @@ func TestPool(t *testing.T) { t.Run("OnOperationError", func(t *testing.T) { var counter int64 p := New(rootCtx, - WithCreateFunc(func(context.Context) (*testItem, error) { + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCreateItemFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&counter, 1) if atomic.LoadInt64(&counter) < 10 { @@ -164,27 +550,72 @@ func TestPool(t *testing.T) { require.NoError(t, err) require.GreaterOrEqual(t, atomic.LoadInt64(&counter), int64(10)) }) - }) - }) - t.Run("On", func(t *testing.T) { - t.Run("Context", func(t *testing.T) { - t.Run("Canceled", func(t *testing.T) { - ctx, cancel := context.WithCancel(rootCtx) - cancel() - p := New[*testItem, testItem](ctx, WithLimit[*testItem, testItem](1)) - err := p.With(ctx, func(ctx context.Context, testItem *testItem) error { - return nil + t.Run("NilNil", func(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + limit := 100 + ctx, cancel := xcontext.WithTimeout( + context.Background(), + 55*time.Second, + ) + defer cancel() + p := New[*testItem, testItem](rootCtx, + // replace default async closer for sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), + ) + defer func() { + _ = p.Close(context.Background()) + }() + r := xrand.New(xrand.WithLock()) + errCh := make(chan 
error, limit*10) + fn := func(wg *sync.WaitGroup) { + defer wg.Done() + childCtx, childCancel := xcontext.WithTimeout( + ctx, + time.Duration(r.Int64(int64(time.Second))), + ) + defer childCancel() + s, err := p.createItem(childCtx) + if s == nil && err == nil { + errCh <- fmt.Errorf("unexpected result: <%v, %w>", s, err) + } + } + wg := &sync.WaitGroup{} + wg.Add(limit * 10) + for i := 0; i < limit*10; i++ { + go fn(wg) + } + go func() { + wg.Wait() + close(errCh) + }() + for e := range errCh { + t.Fatal(e) + } }) - require.ErrorIs(t, err, context.Canceled) }) - t.Run("DeadlineExceeded", func(t *testing.T) { - ctx, cancel := context.WithTimeout(rootCtx, 0) - cancel() - p := New[*testItem, testItem](ctx, WithLimit[*testItem, testItem](1)) - err := p.With(ctx, func(ctx context.Context, testItem *testItem) error { - return nil + }) + t.Run("On", func(t *testing.T) { + t.Run("Context", func(t *testing.T) { + t.Run("Canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(rootCtx) + cancel() + p := New[*testItem, testItem](ctx, WithLimit[*testItem, testItem](1)) + err := p.With(ctx, func(ctx context.Context, testItem *testItem) error { + return nil + }) + require.ErrorIs(t, err, context.Canceled) + }) + t.Run("DeadlineExceeded", func(t *testing.T) { + ctx, cancel := context.WithTimeout(rootCtx, 0) + cancel() + p := New[*testItem, testItem](ctx, WithLimit[*testItem, testItem](1)) + err := p.With(ctx, func(ctx context.Context, testItem *testItem) error { + return nil + }) + require.ErrorIs(t, err, context.DeadlineExceeded) }) - require.ErrorIs(t, err, context.DeadlineExceeded) }) }) }) @@ -197,7 +628,7 @@ func TestPool(t *testing.T) { ) p := New(rootCtx, WithLimit[*testItem, testItem](1), - WithCreateFunc(func(context.Context) (*testItem, error) { + WithCreateItemFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&createCounter, 1) v := &testItem{ @@ -210,6 +641,10 @@ func TestPool(t *testing.T) { return v, nil }), + // replace default async closer for sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), ) err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { return nil @@ -219,109 +654,229 @@ func TestPool(t *testing.T) { err = p.Close(rootCtx) require.NoError(t, err) require.EqualValues(t, atomic.LoadInt64(&createCounter), atomic.LoadInt64(&closeCounter)) - }, xtest.StopAfter(time.Second)) + }) }) t.Run("IsAlive", func(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { var ( - newItems int64 - deleteItems int64 + newItems atomic.Int64 + deleteItems atomic.Int64 expErr = xerrors.Retryable(errors.New("expected error"), xerrors.InvalidObject()) ) p := New(rootCtx, WithLimit[*testItem, testItem](1), - WithCreateFunc(func(context.Context) (*testItem, error) { - atomic.AddInt64(&newItems, 1) + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCreateItemFunc(func(context.Context) (*testItem, error) { + newItems.Add(1) v := &testItem{ onClose: func() error { - atomic.AddInt64(&deleteItems, 1) + deleteItems.Add(1) return nil }, onIsAlive: func() bool { - return atomic.LoadInt64(&newItems) >= 10 + return newItems.Load() >= 10 }, } return v, nil }), + // replace default async closer for sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), ) - p.closeItem = func(ctx context.Context, item *testItem) { - _ = item.Close(ctx) - } err := p.With(rootCtx, func(ctx 
context.Context, testItem *testItem) error { - if atomic.LoadInt64(&newItems) < 10 { + if newItems.Load() < 10 { return expErr } return nil }) require.NoError(t, err) - require.GreaterOrEqual(t, atomic.LoadInt64(&newItems), int64(9)) - require.GreaterOrEqual(t, atomic.LoadInt64(&newItems), atomic.LoadInt64(&deleteItems)) + require.GreaterOrEqual(t, newItems.Load(), int64(9)) + require.GreaterOrEqual(t, newItems.Load(), deleteItems.Load()) err = p.Close(rootCtx) require.NoError(t, err) - require.EqualValues(t, atomic.LoadInt64(&newItems), atomic.LoadInt64(&deleteItems)) - }, xtest.StopAfter(5*time.Second)) + require.EqualValues(t, newItems.Load(), deleteItems.Load()) + }, xtest.StopAfter(3*time.Second)) }) }) - t.Run("Stress", func(t *testing.T) { - xtest.TestManyTimes(t, func(t testing.TB) { - trace := &Trace{ - OnChange: func(info Stats) { - require.GreaterOrEqual(t, info.Limit, info.Idle) - }, + t.Run("With", func(t *testing.T) { + t.Run("ExplicitSessionClose", func(t *testing.T) { + var ( + created atomic.Int32 + closed atomic.Int32 + ) + assertCreated := func(exp int32) { + if act := created.Load(); act != exp { + t.Errorf( + "unexpected number of created items: %v; want %v", + act, exp, + ) + } } - p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](trace)) - var wg sync.WaitGroup - wg.Add(DefaultLimit*2 + 1) - for range make([]struct{}, DefaultLimit*2) { - go func() { - defer wg.Done() - err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { - return nil - }) - if err != nil && !xerrors.Is(err, errClosedPool, context.Canceled) { - t.Failed() - } - }() + assertClosed := func(exp int32) { + if act := closed.Load(); act != exp { + t.Errorf( + "unexpected number of closed items: %v; want %v", + act, exp, + ) + } } - go func() { - defer wg.Done() - time.Sleep(time.Millisecond) - err := p.Close(rootCtx) - require.NoError(t, err) + p := New[*testItem, testItem](rootCtx, + WithLimit[*testItem, testItem](1), + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCreateItemFunc(func(context.Context) (*testItem, error) { + created.Add(1) + v := testItem{ + v: 0, + onClose: func() error { + closed.Add(1) + + return nil + }, + } + + return &v, nil + }), + // replace default async closer for sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), + ) + defer func() { + _ = p.Close(context.Background()) }() - wg.Wait() - }, xtest.StopAfter(14*time.Second)) - }) - t.Run("ParallelCreation", func(t *testing.T) { - xtest.TestManyTimes(t, func(t testing.TB) { - trace := &Trace{ - OnChange: func(info Stats) { - require.Equal(t, DefaultLimit, info.Limit) - require.LessOrEqual(t, info.Idle, DefaultLimit) - }, - } - p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](trace)) - var wg sync.WaitGroup - for range make([]struct{}, DefaultLimit*10) { - wg.Add(1) + + s := mustGetItem(t, p) + assertCreated(1) + + mustPutItem(t, p, s) + assertClosed(0) + + mustGetItem(t, p) + assertCreated(1) + + p.closeItem(context.Background(), s) + delete(p.index, s) + assertClosed(1) + + mustGetItem(t, p) + assertCreated(2) + }) + t.Run("Racy", func(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + trace := &Trace{ + OnChange: func(stats Stats) { + require.GreaterOrEqual(t, stats.Limit, stats.Idle) + }, + } + p := New[*testItem, testItem](rootCtx, + WithTrace[*testItem, testItem](trace), + // replace default async closer for 
sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), + ) + r := xrand.New(xrand.WithLock()) + var wg sync.WaitGroup + wg.Add(DefaultLimit*2 + 1) + for range make([]struct{}, DefaultLimit*2) { + go func() { + defer wg.Done() + childCtx, childCancel := xcontext.WithTimeout( + rootCtx, + time.Duration(r.Int64(int64(time.Second))), + ) + defer childCancel() + err := p.With(childCtx, func(ctx context.Context, testItem *testItem) error { + return nil + }) + if err != nil && !xerrors.Is(err, errClosedPool, context.Canceled) { + t.Failed() + } + }() + } go func() { defer wg.Done() - err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { - return nil - }) - if err != nil && !xerrors.Is(err, errClosedPool, context.Canceled) { - t.Failed() - } - stats := p.Stats() - require.LessOrEqual(t, stats.Idle, DefaultLimit) + time.Sleep(time.Millisecond) + err := p.Close(rootCtx) + require.NoError(t, err) }() + wg.Wait() + }) + }) + t.Run("ParallelCreation", func(t *testing.T) { + xtest.TestManyTimes(t, func(t testing.TB) { + trace := &Trace{ + OnChange: func(stats Stats) { + require.Equal(t, DefaultLimit, stats.Limit) + require.LessOrEqual(t, stats.Idle, DefaultLimit) + }, + } + p := New[*testItem, testItem](rootCtx, + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + WithTrace[*testItem, testItem](trace), + ) + var wg sync.WaitGroup + for range make([]struct{}, DefaultLimit*10) { + wg.Add(1) + go func() { + defer wg.Done() + err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { + return nil + }) + if err != nil && !xerrors.Is(err, errClosedPool, context.Canceled) { + t.Failed() + } + stats := p.Stats() + require.LessOrEqual(t, stats.Idle, DefaultLimit) + }() + } + + wg.Wait() + }) + }) + t.Run("PutInFull", func(t *testing.T) { + p := New(rootCtx, + WithLimit[*testItem, testItem](1), + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + // replace default async closer for sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), + ) + item := mustGetItem(t, p) + if err := p.putItem(context.Background(), item); err != nil { + t.Fatalf("unexpected error on put session into non-full client: %v, wand: %v", err, nil) } - wg.Wait() - }, xtest.StopAfter(14*time.Second)) + if err := p.putItem(context.Background(), &testItem{}); !xerrors.Is(err, errPoolIsOverflow) { + t.Fatalf("unexpected error on put item into full pool: %v, wand: %v", err, errPoolIsOverflow) + } + }) + t.Run("PutTwice", func(t *testing.T) { + p := New(rootCtx, + WithLimit[*testItem, testItem](2), + WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond), + WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond), + // replace default async closer for sync testing + withCloseItemFunc(func(ctx context.Context, item *testItem) { + _ = item.Close(ctx) + }), + ) + item := mustGetItem(t, p) + mustPutItem(t, p, item) + + require.Panics(t, func() { + _ = p.putItem(context.Background(), item) + }) + }) }) } diff --git a/internal/pool/trace.go b/internal/pool/trace.go index ae0300210..74620eaf8 100644 --- a/internal/pool/trace.go +++ b/internal/pool/trace.go @@ -14,7 +14,7 @@ type ( OnWith func(ctx *context.Context, call stack.Caller) func(attempts int, err error) OnPut func(ctx *context.Context, call stack.Caller, item any) func(err error) 
OnGet func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) - onWait func() func(item any, err error) //nolint:unused + onWait func() func(item any, err error) OnChange func(Stats) } ) diff --git a/internal/query/client.go b/internal/query/client.go index bd8d856b4..78a7467b3 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -16,6 +16,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" @@ -276,11 +277,22 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO onDone(attempts, finalErr) }() - err := do(ctx, c.pool, func(ctx context.Context, s *Session) error { - attempts++ + err := do(ctx, c.pool, + func(ctx context.Context, s *Session) error { + attempts++ - return op(ctx, s) - }, options.ParseDoOpts(c.config.Trace(), opts...).RetryOpts()...) + return op(ctx, s) + }, + append([]retry.Option{ + retry.WithTrace(&trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + }), + }, options.ParseDoOpts(c.config.Trace(), opts...).RetryOpts()...)..., + ) return err } @@ -289,13 +301,11 @@ func doTx( ctx context.Context, pool sessionPool, op query.TxOperation, - t *trace.Query, - opts ...options.DoTxOption, + txSettings tx.Settings, + opts ...retry.Option, ) (finalErr error) { - doTxOpts := options.ParseDoTxOpts(t, opts...) - err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) { - tx, err := s.Begin(ctx, doTxOpts.TxSettings()) + tx, err := s.Begin(ctx, txSettings) if err != nil { return xerrors.WithStackTrace(err) } @@ -319,7 +329,7 @@ func doTx( } return nil - }, doTxOpts.RetryOpts()...) + }, opts...) if err != nil { return xerrors.WithStackTrace(err) } @@ -515,17 +525,33 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options onDone = trace.QueryOnDoTx(c.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.(*Client).DoTx"), ) + doTxOpts = options.ParseDoTxOpts(opts...) attempts = 0 ) defer func() { onDone(attempts, finalErr) }() - err := doTx(ctx, c.pool, func(ctx context.Context, tx query.TxActor) error { - attempts++ + err := doTx(ctx, c.pool, + func(ctx context.Context, tx query.TxActor) error { + attempts++ - return op(ctx, tx) - }, c.config.Trace(), opts...) 
+ return op(ctx, tx) + }, + doTxOpts.TxSettings(), + append( + []retry.Option{ + retry.WithTrace(&trace.Retry{ + OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) { + return func(info trace.RetryLoopDoneInfo) { + attempts = info.Attempts + } + }, + }), + }, + doTxOpts.RetryOpts()..., + )..., + ) if err != nil { return xerrors.WithStackTrace(err) } @@ -542,7 +568,25 @@ func newPool( pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())), pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()), pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()), - pool.WithCreateFunc(createSession), + pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) { + var ( + createCtx context.Context + cancelCreate context.CancelFunc + ) + if d := cfg.SessionCreateTimeout(); d > 0 { + createCtx, cancelCreate = xcontext.WithTimeout(ctx, d) + } else { + createCtx, cancelCreate = xcontext.WithCancel(ctx) + } + defer cancelCreate() + + s, err := createSession(createCtx) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return s, nil + }), ) } diff --git a/internal/query/client_test.go b/internal/query/client_test.go index a62502079..adfc303ac 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -20,6 +20,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/query" @@ -249,7 +250,7 @@ func TestClient(t *testing.T) { }() return tx.Exec(ctx, "") - }, &trace.Query{}) + }, tx.NewSettings(tx.WithDefaultTxMode())) require.NoError(t, err) }) t.Run("RetryableError", func(t *testing.T) { @@ -274,7 +275,7 @@ func TestClient(t *testing.T) { } return nil - }, &trace.Query{}) + }, tx.NewSettings(tx.WithDefaultTxMode())) require.NoError(t, err) require.Equal(t, 10, counter) }) @@ -1167,7 +1168,7 @@ func testPool( ) *pool.Pool[*Session, Session] { return pool.New[*Session, Session](ctx, pool.WithLimit[*Session, Session](1), - pool.WithCreateFunc(createSession), + pool.WithCreateItemFunc(createSession), ) } diff --git a/internal/query/options/retry.go b/internal/query/options/retry.go index af7c0f207..11a458e49 100644 --- a/internal/query/options/retry.go +++ b/internal/query/options/retry.go @@ -114,12 +114,10 @@ func ParseDoOpts(t *trace.Query, opts ...DoOption) (s *doSettings) { return s } -func ParseDoTxOpts(t *trace.Query, opts ...DoTxOption) (s *doTxSettings) { +func ParseDoTxOpts(opts ...DoTxOption) (s *doTxSettings) { s = &doTxSettings{ txSettings: tx.NewSettings(tx.WithDefaultTxMode()), - doSettings: doSettings{ - trace: t, - }, + doSettings: doSettings{}, } for _, opt := range opts { diff --git a/tests/integration/basic_example_native_test.go b/tests/integration/basic_example_native_test.go index 1766bce58..be4270400 100644 --- a/tests/integration/basic_example_native_test.go +++ b/tests/integration/basic_example_native_test.go @@ -6,13 +6,11 @@ package integration import ( "context" "fmt" - "math" "net/http" "os" "path" "runtime/debug" "strings" - "sync" "sync/atomic" "testing" "time" @@ -25,7 +23,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/balancers" "github.com/ydb-platform/ydb-go-sdk/v3/config" - 
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/log" "github.com/ydb-platform/ydb-go-sdk/v3/meta" @@ -38,123 +35,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -type poolStats struct { - xsync.Mutex - - inFlightSessions map[string]struct{} - openSessions map[string]struct{} - inPoolSessions map[string]struct{} - limit int -} - -func (s *poolStats) print(t testing.TB) { - s.Lock() - defer s.Unlock() - t.Log("poolStats:") - t.Log(" - limit :", s.limit) - t.Log(" - open :", len(s.openSessions)) - t.Log(" - in-pool :", len(s.inPoolSessions)) - t.Log(" - in-flight :", len(s.inFlightSessions)) -} - -func (s *poolStats) check(t testing.TB) { - s.Lock() - defer s.Unlock() - if s.limit < 0 { - t.Fatalf("negative limit: %d", s.limit) - } - if len(s.inFlightSessions) > len(s.inPoolSessions) { - t.Fatalf("len(in_flight) > len(pool) (%d > %d)", len(s.inFlightSessions), len(s.inPoolSessions)) - } - if len(s.inPoolSessions) > s.limit { - t.Fatalf("len(pool) > limit (%d > %d)", len(s.inPoolSessions), s.limit) - } -} - -func (s *poolStats) max() int { - s.Lock() - defer s.Unlock() - return s.limit -} - -func (s *poolStats) addToOpen(t testing.TB, id string) { - defer s.check(t) - - s.Lock() - defer s.Unlock() - - if _, ok := s.openSessions[id]; ok { - t.Fatalf("session '%s' add to open sessions twice", id) - } - - s.openSessions[id] = struct{}{} -} - -func (s *poolStats) removeFromOpen(t testing.TB, id string) { - defer s.check(t) - - s.Lock() - defer s.Unlock() - - if _, ok := s.openSessions[id]; !ok { - t.Fatalf("session '%s' already removed from open sessions", id) - } - - delete(s.openSessions, id) -} - -func (s *poolStats) addToPool(t testing.TB, id string) { - defer s.check(t) - - s.Lock() - defer s.Unlock() - - if _, ok := s.inPoolSessions[id]; ok { - t.Fatalf("session '%s' add to pool twice", id) - } - - s.inPoolSessions[id] = struct{}{} -} - -func (s *poolStats) removeFromPool(t testing.TB, id string) { - defer s.check(t) - - s.Lock() - defer s.Unlock() - - if _, ok := s.inPoolSessions[id]; !ok { - t.Fatalf("session '%s' already removed from pool", id) - } - - delete(s.inPoolSessions, id) -} - -func (s *poolStats) addToInFlight(t testing.TB, id string) { - defer s.check(t) - - s.Lock() - defer s.Unlock() - - if _, ok := s.inFlightSessions[id]; ok { - t.Fatalf("session '%s' add to in-flight twice", id) - } - - s.inFlightSessions[id] = struct{}{} -} - -func (s *poolStats) removeFromInFlight(t testing.TB, id string) { - defer s.check(t) - - s.Lock() - defer s.Unlock() - - if _, ok := s.inFlightSessions[id]; !ok { - return - } - - delete(s.inFlightSessions, id) -} - func TestBasicExampleNative(sourceTest *testing.T) { //nolint:gocyclo t := xtest.MakeSyncedTest(sourceTest) folder := t.Name() @@ -171,64 +51,9 @@ func TestBasicExampleNative(sourceTest *testing.T) { //nolint:gocyclo totalConsumedUnits.Add(meta.ConsumedUnits(md)) }) - s := &poolStats{ - limit: math.MaxInt32, - openSessions: make(map[string]struct{}), - inPoolSessions: make(map[string]struct{}), - inFlightSessions: make(map[string]struct{}), - } - defer func() { - s.Lock() - defer s.Unlock() - if len(s.inFlightSessions) != 0 { - t.Errorf("'in-flight' not a zero after closing table client: %v", s.inFlightSessions) - } - if len(s.openSessions) != 0 { - t.Errorf("'openSessions' not a zero after closing table client: %v", s.openSessions) - } - if len(s.inPoolSessions) != 0 { - t.Errorf("'inPoolSessions' not a zero after closing 
table client: %v", s.inPoolSessions) - } - }() - var ( - limit = 50 - - sessionsMtx sync.Mutex - sessions = make(map[string]struct{}, limit) - + limit = 50 shutdowned atomic.Bool - - shutdownTrace = trace.Table{ - OnPoolSessionAdd: func(info trace.TablePoolSessionAddInfo) { - sessionsMtx.Lock() - defer sessionsMtx.Unlock() - sessions[info.Session.ID()] = struct{}{} - }, - OnPoolGet: func( - info trace.TablePoolGetStartInfo, - ) func( - trace.TablePoolGetDoneInfo, - ) { - return func(info trace.TablePoolGetDoneInfo) { - if info.Session == nil { - return - } - if shutdowned.Load() { - return - } - if info.Session.Status() != table.SessionClosing { - return - } - sessionsMtx.Lock() - defer sessionsMtx.Unlock() - if _, has := sessions[info.Session.ID()]; !has { - return - } - t.Fatalf("old session returned from pool after shutdown") - } - }, - } ) db, err := ydb.Open(ctx, @@ -274,67 +99,6 @@ func TestBasicExampleNative(sourceTest *testing.T) { //nolint:gocyclo ydb.WithPanicCallback(func(e interface{}) { t.Fatalf("panic recovered:%v:\n%s", e, debug.Stack()) }), - ydb.WithTraceTable( - *shutdownTrace.Compose( - &trace.Table{ - OnInit: func( - info trace.TableInitStartInfo, - ) func( - trace.TableInitDoneInfo, - ) { - return func(info trace.TableInitDoneInfo) { - s.WithLock(func() { - s.limit = info.Limit - }) - } - }, - OnSessionNew: func( - info trace.TableSessionNewStartInfo, - ) func( - trace.TableSessionNewDoneInfo, - ) { - return func(info trace.TableSessionNewDoneInfo) { - if info.Error == nil { - s.addToOpen(t, info.Session.ID()) - } - } - }, - OnSessionDelete: func( - info trace.TableSessionDeleteStartInfo, - ) func( - trace.TableSessionDeleteDoneInfo, - ) { - s.removeFromOpen(t, info.Session.ID()) - return nil - }, - OnPoolSessionAdd: func(info trace.TablePoolSessionAddInfo) { - s.addToPool(t, info.Session.ID()) - }, - OnPoolSessionRemove: func(info trace.TablePoolSessionRemoveInfo) { - s.removeFromPool(t, info.Session.ID()) - }, - OnPoolGet: func( - info trace.TablePoolGetStartInfo, - ) func( - trace.TablePoolGetDoneInfo, - ) { - return func(info trace.TablePoolGetDoneInfo) { - if info.Error == nil { - s.addToInFlight(t, info.Session.ID()) - } - } - }, - OnPoolPut: func( - info trace.TablePoolPutStartInfo, - ) func( - trace.TablePoolPutDoneInfo, - ) { - s.removeFromInFlight(t, info.Session.ID()) - return nil - }, - }, - ), - ), ) if err != nil { t.Fatal(err) @@ -350,8 +114,6 @@ func TestBasicExampleNative(sourceTest *testing.T) { //nolint:gocyclo return nil }); err != nil { t.Fatalf("pool not initialized: %+v", err) - } else if s.max() != limit { - t.Fatalf("pool size not applied: %+v", s) } // prepare scheme diff --git a/tests/slo/database/sql/main.go b/tests/slo/database/sql/main.go index 6244e917a..f1408bfb8 100644 --- a/tests/slo/database/sql/main.go +++ b/tests/slo/database/sql/main.go @@ -13,6 +13,7 @@ import ( "slo/internal/config" "slo/internal/generator" + "slo/internal/log" "slo/internal/workers" ) @@ -30,8 +31,8 @@ func main() { panic(fmt.Errorf("create config failed: %w", err)) } - fmt.Println("program started") - defer fmt.Println("program finished") + log.Println("program started") + defer log.Println("program finished") ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) defer cancel() @@ -48,7 +49,7 @@ func main() { _ = s.close(shutdownCtx) }() - fmt.Println("db init ok") + log.Println("db init ok") switch cfg.Mode { case config.CreateMode: @@ -56,7 +57,7 @@ func main() { if err != nil { panic(fmt.Errorf("create table failed: %w", err)) } - 
fmt.Println("create table ok") + log.Println("create table ok") gen := generator.New(0) @@ -83,14 +84,14 @@ func main() { panic(err) } - fmt.Println("entries write ok") + log.Println("entries write ok") case config.CleanupMode: err = s.dropTable(ctx) if err != nil { panic(fmt.Errorf("drop table failed: %w", err)) } - fmt.Println("cleanup table ok") + log.Println("cleanup table ok") case config.RunMode: gen := generator.New(cfg.InitialDataCount) @@ -103,7 +104,7 @@ func main() { if err != nil { panic(fmt.Errorf("workers close failed: %w", err)) } - fmt.Println("workers close ok") + log.Println("workers close ok") }() wg := sync.WaitGroup{} diff --git a/tests/slo/gorm/main.go b/tests/slo/gorm/main.go index ef4a9a079..e43a8b0ef 100644 --- a/tests/slo/gorm/main.go +++ b/tests/slo/gorm/main.go @@ -13,6 +13,7 @@ import ( "slo/internal/config" "slo/internal/generator" + "slo/internal/log" "slo/internal/workers" ) @@ -30,8 +31,8 @@ func main() { panic(fmt.Errorf("create config failed: %w", err)) } - fmt.Println("program started") - defer fmt.Println("program finished") + log.Println("program started") + defer log.Println("program finished") ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) defer cancel() @@ -48,7 +49,7 @@ func main() { _ = s.close(shutdownCtx) }() - fmt.Println("db init ok") + log.Println("db init ok") switch cfg.Mode { case config.CreateMode: @@ -56,7 +57,7 @@ func main() { if err != nil { panic(fmt.Errorf("create table failed: %w", err)) } - fmt.Println("create table ok") + log.Println("create table ok") gen := generator.New(0) @@ -83,14 +84,14 @@ func main() { panic(err) } - fmt.Println("entries write ok") + log.Println("entries write ok") case config.CleanupMode: err = s.dropTable(ctx) if err != nil { panic(fmt.Errorf("create table failed: %w", err)) } - fmt.Println("cleanup table ok") + log.Println("cleanup table ok") case config.RunMode: gen := generator.New(cfg.InitialDataCount) @@ -103,7 +104,7 @@ func main() { if err != nil { panic(fmt.Errorf("workers close failed: %w", err)) } - fmt.Println("workers close ok") + log.Println("workers close ok") }() wg := sync.WaitGroup{} diff --git a/tests/slo/internal/log/log.go b/tests/slo/internal/log/log.go new file mode 100644 index 000000000..3e57c1945 --- /dev/null +++ b/tests/slo/internal/log/log.go @@ -0,0 +1,18 @@ +package log + +import ( + "fmt" + "time" +) + +func timestampPrefix() string { + return "[" + time.Now().Format(time.RFC3339) + "] " +} + +func Printf(format string, args ...any) { + fmt.Printf(timestampPrefix()+format+"\n", args...) +} + +func Println(args ...any) { + fmt.Println(append([]any{timestampPrefix()}, args...)...) 
+} diff --git a/tests/slo/internal/metrics/metrics.go b/tests/slo/internal/metrics/metrics.go index 2cad21027..49b47eaae 100644 --- a/tests/slo/internal/metrics/metrics.go +++ b/tests/slo/internal/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "log" "time" "github.com/prometheus/client_golang/prometheus" @@ -127,7 +128,7 @@ func (j Span) Stop(err error, attempts int) { latency := time.Since(j.start) if attempts > 1 { - fmt.Printf("more than 1 attempt for request (request_type: %q, attempts: %d, start: %s, latency: %s, err: %v)\n", + log.Printf("more than 1 attempt for request (request_type: %q, attempts: %d, start: %s, latency: %s, err: %v)", j.name, attempts, j.start.Format(time.DateTime), diff --git a/tests/slo/internal/workers/metrics.go b/tests/slo/internal/workers/metrics.go index 2d03f089a..e45512dc3 100644 --- a/tests/slo/internal/workers/metrics.go +++ b/tests/slo/internal/workers/metrics.go @@ -2,11 +2,11 @@ package workers import ( "context" - "fmt" "sync" - "time" "golang.org/x/time/rate" + + "slo/internal/log" ) func (w *Workers) Metrics(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter) { @@ -19,7 +19,7 @@ func (w *Workers) Metrics(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limi err = w.m.Push() if err != nil { - fmt.Printf("[%s] error while pushing: %v\n", time.Now().Format(time.RFC3339), err) + log.Printf("error while pushing: %v", err) } } } diff --git a/tests/slo/internal/workers/read.go b/tests/slo/internal/workers/read.go index 0dbe4c8b8..5794d7c8d 100644 --- a/tests/slo/internal/workers/read.go +++ b/tests/slo/internal/workers/read.go @@ -2,13 +2,12 @@ package workers import ( "context" - "fmt" "math/rand" "sync" - "time" "golang.org/x/time/rate" + "slo/internal/log" "slo/internal/metrics" ) @@ -33,7 +32,7 @@ func (w *Workers) read(ctx context.Context) (err error) { defer func() { m.Stop(err, attempts) if err != nil { - fmt.Printf("[%s] get entry error: %v\n", time.Now().Format(time.RFC3339), err) + log.Printf("get entry error: %v", err) } }() diff --git a/tests/slo/internal/workers/workers.go b/tests/slo/internal/workers/workers.go index a741a4ec0..786235544 100644 --- a/tests/slo/internal/workers/workers.go +++ b/tests/slo/internal/workers/workers.go @@ -2,11 +2,10 @@ package workers import ( "context" - "fmt" - "time" "slo/internal/config" "slo/internal/generator" + "slo/internal/log" "slo/internal/metrics" ) @@ -24,7 +23,7 @@ type Workers struct { func New(cfg *config.Config, s ReadWriter, label, jobName string) (*Workers, error) { m, err := metrics.New(cfg.PushGateway, label, jobName) if err != nil { - fmt.Printf("[%s] create metrics failed: %v\n", time.Now().Format(time.RFC3339), err) + log.Printf("create metrics failed: %v", err) return nil, err } diff --git a/tests/slo/internal/workers/write.go b/tests/slo/internal/workers/write.go index fb69b7876..17a27cba4 100644 --- a/tests/slo/internal/workers/write.go +++ b/tests/slo/internal/workers/write.go @@ -2,13 +2,12 @@ package workers import ( "context" - "fmt" "sync" - "time" "golang.org/x/time/rate" "slo/internal/generator" + "slo/internal/log" "slo/internal/metrics" ) @@ -28,7 +27,7 @@ func (w *Workers) write(ctx context.Context, gen *generator.Generator) (err erro var row generator.Row row, err = gen.Generate() if err != nil { - fmt.Printf("[%s] generate error: %v\n", time.Now().Format(time.RFC3339), err) + log.Printf("generate error: %v", err) return err } @@ -39,7 +38,7 @@ func (w *Workers) write(ctx context.Context, gen *generator.Generator) (err erro defer func() { 
m.Stop(err, attempts) if err != nil { - fmt.Printf("[%s] error when stop 'write' worker: %v\n", time.Now().Format(time.RFC3339), err) + log.Printf("error when stop 'write' worker: %v", err) } }() diff --git a/tests/slo/native/query/internal/main.go b/tests/slo/native/query/internal/main.go deleted file mode 100644 index f00a2c06a..000000000 --- a/tests/slo/native/query/internal/main.go +++ /dev/null @@ -1,137 +0,0 @@ -package internal - -import ( - "context" - "fmt" - "os/signal" - "sync" - "syscall" - "time" - - "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" - - "slo/internal/config" - "slo/internal/generator" - "slo/internal/workers" -) - -func Main( - label string, - jobName string, -) { - ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) - defer cancel() - - cfg, err := config.New() - if err != nil { - panic(fmt.Errorf("create config failed: %w", err)) - } - - fmt.Println("program started") - defer fmt.Println("program finished") - - ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) - defer cancel() - - s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, label) - if err != nil { - panic(fmt.Errorf("create storage failed: %w", err)) - } - defer func() { - var ( - shutdownCtx context.Context - shutdownCancel context.CancelFunc - ) - if cfg.ShutdownTime > 0 { - shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), - time.Duration(cfg.ShutdownTime)*time.Second) - } else { - shutdownCtx, shutdownCancel = context.WithCancel(context.Background()) - } - defer shutdownCancel() - - _ = s.Close(shutdownCtx) - }() - - fmt.Println("db init ok") - - switch cfg.Mode { - case config.CreateMode: - err = s.CreateTable(ctx) - if err != nil { - panic(fmt.Errorf("create table failed: %w", err)) - } - fmt.Println("create table ok") - - gen := generator.New(0) - - g := errgroup.Group{} - - for i := uint64(0); i < cfg.InitialDataCount; i++ { - g.Go(func() (err error) { - e, err := gen.Generate() - if err != nil { - return err - } - - _, err = s.Write(ctx, e) - if err != nil { - return err - } - - return nil - }) - } - - err = g.Wait() - if err != nil { - panic(err) - } - - fmt.Println("entries write ok") - case config.CleanupMode: - err = s.DropTable(ctx) - if err != nil { - panic(fmt.Errorf("create table failed: %w", err)) - } - - fmt.Println("cleanup table ok") - case config.RunMode: - gen := generator.New(cfg.InitialDataCount) - - w, err := workers.New(cfg, s, label, jobName) - if err != nil { - panic(fmt.Errorf("create workers failed: %w", err)) - } - defer func() { - err := w.Close() - if err != nil { - panic(fmt.Errorf("workers close failed: %w", err)) - } - fmt.Println("workers close ok") - }() - - wg := sync.WaitGroup{} - - readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1) - wg.Add(cfg.ReadRPS) - for i := 0; i < cfg.ReadRPS; i++ { - go w.Read(ctx, &wg, readRL) - } - - writeRL := rate.NewLimiter(rate.Limit(cfg.WriteRPS), 1) - wg.Add(cfg.WriteRPS) - for i := 0; i < cfg.WriteRPS; i++ { - go w.Write(ctx, &wg, writeRL, gen) - } - - metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1) - wg.Add(1) - go w.Metrics(ctx, &wg, metricsRL) - - wg.Wait() - default: - panic(fmt.Errorf("unknown mode: %v", cfg.Mode)) - } -} diff --git a/tests/slo/native/query/main.go b/tests/slo/native/query/main.go index 81bb033cd..fb7301bea 100644 --- a/tests/slo/native/query/main.go +++ b/tests/slo/native/query/main.go @@ -1,7 +1,20 @@ package main import ( - 
"slo/native/query/internal" + "context" + "fmt" + "os/signal" + "sync" + "syscall" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + + "slo/internal/config" + "slo/internal/generator" + "slo/internal/log" + "slo/internal/workers" ) var ( @@ -10,5 +23,118 @@ var ( ) func main() { - internal.Main(label, jobName) + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + defer cancel() + + cfg, err := config.New() + if err != nil { + panic(fmt.Errorf("create config failed: %w", err)) + } + + log.Println("program started") + defer log.Println("program finished") + + ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) + defer cancel() + + s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, label) + if err != nil { + panic(fmt.Errorf("create storage failed: %w", err)) + } + defer func() { + var ( + shutdownCtx context.Context + shutdownCancel context.CancelFunc + ) + if cfg.ShutdownTime > 0 { + shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), + time.Duration(cfg.ShutdownTime)*time.Second) + } else { + shutdownCtx, shutdownCancel = context.WithCancel(context.Background()) + } + defer shutdownCancel() + + _ = s.Close(shutdownCtx) + }() + + log.Println("db init ok") + + switch cfg.Mode { + case config.CreateMode: + err = s.CreateTable(ctx) + if err != nil { + panic(fmt.Errorf("create table failed: %w", err)) + } + log.Println("create table ok") + + gen := generator.New(0) + + g := errgroup.Group{} + + for i := uint64(0); i < cfg.InitialDataCount; i++ { + g.Go(func() (err error) { + e, err := gen.Generate() + if err != nil { + return err + } + + _, err = s.Write(ctx, e) + if err != nil { + return err + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + panic(err) + } + + log.Println("entries write ok") + case config.CleanupMode: + err = s.DropTable(ctx) + if err != nil { + panic(fmt.Errorf("create table failed: %w", err)) + } + + log.Println("cleanup table ok") + case config.RunMode: + gen := generator.New(cfg.InitialDataCount) + + w, err := workers.New(cfg, s, label, jobName) + if err != nil { + panic(fmt.Errorf("create workers failed: %w", err)) + } + defer func() { + err := w.Close() + if err != nil { + panic(fmt.Errorf("workers close failed: %w", err)) + } + log.Println("workers close ok") + }() + + wg := sync.WaitGroup{} + + readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1) + wg.Add(cfg.ReadRPS) + for i := 0; i < cfg.ReadRPS; i++ { + go w.Read(ctx, &wg, readRL) + } + + writeRL := rate.NewLimiter(rate.Limit(cfg.WriteRPS), 1) + wg.Add(cfg.WriteRPS) + for i := 0; i < cfg.WriteRPS; i++ { + go w.Write(ctx, &wg, writeRL, gen) + } + + metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1) + wg.Add(1) + go w.Metrics(ctx, &wg, metricsRL) + + wg.Wait() + default: + panic(fmt.Errorf("unknown mode: %v", cfg.Mode)) + } } diff --git a/tests/slo/native/query/internal/storage.go b/tests/slo/native/query/storage.go similarity index 99% rename from tests/slo/native/query/internal/storage.go rename to tests/slo/native/query/storage.go index f234b0f03..be9bf1f35 100755 --- a/tests/slo/native/query/internal/storage.go +++ b/tests/slo/native/query/storage.go @@ -1,4 +1,4 @@ -package internal +package main import ( "context" diff --git a/tests/slo/native/query/with/pool/main.go b/tests/slo/native/query/with/pool/main.go deleted file mode 100644 index d596ef457..000000000 --- a/tests/slo/native/query/with/pool/main.go +++ /dev/null @@ 
-1,19 +0,0 @@ -package main - -import ( - "os" - - "slo/native/query/internal" -) - -var ( - label string - jobName string -) - -func main() { - if err := os.Setenv("YDB_GO_SDK_QUERY_SERVICE_USE_SESSION_POOL", "true"); err != nil { - panic(err) - } - internal.Main(label, jobName) -} diff --git a/tests/slo/native/table/main.go b/tests/slo/native/table/main.go index 9d5df4e72..422920bf9 100644 --- a/tests/slo/native/table/main.go +++ b/tests/slo/native/table/main.go @@ -13,6 +13,7 @@ import ( "slo/internal/config" "slo/internal/generator" + "slo/internal/log" "slo/internal/workers" ) @@ -30,8 +31,8 @@ func main() { panic(fmt.Errorf("create config failed: %w", err)) } - fmt.Println("program started") - defer fmt.Println("program finished") + log.Println("program started") + defer log.Println("program finished") ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) defer cancel() @@ -48,7 +49,7 @@ func main() { _ = s.close(shutdownCtx) }() - fmt.Println("db init ok") + log.Println("db init ok") switch cfg.Mode { case config.CreateMode: @@ -56,7 +57,7 @@ func main() { if err != nil { panic(fmt.Errorf("create table failed: %w", err)) } - fmt.Println("create table ok") + log.Println("create table ok") gen := generator.New(0) @@ -83,14 +84,14 @@ func main() { panic(err) } - fmt.Println("entries write ok") + log.Println("entries write ok") case config.CleanupMode: err = s.dropTable(ctx) if err != nil { panic(fmt.Errorf("create table failed: %w", err)) } - fmt.Println("cleanup table ok") + log.Println("cleanup table ok") case config.RunMode: gen := generator.New(cfg.InitialDataCount) @@ -103,7 +104,7 @@ func main() { if err != nil { panic(fmt.Errorf("workers close failed: %w", err)) } - fmt.Println("workers close ok") + log.Println("workers close ok") }() wg := sync.WaitGroup{} diff --git a/tests/slo/xorm/main.go b/tests/slo/xorm/main.go index 9d5df4e72..422920bf9 100644 --- a/tests/slo/xorm/main.go +++ b/tests/slo/xorm/main.go @@ -13,6 +13,7 @@ import ( "slo/internal/config" "slo/internal/generator" + "slo/internal/log" "slo/internal/workers" ) @@ -30,8 +31,8 @@ func main() { panic(fmt.Errorf("create config failed: %w", err)) } - fmt.Println("program started") - defer fmt.Println("program finished") + log.Println("program started") + defer log.Println("program finished") ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second) defer cancel() @@ -48,7 +49,7 @@ func main() { _ = s.close(shutdownCtx) }() - fmt.Println("db init ok") + log.Println("db init ok") switch cfg.Mode { case config.CreateMode: @@ -56,7 +57,7 @@ func main() { if err != nil { panic(fmt.Errorf("create table failed: %w", err)) } - fmt.Println("create table ok") + log.Println("create table ok") gen := generator.New(0) @@ -83,14 +84,14 @@ func main() { panic(err) } - fmt.Println("entries write ok") + log.Println("entries write ok") case config.CleanupMode: err = s.dropTable(ctx) if err != nil { panic(fmt.Errorf("create table failed: %w", err)) } - fmt.Println("cleanup table ok") + log.Println("cleanup table ok") case config.RunMode: gen := generator.New(cfg.InitialDataCount) @@ -103,7 +104,7 @@ func main() { if err != nil { panic(fmt.Errorf("workers close failed: %w", err)) } - fmt.Println("workers close ok") + log.Println("workers close ok") }() wg := sync.WaitGroup{}
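
For reviewers, a minimal sketch of the logging convention this patch converges on across the SLO workloads: each ad-hoc fmt.Printf("[%s] ...", time.Now().Format(time.RFC3339), ...) call is replaced by the shared slo/internal/log helpers whose full source appears in the hunk above. The snippet below is illustrative only and is not part of the patch; the "slo/internal/log" import path resolves only inside the tests/slo module, and the messages are placeholders.

package main

import (
	"errors"

	"slo/internal/log" // added by this patch; prefixes every line with an RFC3339 timestamp
)

func main() {
	// log.Println mirrors fmt.Println but prepends "[<RFC3339 timestamp>] ",
	// matching the "program started" / "program finished" lines in the workload mains.
	log.Println("program started")
	defer log.Println("program finished")

	// log.Printf mirrors fmt.Printf, adds the same prefix plus a trailing newline,
	// so call sites (workers, metrics) no longer format time.Now() themselves.
	err := errors.New("push failed")
	log.Printf("error while pushing: %v", err)
}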