From 42f02715cc1fbb2eff354f50152edbabd9e41ae9 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 22 Aug 2024 12:26:34 +0300 Subject: [PATCH] trace changes --- CHANGELOG.md | 2 + internal/pool/defaults.go | 28 - internal/pool/pool.go | 148 ++--- internal/pool/pool_test.go | 20 +- internal/pool/stats.go | 7 +- internal/pool/trace.go | 84 +-- internal/query/client.go | 52 +- internal/table/client.go | 10 +- log/table.go | 1106 +++++++++++++++++------------------- metrics/table.go | 52 +- trace/query.go | 15 +- trace/query_gtrace.go | 20 +- trace/table.go | 69 ++- trace/table_gtrace.go | 251 +++++--- 14 files changed, 927 insertions(+), 937 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c07d3930..c8106c84d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Changed `trace.Table` and `trace.Query` traces + ## v3.79.0 * Added commit messages for topic listener * EOF error in RecvMsg is no longer logged diff --git a/internal/pool/defaults.go b/internal/pool/defaults.go index 2591e8438..a6701222d 100644 --- a/internal/pool/defaults.go +++ b/internal/pool/defaults.go @@ -1,31 +1,3 @@ package pool const DefaultLimit = 50 - -var defaultTrace = &Trace{ - OnNew: func(info *NewStartInfo) func(info *NewDoneInfo) { - return func(info *NewDoneInfo) { - } - }, - OnClose: func(info *CloseStartInfo) func(info *CloseDoneInfo) { - return func(info *CloseDoneInfo) { - } - }, - OnTry: func(info *TryStartInfo) func(info *TryDoneInfo) { - return func(info *TryDoneInfo) { - } - }, - OnWith: func(info *WithStartInfo) func(info *WithDoneInfo) { - return func(info *WithDoneInfo) { - } - }, - OnPut: func(info *PutStartInfo) func(info *PutDoneInfo) { - return func(info *PutDoneInfo) { - } - }, - OnGet: func(info *GetStartInfo) func(info *GetDoneInfo) { - return func(info *GetDoneInfo) { - } - }, - OnChange: func(info ChangeInfo) {}, -} diff --git a/internal/pool/pool.go b/internal/pool/pool.go index e0a10c4e1..45f82fbb1 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -77,7 +77,7 @@ func New[PT Item[T], T any]( ) *Pool[PT, T] { p := &Pool[PT, T]{ config: Config[PT, T]{ - trace: defaultTrace, + trace: &Trace{}, limit: DefaultLimit, createItem: defaultCreateItem[T, PT], }, @@ -90,16 +90,16 @@ func New[PT Item[T], T any]( } } - onDone := p.config.trace.OnNew(&NewStartInfo{ - Context: &ctx, - Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.New"), - }) - - defer func() { - onDone(&NewDoneInfo{ - Limit: p.config.limit, - }) - }() + if onNew := p.config.trace.OnNew; onNew != nil { + onDone := onNew(&ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.New"), + ) + if onDone != nil { + defer func() { + onDone(p.config.limit) + }() + } + } p.createItem = makeCreateItemFunc(p.config, p.done, func(item PT) error { return xsync.WithLock(&p.mu, func() error { @@ -195,14 +195,21 @@ func makeCreateItemFunc[PT Item[T], T any]( //nolint:funlen } } +func (p *Pool[PT, T]) stats() Stats { + return Stats{ + Limit: p.config.limit, + Idle: len(p.idle), + Wait: 0, + CreateInProgress: 0, + } +} + func (p *Pool[PT, T]) onChangeStats() { - p.mu.RLock() - info := ChangeInfo{ - Limit: p.config.limit, - Idle: len(p.idle), + if onChange := p.config.trace.OnChange; onChange != nil { + onChange(xsync.WithRLock(&p.mu, func() Stats { + return p.stats() + })) } - p.mu.RUnlock() - p.config.trace.OnChange(info) } func (p *Pool[PT, T]) Stats() Stats { @@ -229,18 +236,19 @@ func (p *Pool[PT, T]) getItemFromIdle() (item PT) { return item } -func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) { - onDone := p.config.trace.OnGet(&GetStartInfo{ - Context: &ctx, - Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).getItem"), - }) - defer func() { - onDone(&GetDoneInfo{ - Error: finalErr, - }) - }() +func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { + if onGet := p.config.trace.OnGet; onGet != nil { + onDone := onGet(&ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).getItem"), + ) + if onDone != nil { + defer func() { + onDone(item, 0, finalErr) + }() + } + } - item := p.getItemFromIdle() + item = p.getItemFromIdle() if item != nil { if item.IsAlive() { @@ -267,16 +275,17 @@ func (p *Pool[PT, T]) appendItemToIdle(item PT) { } func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) { - onDone := p.config.trace.OnPut(&PutStartInfo{ - Context: &ctx, - Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).putItem"), - }) - defer func() { - onDone(&PutDoneInfo{ - Error: finalErr, - }) - }() - + if onPut := p.config.trace.OnPut; onPut != nil { + onDone := onPut(&ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).putItem"), + item, + ) + if onDone != nil { + defer func() { + onDone(finalErr) + }() + } + } if !item.IsAlive() { p.closeItem(ctx, item) @@ -324,15 +333,16 @@ func makeAsyncCloseItemFunc[PT Item[T], T any]( } func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) { - onDone := p.config.trace.OnTry(&TryStartInfo{ - Context: &ctx, - Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"), - }) - defer func() { - onDone(&TryDoneInfo{ - Error: finalErr, - }) - }() + if onTry := p.config.trace.OnTry; onTry != nil { + onDone := onTry(&ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"), + ) + if onDone != nil { + defer func() { + onDone(finalErr) + }() + } + } select { case <-p.done: @@ -373,19 +383,18 @@ func (p *Pool[PT, T]) With( f func(ctx context.Context, item PT) error, opts ...retry.Option, ) (finalErr error) { - var ( - onDone = p.config.trace.OnWith(&WithStartInfo{ - Context: &ctx, - Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).With"), - }) - attempts int - ) - defer func() { - onDone(&WithDoneInfo{ - Error: finalErr, - Attempts: attempts, - }) - }() + var attempts int + + if onWith := p.config.trace.OnWith; onWith != nil { + onDone := onWith(&ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).With"), + ) + if onDone != nil { + defer func() { + onDone(attempts, finalErr) + }() + } + } err := retry.Retry(ctx, func(ctx context.Context) error { err := p.try(ctx, f) @@ -409,15 +418,16 @@ func (p *Pool[PT, T]) With( } func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) { - onDone := p.config.trace.OnClose(&CloseStartInfo{ - Context: &ctx, - Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).Close"), - }) - defer func() { - onDone(&CloseDoneInfo{ - Error: finalErr, - }) - }() + if onClose := p.config.trace.OnClose; onClose != nil { + onDone := onClose(&ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).Close"), + ) + if onDone != nil { + defer func() { + onDone(finalErr) + }() + } + } close(p.done) diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index ac3c97236..a0d9d40fc 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -268,11 +268,12 @@ func TestPool(t *testing.T) { }) t.Run("Stress", func(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { - trace := *defaultTrace - trace.OnChange = func(info ChangeInfo) { - require.GreaterOrEqual(t, info.Limit, info.Idle) + trace := &Trace{ + OnChange: func(info Stats) { + require.GreaterOrEqual(t, info.Limit, info.Idle) + }, } - p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](&trace)) + p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](trace)) var wg sync.WaitGroup wg.Add(DefaultLimit*2 + 1) for range make([]struct{}, DefaultLimit*2) { @@ -297,12 +298,13 @@ func TestPool(t *testing.T) { }) t.Run("ParallelCreation", func(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { - trace := *defaultTrace - trace.OnChange = func(info ChangeInfo) { - require.Equal(t, DefaultLimit, info.Limit) - require.LessOrEqual(t, info.Idle, DefaultLimit) + trace := &Trace{ + OnChange: func(info Stats) { + require.Equal(t, DefaultLimit, info.Limit) + require.LessOrEqual(t, info.Idle, DefaultLimit) + }, } - p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](&trace)) + p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](trace)) var wg sync.WaitGroup for range make([]struct{}, DefaultLimit*10) { wg.Add(1) diff --git a/internal/pool/stats.go b/internal/pool/stats.go index b02dfdbd0..69c3e3eef 100644 --- a/internal/pool/stats.go +++ b/internal/pool/stats.go @@ -1,6 +1,9 @@ package pool type Stats struct { - Limit int - Idle int + Limit int + Index int + Idle int + Wait int + CreateInProgress int } diff --git a/internal/pool/trace.go b/internal/pool/trace.go index 4ad825134..ae0300210 100644 --- a/internal/pool/trace.go +++ b/internal/pool/trace.go @@ -8,81 +8,13 @@ import ( type ( Trace struct { - OnNew func(*NewStartInfo) func(*NewDoneInfo) - OnClose func(*CloseStartInfo) func(*CloseDoneInfo) - OnTry func(*TryStartInfo) func(*TryDoneInfo) - OnWith func(*WithStartInfo) func(*WithDoneInfo) - OnPut func(*PutStartInfo) func(*PutDoneInfo) - OnGet func(*GetStartInfo) func(*GetDoneInfo) - OnChange func(ChangeInfo) + OnNew func(ctx *context.Context, call stack.Caller) func(limit int) + OnClose func(ctx *context.Context, call stack.Caller) func(err error) + OnTry func(ctx *context.Context, call stack.Caller) func(err error) + OnWith func(ctx *context.Context, call stack.Caller) func(attempts int, err error) + OnPut func(ctx *context.Context, call stack.Caller, item any) func(err error) + OnGet func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) + onWait func() func(item any, err error) //nolint:unused + OnChange func(Stats) } - NewStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - } - NewDoneInfo struct { - Limit int - } - CloseStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - } - CloseDoneInfo struct { - Error error - } - TryStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - } - TryDoneInfo struct { - Error error - } - WithStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - } - WithDoneInfo struct { - Error error - - Attempts int - } - PutStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - } - PutDoneInfo struct { - Error error - } - GetStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - } - GetDoneInfo struct { - Error error - } - ChangeInfo = Stats ) diff --git a/internal/query/client.go b/internal/query/client.go index 556aea07d..bd8d856b4 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -589,50 +589,50 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con func poolTrace(t *trace.Query) *pool.Trace { return &pool.Trace{ - OnNew: func(info *pool.NewStartInfo) func(*pool.NewDoneInfo) { - onDone := trace.QueryOnPoolNew(t, info.Context, info.Call) + OnNew: func(ctx *context.Context, call stack.Caller) func(limit int) { + onDone := trace.QueryOnPoolNew(t, ctx, call) - return func(info *pool.NewDoneInfo) { - onDone(info.Limit) + return func(limit int) { + onDone(limit) } }, - OnClose: func(info *pool.CloseStartInfo) func(*pool.CloseDoneInfo) { - onDone := trace.QueryOnClose(t, info.Context, info.Call) + OnClose: func(ctx *context.Context, call stack.Caller) func(err error) { + onDone := trace.QueryOnClose(t, ctx, call) - return func(info *pool.CloseDoneInfo) { - onDone(info.Error) + return func(err error) { + onDone(err) } }, - OnTry: func(info *pool.TryStartInfo) func(*pool.TryDoneInfo) { - onDone := trace.QueryOnPoolTry(t, info.Context, info.Call) + OnTry: func(ctx *context.Context, call stack.Caller) func(err error) { + onDone := trace.QueryOnPoolTry(t, ctx, call) - return func(info *pool.TryDoneInfo) { - onDone(info.Error) + return func(err error) { + onDone(err) } }, - OnWith: func(info *pool.WithStartInfo) func(*pool.WithDoneInfo) { - onDone := trace.QueryOnPoolWith(t, info.Context, info.Call) + OnWith: func(ctx *context.Context, call stack.Caller) func(attempts int, err error) { + onDone := trace.QueryOnPoolWith(t, ctx, call) - return func(info *pool.WithDoneInfo) { - onDone(info.Error, info.Attempts) + return func(attempts int, err error) { + onDone(attempts, err) } }, - OnPut: func(info *pool.PutStartInfo) func(*pool.PutDoneInfo) { - onDone := trace.QueryOnPoolPut(t, info.Context, info.Call) + OnPut: func(ctx *context.Context, call stack.Caller, item any) func(err error) { + onDone := trace.QueryOnPoolPut(t, ctx, call, item.(*Session)) //nolint:forcetypeassert - return func(info *pool.PutDoneInfo) { - onDone(info.Error) + return func(err error) { + onDone(err) } }, - OnGet: func(info *pool.GetStartInfo) func(*pool.GetDoneInfo) { - onDone := trace.QueryOnPoolGet(t, info.Context, info.Call) + OnGet: func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) { + onDone := trace.QueryOnPoolGet(t, ctx, call) - return func(info *pool.GetDoneInfo) { - onDone(info.Error) + return func(item any, attempts int, err error) { + onDone(item.(*Session), attempts, err) //nolint:forcetypeassert } }, - OnChange: func(info pool.ChangeInfo) { - trace.QueryOnPoolChange(t, info.Limit, info.Idle) + OnChange: func(stats pool.Stats) { + trace.QueryOnPoolChange(t, stats.Limit, stats.Index, stats.Idle, stats.Wait, stats.CreateInProgress) }, } } diff --git a/internal/table/client.go b/internal/table/client.go index 0d03e4cb5..a30e31a3c 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -289,7 +289,7 @@ func (c *Client) isClosed() bool { } // c.mu must NOT be held. -func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err error) { +func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err error) { //nolint:funlen if c.isClosed() { return nil, errClosedClient } @@ -322,7 +322,9 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err touched: c.clock.Now(), } trace.TableOnPoolSessionAdd(c.config.Trace(), s) - trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append") + trace.TableOnPoolStateChange(c.config.Trace(), + c.limit, len(c.index), c.idle.Len(), c.waitQ.Len(), c.createInProgress, len(c.index), + ) }) }), withCreateSessionOnClose(func(s *session) { c.mu.WithLock(func() { @@ -334,7 +336,9 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err delete(c.index, s) trace.TableOnPoolSessionRemove(c.config.Trace(), s) - trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove") + trace.TableOnPoolStateChange(c.config.Trace(), + c.limit, len(c.index), c.idle.Len(), c.waitQ.Len(), c.createInProgress, len(c.index), + ) if !c.isClosed() { c.internalPoolNotify(nil) diff --git a/log/table.go b/log/table.go index 01e54d4b9..11a19f1c7 100644 --- a/log/table.go +++ b/log/table.go @@ -16,648 +16,610 @@ func Table(l Logger, d trace.Detailer, opts ...Option) (t trace.Table) { //nolint:gocyclo,funlen func internalTable(l *wrapper, d trace.Detailer) (t trace.Table) { - t.OnDo = func( - info trace.TableDoStartInfo, - ) func( - trace.TableDoDoneInfo, - ) { - if d.Details()&trace.TablePoolAPIEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "do") - idempotent := info.Idempotent - label := info.Label - l.Log(ctx, "start", - Bool("idempotent", idempotent), - String("label", label), - ) - start := time.Now() + return trace.Table{ + OnInit: func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) { + if d.Details()&trace.TableEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "init") + l.Log(ctx, "start") + start := time.Now() - return func(info trace.TableDoDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", + return func(info trace.TableInitDoneInfo) { + l.Log(WithLevel(ctx, INFO), "done", latencyField(start), - Bool("idempotent", idempotent), - String("label", label), - Int("attempts", info.Attempts), + Int("size_max", info.Limit), ) - } else { - lvl := ERROR - if !xerrors.IsYdb(info.Error) { - lvl = DEBUG + } + }, + OnClose: func(info trace.TableCloseStartInfo) func(trace.TableCloseDoneInfo) { + if d.Details()&trace.TableEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "close") + l.Log(ctx, "start") + start := time.Now() + + return func(info trace.TableCloseDoneInfo) { + if info.Error == nil { + l.Log(WithLevel(ctx, INFO), "done", + latencyField(start), + ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", + latencyField(start), + Error(info.Error), + versionField(), + ) } - m := retry.Check(info.Error) - l.Log(WithLevel(ctx, lvl), "done", - latencyField(start), - Bool("idempotent", idempotent), - String("label", label), - Int("attempts", info.Attempts), - Error(info.Error), - Bool("retryable", m.MustRetry(idempotent)), - Int64("code", m.StatusCode()), - Bool("deleteSession", m.IsRetryObjectValid()), - versionField(), - ) } - } - } - t.OnDoTx = func( - info trace.TableDoTxStartInfo, - ) func( - trace.TableDoTxDoneInfo, - ) { - if d.Details()&trace.TablePoolAPIEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "do", "tx") - idempotent := info.Idempotent - label := info.Label - l.Log(ctx, "start", - Bool("idempotent", idempotent), - String("label", label), - ) - start := time.Now() + }, + OnDo: func( + info trace.TableDoStartInfo, + ) func( + trace.TableDoDoneInfo, + ) { + if d.Details()&trace.TablePoolAPIEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "do") + idempotent := info.Idempotent + label := info.Label + l.Log(ctx, "start", + Bool("idempotent", idempotent), + String("label", label), + ) + start := time.Now() - return func(info trace.TableDoTxDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - Bool("idempotent", idempotent), - String("label", label), - Int("attempts", info.Attempts), - ) - } else { - lvl := WARN - if !xerrors.IsYdb(info.Error) { - lvl = DEBUG + return func(info trace.TableDoDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), + Bool("idempotent", idempotent), + String("label", label), + Int("attempts", info.Attempts), + ) + } else { + lvl := ERROR + if !xerrors.IsYdb(info.Error) { + lvl = DEBUG + } + m := retry.Check(info.Error) + l.Log(WithLevel(ctx, lvl), "done", + latencyField(start), + Bool("idempotent", idempotent), + String("label", label), + Int("attempts", info.Attempts), + Error(info.Error), + Bool("retryable", m.MustRetry(idempotent)), + Int64("code", m.StatusCode()), + Bool("deleteSession", m.IsRetryObjectValid()), + versionField(), + ) } - m := retry.Check(info.Error) - l.Log(WithLevel(ctx, lvl), "done", - latencyField(start), - Bool("idempotent", idempotent), - String("label", label), - Int("attempts", info.Attempts), - Error(info.Error), - Bool("retryable", m.MustRetry(idempotent)), - Int64("code", m.StatusCode()), - Bool("deleteSession", m.IsRetryObjectValid()), - versionField(), - ) } - } - } - t.OnCreateSession = func( - info trace.TableCreateSessionStartInfo, - ) func( - trace.TableCreateSessionDoneInfo, - ) { - if d.Details()&trace.TablePoolAPIEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "create", "session") - l.Log(ctx, "start") - start := time.Now() + }, + OnDoTx: func( + info trace.TableDoTxStartInfo, + ) func( + trace.TableDoTxDoneInfo, + ) { + if d.Details()&trace.TablePoolAPIEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "do", "tx") + idempotent := info.Idempotent + label := info.Label + l.Log(ctx, "start", + Bool("idempotent", idempotent), + String("label", label), + ) + start := time.Now() - return func(info trace.TableCreateSessionDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - Int("attempts", info.Attempts), - String("session_id", info.Session.ID()), - String("session_status", info.Session.Status()), - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - latencyField(start), - Int("attempts", info.Attempts), - Error(info.Error), - versionField(), - ) + return func(info trace.TableDoTxDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), + Bool("idempotent", idempotent), + String("label", label), + Int("attempts", info.Attempts), + ) + } else { + lvl := WARN + if !xerrors.IsYdb(info.Error) { + lvl = DEBUG + } + m := retry.Check(info.Error) + l.Log(WithLevel(ctx, lvl), "done", + latencyField(start), + Bool("idempotent", idempotent), + String("label", label), + Int("attempts", info.Attempts), + Error(info.Error), + Bool("retryable", m.MustRetry(idempotent)), + Int64("code", m.StatusCode()), + Bool("deleteSession", m.IsRetryObjectValid()), + versionField(), + ) + } } - } - } - t.OnSessionNew = func(info trace.TableSessionNewStartInfo) func(trace.TableSessionNewDoneInfo) { - if d.Details()&trace.TableSessionEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "new") - l.Log(ctx, "start") - start := time.Now() + }, + OnCreateSession: func( + info trace.TableCreateSessionStartInfo, + ) func( + trace.TableCreateSessionDoneInfo, + ) { + if d.Details()&trace.TablePoolAPIEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "create", "session") + l.Log(ctx, "start") + start := time.Now() - return func(info trace.TableSessionNewDoneInfo) { - if info.Error == nil { - if info.Session != nil { + return func(info trace.TableCreateSessionDoneInfo) { + if info.Error == nil { l.Log(ctx, "done", latencyField(start), - String("id", info.Session.ID()), + Int("attempts", info.Attempts), + String("session_id", info.Session.ID()), + String("session_status", info.Session.Status()), ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", + latencyField(start), + Int("attempts", info.Attempts), + Error(info.Error), + versionField(), + ) + } + } + }, + OnSessionNew: func(info trace.TableSessionNewStartInfo) func(trace.TableSessionNewDoneInfo) { + if d.Details()&trace.TableSessionEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "new") + l.Log(ctx, "start") + start := time.Now() + + return func(info trace.TableSessionNewDoneInfo) { + if info.Error == nil { + if info.Session != nil { + l.Log(ctx, "done", + latencyField(start), + String("id", info.Session.ID()), + ) + } else { + l.Log(WithLevel(ctx, WARN), "failed", + latencyField(start), + versionField(), + ) + } } else { l.Log(WithLevel(ctx, WARN), "failed", latencyField(start), + Error(info.Error), versionField(), ) } - } else { - l.Log(WithLevel(ctx, WARN), "failed", - latencyField(start), - Error(info.Error), - versionField(), - ) } - } - } - t.OnSessionDelete = func(info trace.TableSessionDeleteStartInfo) func(trace.TableSessionDeleteDoneInfo) { - if d.Details()&trace.TableSessionEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "delete") - session := info.Session - l.Log(ctx, "start", - String("id", info.Session.ID()), - String("status", info.Session.Status()), - ) - start := time.Now() + }, + OnSessionDelete: func(info trace.TableSessionDeleteStartInfo) func(trace.TableSessionDeleteDoneInfo) { + if d.Details()&trace.TableSessionEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "delete") + session := info.Session + l.Log(ctx, "start", + String("id", info.Session.ID()), + String("status", info.Session.Status()), + ) + start := time.Now() - return func(info trace.TableSessionDeleteDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - ) - } else { - l.Log(WithLevel(ctx, WARN), "failed", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - Error(info.Error), - versionField(), - ) + return func(info trace.TableSessionDeleteDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + ) + } else { + l.Log(WithLevel(ctx, WARN), "failed", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + Error(info.Error), + versionField(), + ) + } } - } - } - t.OnSessionKeepAlive = func(info trace.TableKeepAliveStartInfo) func(trace.TableKeepAliveDoneInfo) { - if d.Details()&trace.TableSessionEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "keep", "alive") - session := info.Session - l.Log(ctx, "start", - String("id", session.ID()), - String("status", session.Status()), - ) - start := time.Now() + }, + OnSessionKeepAlive: func(info trace.TableKeepAliveStartInfo) func(trace.TableKeepAliveDoneInfo) { + if d.Details()&trace.TableSessionEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "keep", "alive") + session := info.Session + l.Log(ctx, "start", + String("id", session.ID()), + String("status", session.Status()), + ) + start := time.Now() - return func(info trace.TableKeepAliveDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), + return func(info trace.TableKeepAliveDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + ) + } else { + l.Log(WithLevel(ctx, WARN), "failed", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + Error(info.Error), + versionField(), + ) + } + } + }, + OnSessionQueryPrepare: func( + info trace.TablePrepareDataQueryStartInfo, + ) func( + trace.TablePrepareDataQueryDoneInfo, + ) { + if d.Details()&trace.TableSessionQueryInvokeEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "query", "prepare") + session := info.Session + query := info.Query + l.Log(ctx, "start", + appendFieldByCondition(l.logQuery, + String("query", info.Query), String("id", session.ID()), String("status", session.Status()), - ) - } else { - l.Log(WithLevel(ctx, WARN), "failed", - latencyField(start), + )..., + ) + start := time.Now() + + return func(info trace.TablePrepareDataQueryDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + appendFieldByCondition(l.logQuery, + Stringer("result", info.Result), + appendFieldByCondition(l.logQuery, + String("query", query), + String("id", session.ID()), + String("status", session.Status()), + latencyField(start), + )..., + )..., + ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", + appendFieldByCondition(l.logQuery, + String("query", query), + Error(info.Error), + String("id", session.ID()), + String("status", session.Status()), + latencyField(start), + versionField(), + )..., + ) + } + } + }, + OnSessionQueryExecute: func( + info trace.TableExecuteDataQueryStartInfo, + ) func( + trace.TableExecuteDataQueryDoneInfo, + ) { + if d.Details()&trace.TableSessionQueryInvokeEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "query", "execute") + session := info.Session + query := info.Query + l.Log(ctx, "start", + appendFieldByCondition(l.logQuery, + Stringer("query", info.Query), String("id", session.ID()), String("status", session.Status()), - Error(info.Error), - versionField(), - ) + )..., + ) + start := time.Now() + + return func(info trace.TableExecuteDataQueryDoneInfo) { + if info.Error == nil { + tx := info.Tx + l.Log(ctx, "done", + appendFieldByCondition(l.logQuery, + Stringer("query", query), + String("id", session.ID()), + String("tx", tx.ID()), + String("status", session.Status()), + Bool("prepared", info.Prepared), + NamedError("result_err", info.Result.Err()), + latencyField(start), + )..., + ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", + appendFieldByCondition(l.logQuery, + Stringer("query", query), + Error(info.Error), + String("id", session.ID()), + String("status", session.Status()), + Bool("prepared", info.Prepared), + latencyField(start), + versionField(), + )..., + ) + } } - } - } - t.OnSessionQueryPrepare = func( - info trace.TablePrepareDataQueryStartInfo, - ) func( - trace.TablePrepareDataQueryDoneInfo, - ) { - if d.Details()&trace.TableSessionQueryInvokeEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "query", "prepare") - session := info.Session - query := info.Query - l.Log(ctx, "start", - appendFieldByCondition(l.logQuery, - String("query", info.Query), - String("id", session.ID()), - String("status", session.Status()), - )..., - ) - start := time.Now() + }, + OnSessionQueryStreamExecute: func( + info trace.TableSessionQueryStreamExecuteStartInfo, + ) func( + trace.TableSessionQueryStreamExecuteDoneInfo, + ) { + if d.Details()&trace.TableSessionQueryStreamEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "query", "stream", "execute") + session := info.Session + query := info.Query + l.Log(ctx, "start", + appendFieldByCondition(l.logQuery, + Stringer("query", info.Query), + String("id", session.ID()), + String("status", session.Status()), + )..., + ) + start := time.Now() - return func(info trace.TablePrepareDataQueryDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - appendFieldByCondition(l.logQuery, - Stringer("result", info.Result), + return func(info trace.TableSessionQueryStreamExecuteDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", appendFieldByCondition(l.logQuery, - String("query", query), + Stringer("query", query), + Error(info.Error), String("id", session.ID()), String("status", session.Status()), latencyField(start), )..., - )..., - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - appendFieldByCondition(l.logQuery, - String("query", query), - Error(info.Error), + ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", + appendFieldByCondition(l.logQuery, + Stringer("query", query), + Error(info.Error), + String("id", session.ID()), + String("status", session.Status()), + latencyField(start), + versionField(), + )..., + ) + } + } + }, + OnSessionQueryStreamRead: func( + info trace.TableSessionQueryStreamReadStartInfo, + ) func( + trace.TableSessionQueryStreamReadDoneInfo, + ) { + if d.Details()&trace.TableSessionQueryStreamEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "query", "stream", "read") + session := info.Session + l.Log(ctx, "start", + String("id", session.ID()), + String("status", session.Status()), + ) + start := time.Now() + + return func(info trace.TableSessionQueryStreamReadDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), String("id", session.ID()), String("status", session.Status()), + ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + Error(info.Error), versionField(), - )..., - ) + ) + } } - } - } - t.OnSessionQueryExecute = func( - info trace.TableExecuteDataQueryStartInfo, - ) func( - trace.TableExecuteDataQueryDoneInfo, - ) { - if d.Details()&trace.TableSessionQueryInvokeEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "query", "execute") - session := info.Session - query := info.Query - l.Log(ctx, "start", - appendFieldByCondition(l.logQuery, - Stringer("query", info.Query), + }, + OnTxBegin: func( + info trace.TableTxBeginStartInfo, + ) func( + trace.TableTxBeginDoneInfo, + ) { + if d.Details()&trace.TableSessionTransactionEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "tx", "begin") + session := info.Session + l.Log(ctx, "start", String("id", session.ID()), String("status", session.Status()), - )..., - ) - start := time.Now() + ) + start := time.Now() - return func(info trace.TableExecuteDataQueryDoneInfo) { - if info.Error == nil { - tx := info.Tx - l.Log(ctx, "done", - appendFieldByCondition(l.logQuery, - Stringer("query", query), + return func(info trace.TableTxBeginDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), String("id", session.ID()), - String("tx", tx.ID()), String("status", session.Status()), - Bool("prepared", info.Prepared), - NamedError("result_err", info.Result.Err()), + String("tx", info.Tx.ID()), + ) + } else { + l.Log(WithLevel(ctx, WARN), "failed", latencyField(start), - )..., - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - appendFieldByCondition(l.logQuery, - Stringer("query", query), - Error(info.Error), String("id", session.ID()), String("status", session.Status()), - Bool("prepared", info.Prepared), - latencyField(start), + Error(info.Error), versionField(), - )..., - ) + ) + } } - } - } - t.OnSessionQueryStreamExecute = func( - info trace.TableSessionQueryStreamExecuteStartInfo, - ) func( - trace.TableSessionQueryStreamExecuteDoneInfo, - ) { - if d.Details()&trace.TableSessionQueryStreamEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "query", "stream", "execute") - session := info.Session - query := info.Query - l.Log(ctx, "start", - appendFieldByCondition(l.logQuery, - Stringer("query", info.Query), + }, + OnTxCommit: func(info trace.TableTxCommitStartInfo) func(trace.TableTxCommitDoneInfo) { + if d.Details()&trace.TableSessionTransactionEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "tx", "commit") + session := info.Session + tx := info.Tx + l.Log(ctx, "start", String("id", session.ID()), String("status", session.Status()), - )..., - ) - start := time.Now() + String("tx", info.Tx.ID()), + ) + start := time.Now() - return func(info trace.TableSessionQueryStreamExecuteDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - appendFieldByCondition(l.logQuery, - Stringer("query", query), - Error(info.Error), + return func(info trace.TableTxCommitDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), String("id", session.ID()), String("status", session.Status()), + String("tx", tx.ID()), + ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", latencyField(start), - )..., - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - appendFieldByCondition(l.logQuery, - Stringer("query", query), - Error(info.Error), String("id", session.ID()), String("status", session.Status()), - latencyField(start), + String("tx", tx.ID()), + Error(info.Error), versionField(), - )..., - ) + ) + } } - } - } - t.OnSessionQueryStreamRead = func( - info trace.TableSessionQueryStreamReadStartInfo, - ) func( - trace.TableSessionQueryStreamReadDoneInfo, - ) { - if d.Details()&trace.TableSessionQueryStreamEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "query", "stream", "read") - session := info.Session - l.Log(ctx, "start", - String("id", session.ID()), - String("status", session.Status()), - ) - start := time.Now() - - return func(info trace.TableSessionQueryStreamReadDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - Error(info.Error), - versionField(), - ) + }, + OnTxRollback: func( + info trace.TableTxRollbackStartInfo, + ) func( + trace.TableTxRollbackDoneInfo, + ) { + if d.Details()&trace.TableSessionTransactionEvents == 0 { + return nil } - } - } - t.OnTxBegin = func( - info trace.TableTxBeginStartInfo, - ) func( - trace.TableTxBeginDoneInfo, - ) { - if d.Details()&trace.TableSessionTransactionEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "tx", "begin") - session := info.Session - l.Log(ctx, "start", - String("id", session.ID()), - String("status", session.Status()), - ) - start := time.Now() - - return func(info trace.TableTxBeginDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - String("tx", info.Tx.ID()), - ) - } else { - l.Log(WithLevel(ctx, WARN), "failed", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - Error(info.Error), - versionField(), - ) - } - } - } - t.OnTxCommit = func(info trace.TableTxCommitStartInfo) func(trace.TableTxCommitDoneInfo) { - if d.Details()&trace.TableSessionTransactionEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "tx", "commit") - session := info.Session - tx := info.Tx - l.Log(ctx, "start", - String("id", session.ID()), - String("status", session.Status()), - String("tx", info.Tx.ID()), - ) - start := time.Now() + ctx := with(*info.Context, TRACE, "ydb", "table", "session", "tx", "rollback") + session := info.Session + tx := info.Tx + l.Log(ctx, "start", + String("id", session.ID()), + String("status", session.Status()), + String("tx", tx.ID()), + ) + start := time.Now() - return func(info trace.TableTxCommitDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - String("tx", tx.ID()), - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - String("tx", tx.ID()), - Error(info.Error), - versionField(), - ) + return func(info trace.TableTxRollbackDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + String("tx", tx.ID()), + ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + String("tx", tx.ID()), + Error(info.Error), + versionField(), + ) + } } - } - } - t.OnTxRollback = func( - info trace.TableTxRollbackStartInfo, - ) func( - trace.TableTxRollbackDoneInfo, - ) { - if d.Details()&trace.TableSessionTransactionEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "session", "tx", "rollback") - session := info.Session - tx := info.Tx - l.Log(ctx, "start", - String("id", session.ID()), - String("status", session.Status()), - String("tx", tx.ID()), - ) - start := time.Now() - - return func(info trace.TableTxRollbackDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - String("tx", tx.ID()), - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - String("tx", tx.ID()), - Error(info.Error), - versionField(), - ) + }, + OnPoolPut: func(info trace.TablePoolPutStartInfo) func(trace.TablePoolPutDoneInfo) { + if d.Details()&trace.TablePoolAPIEvents == 0 { + return nil } - } - } - t.OnInit = func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) { - if d.Details()&trace.TableEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "init") - l.Log(ctx, "start") - start := time.Now() - - return func(info trace.TableInitDoneInfo) { - l.Log(WithLevel(ctx, INFO), "done", - latencyField(start), - Int("size_max", info.Limit), + ctx := with(*info.Context, TRACE, "ydb", "table", "pool", "put") + session := info.Session + l.Log(ctx, "start", + String("id", session.ID()), + String("status", session.Status()), ) - } - } - t.OnClose = func(info trace.TableCloseStartInfo) func(trace.TableCloseDoneInfo) { - if d.Details()&trace.TableEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "close") - l.Log(ctx, "start") - start := time.Now() + start := time.Now() - return func(info trace.TableCloseDoneInfo) { - if info.Error == nil { - l.Log(WithLevel(ctx, INFO), "done", - latencyField(start), - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - latencyField(start), - Error(info.Error), - versionField(), - ) - } - } - } - t.OnPoolStateChange = func(info trace.TablePoolStateChangeInfo) { - if d.Details()&trace.TablePoolLifeCycleEvents == 0 { - return - } - ctx := with(context.Background(), TRACE, "ydb", "table", "pool", "state", "change") - l.Log(WithLevel(ctx, DEBUG), "", - Int("size", info.Size), - String("event", info.Event), - ) - } - t.OnPoolSessionAdd = func(info trace.TablePoolSessionAddInfo) { - if d.Details()&trace.TablePoolLifeCycleEvents == 0 { - return - } - ctx := with(context.Background(), TRACE, "ydb", "table", "pool", "session", "add") - l.Log(ctx, "start", - String("id", info.Session.ID()), - String("status", info.Session.Status()), - ) - } - t.OnPoolSessionRemove = func(info trace.TablePoolSessionRemoveInfo) { - if d.Details()&trace.TablePoolLifeCycleEvents == 0 { - return - } - ctx := with(context.Background(), TRACE, "ydb", "table", "pool", "session", "remove") - l.Log(ctx, "start", - String("id", info.Session.ID()), - String("status", info.Session.Status()), - ) - } - t.OnPoolPut = func(info trace.TablePoolPutStartInfo) func(trace.TablePoolPutDoneInfo) { - if d.Details()&trace.TablePoolAPIEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "pool", "put") - session := info.Session - l.Log(ctx, "start", - String("id", session.ID()), - String("status", session.Status()), - ) - start := time.Now() - - return func(info trace.TablePoolPutDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - ) - } else { - l.Log(WithLevel(ctx, ERROR), "failed", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - Error(info.Error), - versionField(), - ) + return func(info trace.TablePoolPutDoneInfo) { + if info.Error == nil { + l.Log(ctx, "done", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + ) + } else { + l.Log(WithLevel(ctx, ERROR), "failed", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + Error(info.Error), + versionField(), + ) + } } - } - } - t.OnPoolGet = func(info trace.TablePoolGetStartInfo) func(trace.TablePoolGetDoneInfo) { - if d.Details()&trace.TablePoolAPIEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "pool", "get") - l.Log(ctx, "start") - start := time.Now() - - return func(info trace.TablePoolGetDoneInfo) { - if info.Error == nil { - session := info.Session - l.Log(ctx, "done", - latencyField(start), - String("id", session.ID()), - String("status", session.Status()), - Int("attempts", info.Attempts), - ) - } else { - l.Log(WithLevel(ctx, WARN), "failed", - latencyField(start), - Int("attempts", info.Attempts), - Error(info.Error), - versionField(), - ) + }, + OnPoolGet: func(info trace.TablePoolGetStartInfo) func(trace.TablePoolGetDoneInfo) { + if d.Details()&trace.TablePoolAPIEvents == 0 { + return nil } - } - } - t.OnPoolWait = func(info trace.TablePoolWaitStartInfo) func(trace.TablePoolWaitDoneInfo) { - if d.Details()&trace.TablePoolAPIEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "table", "pool", "wait") - l.Log(ctx, "start") - start := time.Now() + ctx := with(*info.Context, TRACE, "ydb", "table", "pool", "get") + l.Log(ctx, "start") + start := time.Now() - return func(info trace.TablePoolWaitDoneInfo) { - fields := []Field{ - latencyField(start), - } - if info.Session != nil { - fields = append(fields, - String("id", info.Session.ID()), - String("status", info.Session.Status()), - ) + return func(info trace.TablePoolGetDoneInfo) { + if info.Error == nil { + session := info.Session + l.Log(ctx, "done", + latencyField(start), + String("id", session.ID()), + String("status", session.Status()), + Int("attempts", info.Attempts), + ) + } else { + l.Log(WithLevel(ctx, WARN), "failed", + latencyField(start), + Int("attempts", info.Attempts), + Error(info.Error), + versionField(), + ) + } } - if info.Error == nil { - l.Log(ctx, "done", fields...) - } else { - fields = append(fields, Error(info.Error)) - l.Log(WithLevel(ctx, WARN), "failed", fields...) + }, + OnPoolStateChange: func(info trace.TablePoolStateChangeInfo) { + if d.Details()&trace.TablePoolLifeCycleEvents == 0 { + return } - } + ctx := with(context.Background(), TRACE, "ydb", "table", "pool", "state", "change") + l.Log(WithLevel(ctx, DEBUG), "", + Int("limit", info.Limit), + Int("index", info.Index), + Int("idle", info.Idle), + Int("wait", info.Wait), + Int("create_in_progress", info.CreateInProgress), + ) + }, + OnSessionBulkUpsert: nil, + OnSessionQueryExplain: nil, + OnTxExecute: nil, + OnTxExecuteStatement: nil, + OnPoolWith: nil, } - - return t } diff --git a/metrics/table.go b/metrics/table.go index 18c0468bb..721ee2bc0 100644 --- a/metrics/table.go +++ b/metrics/table.go @@ -1,8 +1,6 @@ package metrics import ( - "fmt" - "sync" "time" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -15,11 +13,13 @@ func table(config Config) (t trace.Table) { session := config.WithSystem("session") config = config.WithSystem("pool") limit := config.GaugeVec("limit") - size := config.GaugeVec("size") - inflight := config.GaugeVec("inflight") - inflightLatency := config.WithSystem("inflight").TimerVec("latency") + index := config.GaugeVec("index") + idle := config.GaugeVec("idle") wait := config.GaugeVec("wait") - waitLatency := config.WithSystem("wait").TimerVec("latency") + createInProgress := config.GaugeVec("createInProgress") + get := config.CounterVec("get") + put := config.CounterVec("put") + with := config.GaugeVec("with") t.OnInit = func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) { return func(info trace.TableInitDoneInfo) { limit.With(nil).Set(float64(info.Limit)) @@ -43,46 +43,38 @@ func table(config Config) (t trace.Table) { return nil } - t.OnPoolSessionAdd = func(info trace.TablePoolSessionAddInfo) { + t.OnPoolWith = func(info trace.TablePoolWithStartInfo) func(trace.TablePoolWithDoneInfo) { if config.Details()&trace.TablePoolEvents != 0 { - size.With(nil).Add(1) + with.With(nil).Add(1) } - } - t.OnPoolSessionRemove = func(info trace.TablePoolSessionRemoveInfo) { - if config.Details()&trace.TablePoolEvents != 0 { - size.With(nil).Add(-1) + + return func(info trace.TablePoolWithDoneInfo) { + if config.Details()&trace.TablePoolEvents != 0 { + with.With(nil).Add(-1) + } } } - var inflightStarts sync.Map t.OnPoolGet = func(info trace.TablePoolGetStartInfo) func(trace.TablePoolGetDoneInfo) { - wait.With(nil).Add(1) - start := time.Now() - return func(info trace.TablePoolGetDoneInfo) { - wait.With(nil).Add(-1) if info.Error == nil && config.Details()&trace.TablePoolEvents != 0 { - inflight.With(nil).Add(1) - inflightStarts.Store(info.Session.ID(), time.Now()) - waitLatency.With(nil).Record(time.Since(start)) + get.With(nil).Inc() } } } t.OnPoolPut = func(info trace.TablePoolPutStartInfo) func(trace.TablePoolPutDoneInfo) { if config.Details()&trace.TablePoolEvents != 0 { - inflight.With(nil).Add(-1) - start, ok := inflightStarts.LoadAndDelete(info.Session.ID()) - if !ok { - panic(fmt.Sprintf("unknown session '%s'", info.Session.ID())) - } - val, ok := start.(time.Time) - if !ok { - panic(fmt.Sprintf("unsupported type conversion from %T to time.Time", val)) - } - inflightLatency.With(nil).Record(time.Since(val)) + put.With(nil).Inc() } return nil } + t.OnPoolStateChange = func(info trace.TablePoolStateChangeInfo) { + limit.With(nil).Set(float64(info.Limit)) + index.With(nil).Set(float64(info.Index)) + idle.With(nil).Set(float64(info.Idle)) + wait.With(nil).Set(float64(info.Wait)) + createInProgress.With(nil).Set(float64(info.CreateInProgress)) + } { latency := session.WithSystem("query").TimerVec("latency") errs := session.WithSystem("query").CounterVec("errs", "status") diff --git a/trace/query.go b/trace/query.go index 050b6ca52..0347390cf 100644 --- a/trace/query.go +++ b/trace/query.go @@ -514,9 +514,8 @@ type ( } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals QueryPoolWithDoneInfo struct { - Error error - Attempts int + Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals QueryPoolPutStartInfo struct { @@ -526,6 +525,7 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call + Session sessionInfo } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals QueryPoolPutDoneInfo struct { @@ -542,11 +542,16 @@ type ( } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals QueryPoolGetDoneInfo struct { - Error error + Session sessionInfo + Attempts int + Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals QueryPoolChange struct { - Limit int - Idle int + Limit int + Index int + Idle int + Wait int + CreateInProgress int } ) diff --git a/trace/query_gtrace.go b/trace/query_gtrace.go index 6793586a5..890c81f2b 100644 --- a/trace/query_gtrace.go +++ b/trace/query_gtrace.go @@ -1623,23 +1623,24 @@ func QueryOnPoolTry(t *Query, c *context.Context, call call) func(error) { } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func QueryOnPoolWith(t *Query, c *context.Context, call call) func(_ error, attempts int) { +func QueryOnPoolWith(t *Query, c *context.Context, call call) func(attempts int, _ error) { var p QueryPoolWithStartInfo p.Context = c p.Call = call res := t.onPoolWith(p) - return func(e error, attempts int) { + return func(attempts int, e error) { var p QueryPoolWithDoneInfo - p.Error = e p.Attempts = attempts + p.Error = e res(p) } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func QueryOnPoolPut(t *Query, c *context.Context, call call) func(error) { +func QueryOnPoolPut(t *Query, c *context.Context, call call, session sessionInfo) func(error) { var p QueryPoolPutStartInfo p.Context = c p.Call = call + p.Session = session res := t.onPoolPut(p) return func(e error) { var p QueryPoolPutDoneInfo @@ -1648,22 +1649,27 @@ func QueryOnPoolPut(t *Query, c *context.Context, call call) func(error) { } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func QueryOnPoolGet(t *Query, c *context.Context, call call) func(error) { +func QueryOnPoolGet(t *Query, c *context.Context, call call) func(session sessionInfo, attempts int, _ error) { var p QueryPoolGetStartInfo p.Context = c p.Call = call res := t.onPoolGet(p) - return func(e error) { + return func(session sessionInfo, attempts int, e error) { var p QueryPoolGetDoneInfo + p.Session = session + p.Attempts = attempts p.Error = e res(p) } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func QueryOnPoolChange(t *Query, limit int, idle int) { +func QueryOnPoolChange(t *Query, limit int, index int, idle int, wait int, createInProgress int) { var p QueryPoolChange p.Limit = limit + p.Index = index p.Idle = idle + p.Wait = wait + p.CreateInProgress = createInProgress t.onPoolChange(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals diff --git a/trace/table.go b/trace/table.go index 9d9e09d75..5589327b1 100644 --- a/trace/table.go +++ b/trace/table.go @@ -64,15 +64,6 @@ type ( ) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnTxRollback func(TableTxRollbackStartInfo) func(TableTxRollbackDoneInfo) - // Pool state event - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals - OnPoolStateChange func(TablePoolStateChangeInfo) - - // Pool session lifecycle events - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals - OnPoolSessionAdd func(info TablePoolSessionAddInfo) - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals - OnPoolSessionRemove func(info TablePoolSessionRemoveInfo) // Pool common API events // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -80,6 +71,21 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnPoolGet func(TablePoolGetStartInfo) func(TablePoolGetDoneInfo) // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnPoolWith func(TablePoolWithStartInfo) func(TablePoolWithDoneInfo) + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + OnPoolStateChange func(TablePoolStateChangeInfo) + + // Deprecated + // Will be removed after March 2025. + // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated + OnPoolSessionAdd func(info TablePoolSessionAddInfo) + // Deprecated + // Will be removed after March 2025. + // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated + OnPoolSessionRemove func(info TablePoolSessionRemoveInfo) + // Deprecated + // Will be removed after March 2025. + // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated OnPoolWait func(TablePoolWaitStartInfo) func(TablePoolWaitDoneInfo) } ) @@ -335,8 +341,16 @@ type ( } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TablePoolStateChangeInfo struct { - Size int - Event string + Limit int + Index int + Idle int + Wait int + CreateInProgress int + + // Deprecated: use Index field instead. + // Will be removed after March 2025. + // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated + Size int } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TablePoolSessionNewStartInfo struct { @@ -367,7 +381,9 @@ type ( Attempts int Error error } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + // Deprecated + // Will be removed after March 2025. + // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated TablePoolWaitStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. @@ -376,15 +392,28 @@ type ( Context *context.Context Call call } - // TablePoolWaitDoneInfo means a wait iteration inside Get call is done - // Warning: Session and Error may be nil at the same time. This means - // that a wait iteration donned without any significant tableResultErr - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + // Deprecated + // Will be removed after March 2025. + // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated TablePoolWaitDoneInfo struct { Session sessionInfo Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TablePoolWithStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + TablePoolWithDoneInfo struct { + Attempts int + Error error + } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TablePoolPutStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. @@ -410,11 +439,15 @@ type ( } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TablePoolSessionCloseDoneInfo struct{} - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + // Deprecated + // Will be removed after March 2025. + // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated TablePoolSessionAddInfo struct { Session sessionInfo } - // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals + // Deprecated + // Will be removed after March 2025. + // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated TablePoolSessionRemoveInfo struct { Session sessionInfo } diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index d8908dfc8..c548157b1 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -699,28 +699,9 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } } { - h1 := t.OnPoolStateChange - h2 := x.OnPoolStateChange - ret.OnPoolStateChange = func(t TablePoolStateChangeInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - if h1 != nil { - h1(t) - } - if h2 != nil { - h2(t) - } - } - } - { - h1 := t.OnPoolSessionAdd - h2 := x.OnPoolSessionAdd - ret.OnPoolSessionAdd = func(info TablePoolSessionAddInfo) { + h1 := t.OnPoolPut + h2 := x.OnPoolPut + ret.OnPoolPut = func(t TablePoolPutStartInfo) func(TablePoolPutDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -728,37 +709,34 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } }() } + var r, r1 func(TablePoolPutDoneInfo) if h1 != nil { - h1(info) + r = h1(t) } if h2 != nil { - h2(info) - } - } - } - { - h1 := t.OnPoolSessionRemove - h2 := x.OnPoolSessionRemove - ret.OnPoolSessionRemove = func(info TablePoolSessionRemoveInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - if h1 != nil { - h1(info) + r1 = h2(t) } - if h2 != nil { - h2(info) + return func(t TablePoolPutDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(t) + } + if r1 != nil { + r1(t) + } } } } { - h1 := t.OnPoolPut - h2 := x.OnPoolPut - ret.OnPoolPut = func(t TablePoolPutStartInfo) func(TablePoolPutDoneInfo) { + h1 := t.OnPoolGet + h2 := x.OnPoolGet + ret.OnPoolGet = func(t TablePoolGetStartInfo) func(TablePoolGetDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -766,14 +744,14 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } }() } - var r, r1 func(TablePoolPutDoneInfo) + var r, r1 func(TablePoolGetDoneInfo) if h1 != nil { r = h1(t) } if h2 != nil { r1 = h2(t) } - return func(t TablePoolPutDoneInfo) { + return func(t TablePoolGetDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -791,9 +769,9 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } } { - h1 := t.OnPoolGet - h2 := x.OnPoolGet - ret.OnPoolGet = func(t TablePoolGetStartInfo) func(TablePoolGetDoneInfo) { + h1 := t.OnPoolWith + h2 := x.OnPoolWith + ret.OnPoolWith = func(t TablePoolWithStartInfo) func(TablePoolWithDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -801,14 +779,14 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } }() } - var r, r1 func(TablePoolGetDoneInfo) + var r, r1 func(TablePoolWithDoneInfo) if h1 != nil { r = h1(t) } if h2 != nil { r1 = h2(t) } - return func(t TablePoolGetDoneInfo) { + return func(t TablePoolWithDoneInfo) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -825,6 +803,63 @@ func (t *Table) Compose(x *Table, opts ...TableComposeOption) *Table { } } } + { + h1 := t.OnPoolStateChange + h2 := x.OnPoolStateChange + ret.OnPoolStateChange = func(t TablePoolStateChangeInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(t) + } + if h2 != nil { + h2(t) + } + } + } + { + h1 := t.OnPoolSessionAdd + h2 := x.OnPoolSessionAdd + ret.OnPoolSessionAdd = func(info TablePoolSessionAddInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(info) + } + if h2 != nil { + h2(info) + } + } + } + { + h1 := t.OnPoolSessionRemove + h2 := x.OnPoolSessionRemove + ret.OnPoolSessionRemove = func(info TablePoolSessionRemoveInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if h1 != nil { + h1(info) + } + if h2 != nil { + h2(info) + } + } + } { h1 := t.OnPoolWait h2 := x.OnPoolWait @@ -1147,27 +1182,6 @@ func (t *Table) onTxRollback(t1 TableTxRollbackStartInfo) func(TableTxRollbackDo } return res } -func (t *Table) onPoolStateChange(t1 TablePoolStateChangeInfo) { - fn := t.OnPoolStateChange - if fn == nil { - return - } - fn(t1) -} -func (t *Table) onPoolSessionAdd(info TablePoolSessionAddInfo) { - fn := t.OnPoolSessionAdd - if fn == nil { - return - } - fn(info) -} -func (t *Table) onPoolSessionRemove(info TablePoolSessionRemoveInfo) { - fn := t.OnPoolSessionRemove - if fn == nil { - return - } - fn(info) -} func (t *Table) onPoolPut(t1 TablePoolPutStartInfo) func(TablePoolPutDoneInfo) { fn := t.OnPoolPut if fn == nil { @@ -1198,6 +1212,42 @@ func (t *Table) onPoolGet(t1 TablePoolGetStartInfo) func(TablePoolGetDoneInfo) { } return res } +func (t *Table) onPoolWith(t1 TablePoolWithStartInfo) func(TablePoolWithDoneInfo) { + fn := t.OnPoolWith + if fn == nil { + return func(TablePoolWithDoneInfo) { + return + } + } + res := fn(t1) + if res == nil { + return func(TablePoolWithDoneInfo) { + return + } + } + return res +} +func (t *Table) onPoolStateChange(t1 TablePoolStateChangeInfo) { + fn := t.OnPoolStateChange + if fn == nil { + return + } + fn(t1) +} +func (t *Table) onPoolSessionAdd(info TablePoolSessionAddInfo) { + fn := t.OnPoolSessionAdd + if fn == nil { + return + } + fn(info) +} +func (t *Table) onPoolSessionRemove(info TablePoolSessionRemoveInfo) { + fn := t.OnPoolSessionRemove + if fn == nil { + return + } + fn(info) +} func (t *Table) onPoolWait(t1 TablePoolWaitStartInfo) func(TablePoolWaitDoneInfo) { fn := t.OnPoolWait if fn == nil { @@ -1490,25 +1540,6 @@ func TableOnTxRollback(t *Table, c *context.Context, call call, session sessionI } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TableOnPoolStateChange(t *Table, size int, event string) { - var p TablePoolStateChangeInfo - p.Size = size - p.Event = event - t.onPoolStateChange(p) -} -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TableOnPoolSessionAdd(t *Table, session sessionInfo) { - var p TablePoolSessionAddInfo - p.Session = session - t.onPoolSessionAdd(p) -} -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TableOnPoolSessionRemove(t *Table, session sessionInfo) { - var p TablePoolSessionRemoveInfo - p.Session = session - t.onPoolSessionRemove(p) -} -// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TableOnPoolPut(t *Table, c *context.Context, call call, session sessionInfo) func(error) { var p TablePoolPutStartInfo p.Context = c @@ -1536,6 +1567,42 @@ func TableOnPoolGet(t *Table, c *context.Context, call call) func(session sessio } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TableOnPoolWith(t *Table, c *context.Context, call call) func(attempts int, _ error) { + var p TablePoolWithStartInfo + p.Context = c + p.Call = call + res := t.onPoolWith(p) + return func(attempts int, e error) { + var p TablePoolWithDoneInfo + p.Attempts = attempts + p.Error = e + res(p) + } +} +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TableOnPoolStateChange(t *Table, limit int, index int, idle int, wait int, createInProgress int, size int) { + var p TablePoolStateChangeInfo + p.Limit = limit + p.Index = index + p.Idle = idle + p.Wait = wait + p.CreateInProgress = createInProgress + p.Size = size + t.onPoolStateChange(p) +} +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TableOnPoolSessionAdd(t *Table, session sessionInfo) { + var p TablePoolSessionAddInfo + p.Session = session + t.onPoolSessionAdd(p) +} +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals +func TableOnPoolSessionRemove(t *Table, session sessionInfo) { + var p TablePoolSessionRemoveInfo + p.Session = session + t.onPoolSessionRemove(p) +} +// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TableOnPoolWait(t *Table, c *context.Context, call call) func(session sessionInfo, _ error) { var p TablePoolWaitStartInfo p.Context = c