Skip to content

Commit

Permalink
Merge pull request #1445 from ydb-platform/trace
Browse files Browse the repository at this point in the history
trace changes
  • Loading branch information
asmyasnikov authored Sep 6, 2024
2 parents f12a33e + 42f0271 commit 1d8bf01
Show file tree
Hide file tree
Showing 14 changed files with 927 additions and 937 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Changed `trace.Table` and `trace.Query` traces

## v3.79.0
* Added commit messages for topic listener
* EOF error in RecvMsg is no longer logged
Expand Down
28 changes: 0 additions & 28 deletions internal/pool/defaults.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,3 @@
package pool

const DefaultLimit = 50

var defaultTrace = &Trace{
OnNew: func(info *NewStartInfo) func(info *NewDoneInfo) {
return func(info *NewDoneInfo) {
}
},
OnClose: func(info *CloseStartInfo) func(info *CloseDoneInfo) {
return func(info *CloseDoneInfo) {
}
},
OnTry: func(info *TryStartInfo) func(info *TryDoneInfo) {
return func(info *TryDoneInfo) {
}
},
OnWith: func(info *WithStartInfo) func(info *WithDoneInfo) {
return func(info *WithDoneInfo) {
}
},
OnPut: func(info *PutStartInfo) func(info *PutDoneInfo) {
return func(info *PutDoneInfo) {
}
},
OnGet: func(info *GetStartInfo) func(info *GetDoneInfo) {
return func(info *GetDoneInfo) {
}
},
OnChange: func(info ChangeInfo) {},
}
148 changes: 79 additions & 69 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func New[PT Item[T], T any](
) *Pool[PT, T] {
p := &Pool[PT, T]{
config: Config[PT, T]{
trace: defaultTrace,
trace: &Trace{},
limit: DefaultLimit,
createItem: defaultCreateItem[T, PT],
},
Expand All @@ -90,16 +90,16 @@ func New[PT Item[T], T any](
}
}

onDone := p.config.trace.OnNew(&NewStartInfo{
Context: &ctx,
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.New"),
})

defer func() {
onDone(&NewDoneInfo{
Limit: p.config.limit,
})
}()
if onNew := p.config.trace.OnNew; onNew != nil {
onDone := onNew(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.New"),
)
if onDone != nil {
defer func() {
onDone(p.config.limit)
}()
}
}

p.createItem = makeCreateItemFunc(p.config, p.done, func(item PT) error {
return xsync.WithLock(&p.mu, func() error {
Expand Down Expand Up @@ -195,14 +195,21 @@ func makeCreateItemFunc[PT Item[T], T any]( //nolint:funlen
}
}

func (p *Pool[PT, T]) stats() Stats {
return Stats{
Limit: p.config.limit,
Idle: len(p.idle),
Wait: 0,
CreateInProgress: 0,
}
}

func (p *Pool[PT, T]) onChangeStats() {
p.mu.RLock()
info := ChangeInfo{
Limit: p.config.limit,
Idle: len(p.idle),
if onChange := p.config.trace.OnChange; onChange != nil {
onChange(xsync.WithRLock(&p.mu, func() Stats {
return p.stats()
}))
}
p.mu.RUnlock()
p.config.trace.OnChange(info)
}

func (p *Pool[PT, T]) Stats() Stats {
Expand All @@ -229,18 +236,19 @@ func (p *Pool[PT, T]) getItemFromIdle() (item PT) {
return item
}

func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
onDone := p.config.trace.OnGet(&GetStartInfo{
Context: &ctx,
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).getItem"),
})
defer func() {
onDone(&GetDoneInfo{
Error: finalErr,
})
}()
func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) {
if onGet := p.config.trace.OnGet; onGet != nil {
onDone := onGet(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).getItem"),
)
if onDone != nil {
defer func() {
onDone(item, 0, finalErr)
}()
}
}

item := p.getItemFromIdle()
item = p.getItemFromIdle()

if item != nil {
if item.IsAlive() {
Expand All @@ -267,16 +275,17 @@ func (p *Pool[PT, T]) appendItemToIdle(item PT) {
}

func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) {
onDone := p.config.trace.OnPut(&PutStartInfo{
Context: &ctx,
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).putItem"),
})
defer func() {
onDone(&PutDoneInfo{
Error: finalErr,
})
}()

if onPut := p.config.trace.OnPut; onPut != nil {
onDone := onPut(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).putItem"),
item,
)
if onDone != nil {
defer func() {
onDone(finalErr)
}()
}
}
if !item.IsAlive() {
p.closeItem(ctx, item)

Expand Down Expand Up @@ -324,15 +333,16 @@ func makeAsyncCloseItemFunc[PT Item[T], T any](
}

func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) {
onDone := p.config.trace.OnTry(&TryStartInfo{
Context: &ctx,
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"),
})
defer func() {
onDone(&TryDoneInfo{
Error: finalErr,
})
}()
if onTry := p.config.trace.OnTry; onTry != nil {
onDone := onTry(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"),
)
if onDone != nil {
defer func() {
onDone(finalErr)
}()
}
}

select {
case <-p.done:
Expand Down Expand Up @@ -373,19 +383,18 @@ func (p *Pool[PT, T]) With(
f func(ctx context.Context, item PT) error,
opts ...retry.Option,
) (finalErr error) {
var (
onDone = p.config.trace.OnWith(&WithStartInfo{
Context: &ctx,
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).With"),
})
attempts int
)
defer func() {
onDone(&WithDoneInfo{
Error: finalErr,
Attempts: attempts,
})
}()
var attempts int

if onWith := p.config.trace.OnWith; onWith != nil {
onDone := onWith(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).With"),
)
if onDone != nil {
defer func() {
onDone(attempts, finalErr)
}()
}
}

err := retry.Retry(ctx, func(ctx context.Context) error {
err := p.try(ctx, f)
Expand All @@ -409,15 +418,16 @@ func (p *Pool[PT, T]) With(
}

func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
onDone := p.config.trace.OnClose(&CloseStartInfo{
Context: &ctx,
Call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).Close"),
})
defer func() {
onDone(&CloseDoneInfo{
Error: finalErr,
})
}()
if onClose := p.config.trace.OnClose; onClose != nil {
onDone := onClose(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).Close"),
)
if onDone != nil {
defer func() {
onDone(finalErr)
}()
}
}

close(p.done)

Expand Down
20 changes: 11 additions & 9 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,12 @@ func TestPool(t *testing.T) {
})
t.Run("Stress", func(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
trace := *defaultTrace
trace.OnChange = func(info ChangeInfo) {
require.GreaterOrEqual(t, info.Limit, info.Idle)
trace := &Trace{
OnChange: func(info Stats) {
require.GreaterOrEqual(t, info.Limit, info.Idle)
},
}
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](&trace))
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](trace))
var wg sync.WaitGroup
wg.Add(DefaultLimit*2 + 1)
for range make([]struct{}, DefaultLimit*2) {
Expand All @@ -297,12 +298,13 @@ func TestPool(t *testing.T) {
})
t.Run("ParallelCreation", func(t *testing.T) {
xtest.TestManyTimes(t, func(t testing.TB) {
trace := *defaultTrace
trace.OnChange = func(info ChangeInfo) {
require.Equal(t, DefaultLimit, info.Limit)
require.LessOrEqual(t, info.Idle, DefaultLimit)
trace := &Trace{
OnChange: func(info Stats) {
require.Equal(t, DefaultLimit, info.Limit)
require.LessOrEqual(t, info.Idle, DefaultLimit)
},
}
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](&trace))
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](trace))
var wg sync.WaitGroup
for range make([]struct{}, DefaultLimit*10) {
wg.Add(1)
Expand Down
7 changes: 5 additions & 2 deletions internal/pool/stats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package pool

type Stats struct {
Limit int
Idle int
Limit int
Index int
Idle int
Wait int
CreateInProgress int
}
84 changes: 8 additions & 76 deletions internal/pool/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,81 +8,13 @@ import (

type (
Trace struct {
OnNew func(*NewStartInfo) func(*NewDoneInfo)
OnClose func(*CloseStartInfo) func(*CloseDoneInfo)
OnTry func(*TryStartInfo) func(*TryDoneInfo)
OnWith func(*WithStartInfo) func(*WithDoneInfo)
OnPut func(*PutStartInfo) func(*PutDoneInfo)
OnGet func(*GetStartInfo) func(*GetDoneInfo)
OnChange func(ChangeInfo)
OnNew func(ctx *context.Context, call stack.Caller) func(limit int)
OnClose func(ctx *context.Context, call stack.Caller) func(err error)
OnTry func(ctx *context.Context, call stack.Caller) func(err error)
OnWith func(ctx *context.Context, call stack.Caller) func(attempts int, err error)
OnPut func(ctx *context.Context, call stack.Caller, item any) func(err error)
OnGet func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error)
onWait func() func(item any, err error) //nolint:unused
OnChange func(Stats)
}
NewStartInfo struct {
// Context make available context in trace stack.Callerback function.
// Pointer to context provide replacement of context in trace stack.Callerback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside stack.Callerback function
Context *context.Context
Call stack.Caller
}
NewDoneInfo struct {
Limit int
}
CloseStartInfo struct {
// Context make available context in trace stack.Callerback function.
// Pointer to context provide replacement of context in trace stack.Callerback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside stack.Callerback function
Context *context.Context
Call stack.Caller
}
CloseDoneInfo struct {
Error error
}
TryStartInfo struct {
// Context make available context in trace stack.Callerback function.
// Pointer to context provide replacement of context in trace stack.Callerback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside stack.Callerback function
Context *context.Context
Call stack.Caller
}
TryDoneInfo struct {
Error error
}
WithStartInfo struct {
// Context make available context in trace stack.Callerback function.
// Pointer to context provide replacement of context in trace stack.Callerback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside stack.Callerback function
Context *context.Context
Call stack.Caller
}
WithDoneInfo struct {
Error error

Attempts int
}
PutStartInfo struct {
// Context make available context in trace stack.Callerback function.
// Pointer to context provide replacement of context in trace stack.Callerback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside stack.Callerback function
Context *context.Context
Call stack.Caller
}
PutDoneInfo struct {
Error error
}
GetStartInfo struct {
// Context make available context in trace stack.Callerback function.
// Pointer to context provide replacement of context in trace stack.Callerback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside stack.Callerback function
Context *context.Context
Call stack.Caller
}
GetDoneInfo struct {
Error error
}
ChangeInfo = Stats
)
Loading

0 comments on commit 1d8bf01

Please sign in to comment.