Skip to content

Commit

Permalink
driver close traces
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Nov 8, 2023
1 parent aea0b8a commit e9189f6
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 15 deletions.
21 changes: 13 additions & 8 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ type Driver struct { //nolint:maligned
}

// Close closes Driver and clear resources
func (d *Driver) Close(ctx context.Context) error {
func (d *Driver) Close(ctx context.Context) (finalErr error) {
onDone := trace.DriverOnClose(d.config.Trace(), &ctx, stack.FunctionID(0))
defer func() {
onDone(finalErr)
}()

d.mtx.Lock()
defer d.mtx.Unlock()

Expand All @@ -104,16 +109,16 @@ func (d *Driver) Close(ctx context.Context) error {
}
}()

closers := make([]func(context.Context) error, 0)
closes := make([]func(context.Context) error, 0)
d.childrenMtx.WithLock(func() {
for _, child := range d.children {
closers = append(closers, child.Close)
closes = append(closes, child.Close)
}
d.children = nil
})

closers = append(
closers,
closes = append(
closes,
d.ratelimiter.Close,
d.coordination.Close,
d.scheme.Close,
Expand All @@ -125,8 +130,8 @@ func (d *Driver) Close(ctx context.Context) error {
)

var issues []error
for _, closer := range closers {
if err := closer(ctx); err != nil {
for _, f := range closes {
if err := f(ctx); err != nil {
issues = append(issues, err)
}
}
Expand Down Expand Up @@ -310,7 +315,7 @@ func connect(ctx context.Context, d *Driver) error {
}

if d.pool == nil {
d.pool = conn.NewPool(d.config)
d.pool = conn.NewPool(ctx, d.config)
}

d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
Expand Down
2 changes: 1 addition & 1 deletion internal/balancer/local_dc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestLocalDCDiscovery(t *testing.T) {
r := &Balancer{
driverConfig: cfg,
balancerConfig: *cfg.Balancer(),
pool: conn.NewPool(cfg),
pool: conn.NewPool(context.Background(), cfg),
discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},
Expand Down
20 changes: 15 additions & 5 deletions internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
Expand Down Expand Up @@ -118,7 +119,12 @@ func (p *Pool) Take(context.Context) error {
return nil
}

func (p *Pool) Release(ctx context.Context) error {
func (p *Pool) Release(ctx context.Context) (finalErr error) {
onDone := trace.DriverOnPoolRelease(p.config.Trace(), &ctx, stack.FunctionID(0))
defer func() {
onDone(finalErr)
}()

if atomic.AddInt64(&p.usages, -1) > 0 {
return nil
}
Expand Down Expand Up @@ -162,7 +168,7 @@ func (p *Pool) Release(ctx context.Context) error {
return nil
}

func (p *Pool) connParker(ttl, interval time.Duration) {
func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
Expand All @@ -174,7 +180,7 @@ func (p *Pool) connParker(ttl, interval time.Duration) {
if time.Since(c.LastUsage()) > ttl {
switch c.GetState() {
case Online, Banned:
_ = c.park(context.Background())
_ = c.park(ctx)
default:
// nop
}
Expand All @@ -194,7 +200,11 @@ func (p *Pool) collectConns() []*conn {
return conns
}

func NewPool(config Config) *Pool {
func NewPool(ctx context.Context, config Config) *Pool {
onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID(0))
defer func() {

Check failure on line 205 in internal/conn/pool.go

View workflow job for this annotation

GitHub Actions / golangci-lint

deferUnlambda: can rewrite as `defer onDone()` (gocritic)
onDone()
}()
p := &Pool{
usages: 1,
config: config,
Expand All @@ -203,7 +213,7 @@ func NewPool(config Config) *Pool {
done: make(chan struct{}),
}
if ttl := config.ConnectionTTL(); ttl > 0 {
go p.connParker(ttl, ttl/2)
go p.connParker(xcontext.WithoutDeadline(ctx), ttl, ttl/2)
}
return p
}
25 changes: 25 additions & 0 deletions trace/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type (
OnInit func(DriverInitStartInfo) func(DriverInitDoneInfo)
OnClose func(DriverCloseStartInfo) func(DriverCloseDoneInfo)

// Pool of connections
OnPoolNew func(DriverConnPoolNewStartInfo) func(DriverConnPoolNewDoneInfo)
OnPoolRelease func(DriverConnPoolReleaseStartInfo) func(DriverConnPoolReleaseDoneInfo)

// Deprecated: driver not notificate about this event
OnNetRead func(DriverNetReadStartInfo) func(DriverNetReadDoneInfo)
// Deprecated: driver not notificate about this event
Expand Down Expand Up @@ -409,6 +413,27 @@ type (
DriverInitDoneInfo struct {
Error error
}
DriverConnPoolNewStartInfo struct {
// Context make available context in trace callback function.
// Pointer to context provide replacement of context in trace callback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside callback function
Context *context.Context
Call call
}
DriverConnPoolNewDoneInfo struct {

Check failure on line 424 in trace/driver.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)
}
DriverConnPoolReleaseStartInfo struct {
// Context make available context in trace callback function.
// Pointer to context provide replacement of context in trace callback function.
// Warning: concurrent access to pointer on client side must be excluded.
// Safe replacement of context are provided only inside callback function
Context *context.Context
Call call
}
DriverConnPoolReleaseDoneInfo struct {
Error error
}
DriverCloseStartInfo struct {
// Context make available context in trace callback function.
// Pointer to context provide replacement of context in trace callback function.
Expand Down
121 changes: 121 additions & 0 deletions trace/driver_gtrace.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion with_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestWithCertificatesCached(t *testing.T) {
db, err := newConnectionFromOptions(ctx,
append(
test.options,
withConnPool(conn.NewPool(config.New())),
withConnPool(conn.NewPool(context.Background(), config.New())),
)...,
)
require.NoError(t, err)
Expand Down

0 comments on commit e9189f6

Please sign in to comment.