diff --git a/driver.go b/driver.go index 8bb558bfb..f35ee074f 100644 --- a/driver.go +++ b/driver.go @@ -94,7 +94,12 @@ type Driver struct { //nolint:maligned } // Close closes Driver and clear resources -func (d *Driver) Close(ctx context.Context) error { +func (d *Driver) Close(ctx context.Context) (finalErr error) { + onDone := trace.DriverOnClose(d.config.Trace(), &ctx, stack.FunctionID(0)) + defer func() { + onDone(finalErr) + }() + d.mtx.Lock() defer d.mtx.Unlock() @@ -104,16 +109,16 @@ func (d *Driver) Close(ctx context.Context) error { } }() - closers := make([]func(context.Context) error, 0) + closes := make([]func(context.Context) error, 0) d.childrenMtx.WithLock(func() { for _, child := range d.children { - closers = append(closers, child.Close) + closes = append(closes, child.Close) } d.children = nil }) - closers = append( - closers, + closes = append( + closes, d.ratelimiter.Close, d.coordination.Close, d.scheme.Close, @@ -125,8 +130,8 @@ func (d *Driver) Close(ctx context.Context) error { ) var issues []error - for _, closer := range closers { - if err := closer(ctx); err != nil { + for _, f := range closes { + if err := f(ctx); err != nil { issues = append(issues, err) } } @@ -310,7 +315,7 @@ func connect(ctx context.Context, d *Driver) error { } if d.pool == nil { - d.pool = conn.NewPool(d.config) + d.pool = conn.NewPool(ctx, d.config) } d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...) diff --git a/internal/balancer/local_dc_test.go b/internal/balancer/local_dc_test.go index 678b55c3f..26eaf38a6 100644 --- a/internal/balancer/local_dc_test.go +++ b/internal/balancer/local_dc_test.go @@ -136,7 +136,7 @@ func TestLocalDCDiscovery(t *testing.T) { r := &Balancer{ driverConfig: cfg, balancerConfig: *cfg.Balancer(), - pool: conn.NewPool(cfg), + pool: conn.NewPool(context.Background(), cfg), discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{ &mock.Endpoint{AddrField: "a:123", LocationField: "a"}, &mock.Endpoint{AddrField: "b:234", LocationField: "b"}, diff --git a/internal/conn/pool.go b/internal/conn/pool.go index ed64af2f6..5c4e59e5e 100644 --- a/internal/conn/pool.go +++ b/internal/conn/pool.go @@ -11,6 +11,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -118,7 +119,12 @@ func (p *Pool) Take(context.Context) error { return nil } -func (p *Pool) Release(ctx context.Context) error { +func (p *Pool) Release(ctx context.Context) (finalErr error) { + onDone := trace.DriverOnPoolRelease(p.config.Trace(), &ctx, stack.FunctionID(0)) + defer func() { + onDone(finalErr) + }() + if atomic.AddInt64(&p.usages, -1) > 0 { return nil } @@ -162,7 +168,7 @@ func (p *Pool) Release(ctx context.Context) error { return nil } -func (p *Pool) connParker(ttl, interval time.Duration) { +func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -174,7 +180,7 @@ func (p *Pool) connParker(ttl, interval time.Duration) { if time.Since(c.LastUsage()) > ttl { switch c.GetState() { case Online, Banned: - _ = c.park(context.Background()) + _ = c.park(ctx) default: // nop } @@ -194,7 +200,11 @@ func (p *Pool) collectConns() []*conn { return conns } -func NewPool(config Config) *Pool { +func NewPool(ctx context.Context, config Config) *Pool { + onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID(0)) + defer func() { + onDone() + }() p := &Pool{ usages: 1, config: config, @@ -203,7 +213,7 @@ func NewPool(config Config) *Pool { done: make(chan struct{}), } if ttl := config.ConnectionTTL(); ttl > 0 { - go p.connParker(ttl, ttl/2) + go p.connParker(xcontext.WithoutDeadline(ctx), ttl, ttl/2) } return p } diff --git a/trace/driver.go b/trace/driver.go index bfb84e0f3..6e80c86c6 100644 --- a/trace/driver.go +++ b/trace/driver.go @@ -19,6 +19,10 @@ type ( OnInit func(DriverInitStartInfo) func(DriverInitDoneInfo) OnClose func(DriverCloseStartInfo) func(DriverCloseDoneInfo) + // Pool of connections + OnPoolNew func(DriverConnPoolNewStartInfo) func(DriverConnPoolNewDoneInfo) + OnPoolRelease func(DriverConnPoolReleaseStartInfo) func(DriverConnPoolReleaseDoneInfo) + // Deprecated: driver not notificate about this event OnNetRead func(DriverNetReadStartInfo) func(DriverNetReadDoneInfo) // Deprecated: driver not notificate about this event @@ -409,6 +413,27 @@ type ( DriverInitDoneInfo struct { Error error } + DriverConnPoolNewStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + DriverConnPoolNewDoneInfo struct { + } + DriverConnPoolReleaseStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + } + DriverConnPoolReleaseDoneInfo struct { + Error error + } DriverCloseStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. diff --git a/trace/driver_gtrace.go b/trace/driver_gtrace.go index 33c518f99..926541943 100644 --- a/trace/driver_gtrace.go +++ b/trace/driver_gtrace.go @@ -100,6 +100,76 @@ func (t *Driver) Compose(x *Driver, opts ...DriverComposeOption) *Driver { } } } + { + h1 := t.OnPoolNew + h2 := x.OnPoolNew + ret.OnPoolNew = func(d DriverConnPoolNewStartInfo) func(DriverConnPoolNewDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(DriverConnPoolNewDoneInfo) + if h1 != nil { + r = h1(d) + } + if h2 != nil { + r1 = h2(d) + } + return func(d DriverConnPoolNewDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(d) + } + if r1 != nil { + r1(d) + } + } + } + } + { + h1 := t.OnPoolRelease + h2 := x.OnPoolRelease + ret.OnPoolRelease = func(d DriverConnPoolReleaseStartInfo) func(DriverConnPoolReleaseDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(DriverConnPoolReleaseDoneInfo) + if h1 != nil { + r = h1(d) + } + if h2 != nil { + r1 = h2(d) + } + return func(d DriverConnPoolReleaseDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(d) + } + if r1 != nil { + r1(d) + } + } + } + } { h1 := t.OnNetRead h2 := x.OnNetRead @@ -918,6 +988,36 @@ func (t *Driver) onClose(d DriverCloseStartInfo) func(DriverCloseDoneInfo) { } return res } +func (t *Driver) onPoolNew(d DriverConnPoolNewStartInfo) func(DriverConnPoolNewDoneInfo) { + fn := t.OnPoolNew + if fn == nil { + return func(DriverConnPoolNewDoneInfo) { + return + } + } + res := fn(d) + if res == nil { + return func(DriverConnPoolNewDoneInfo) { + return + } + } + return res +} +func (t *Driver) onPoolRelease(d DriverConnPoolReleaseStartInfo) func(DriverConnPoolReleaseDoneInfo) { + fn := t.OnPoolRelease + if fn == nil { + return func(DriverConnPoolReleaseDoneInfo) { + return + } + } + res := fn(d) + if res == nil { + return func(DriverConnPoolReleaseDoneInfo) { + return + } + } + return res +} func (t *Driver) onNetRead(d DriverNetReadStartInfo) func(DriverNetReadDoneInfo) { fn := t.OnNetRead if fn == nil { @@ -1285,6 +1385,27 @@ func DriverOnClose(t *Driver, c *context.Context, call call) func(error) { res(p) } } +func DriverOnPoolNew(t *Driver, c *context.Context, call call) func() { + var p DriverConnPoolNewStartInfo + p.Context = c + p.Call = call + res := t.onPoolNew(p) + return func() { + var p DriverConnPoolNewDoneInfo + res(p) + } +} +func DriverOnPoolRelease(t *Driver, c *context.Context, call call) func(error) { + var p DriverConnPoolReleaseStartInfo + p.Context = c + p.Call = call + res := t.onPoolRelease(p) + return func(e error) { + var p DriverConnPoolReleaseDoneInfo + p.Error = e + res(p) + } +} func DriverOnNetRead(t *Driver, call call, address string, buffer int) func(received int, _ error) { var p DriverNetReadStartInfo p.Call = call diff --git a/with_test.go b/with_test.go index d04594d76..5469a45ed 100644 --- a/with_test.go +++ b/with_test.go @@ -132,7 +132,7 @@ func TestWithCertificatesCached(t *testing.T) { db, err := newConnectionFromOptions(ctx, append( test.options, - withConnPool(conn.NewPool(config.New())), + withConnPool(conn.NewPool(context.Background(), config.New())), )..., ) require.NoError(t, err)