Skip to content

Commit

Permalink
fixed closing of child driver with shared balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Nov 5, 2024
1 parent 6f56a5c commit 629b019
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 8 deletions.
14 changes: 10 additions & 4 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type (
balancerWithMeta struct {
balancer *balancer.Balancer
meta *meta.Meta
close func(ctx context.Context) error
}
)

Expand All @@ -134,6 +135,10 @@ func (b *balancerWithMeta) NewStream(ctx context.Context, desc *grpc.StreamDesc,
return b.balancer.NewStream(metaCtx, desc, method, opts...)
}

func (b *balancerWithMeta) Close(ctx context.Context) error {
return b.close(ctx)
}

// Close closes Driver and clear resources
//
//nolint:nonamedreturns
Expand Down Expand Up @@ -176,7 +181,7 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) {
d.query.Close,
d.topic.Close,
d.discovery.Close,
d.metaBalancer.balancer.Close,
d.metaBalancer.Close,
d.pool.Release,
)

Expand Down Expand Up @@ -278,7 +283,7 @@ func Open(ctx context.Context, dsn string, opts ...Option) (_ *Driver, _ error)
}
}

d, err := newConnectionFromOptions(ctx, opts...)
d, err := driverFromOptions(ctx, opts...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -320,7 +325,7 @@ func MustOpen(ctx context.Context, dsn string, opts ...Option) *Driver {
// Will be removed after Oct 2024.
// Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated
func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { //nolint:nonamedreturns
d, err := newConnectionFromOptions(ctx, opts...)
d, err := driverFromOptions(ctx, opts...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand All @@ -342,7 +347,7 @@ func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { //nolint:
}

//nolint:cyclop, nonamedreturns, funlen
func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) {
func driverFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) {
ctx, driverCtxCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
defer func() {
if err != nil {
Expand Down Expand Up @@ -444,6 +449,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
return xerrors.WithStackTrace(err)
}
d.metaBalancer.balancer = b
d.metaBalancer.close = b.Close
}
d.metaBalancer.meta = d.config.Meta()

Expand Down
2 changes: 1 addition & 1 deletion dsn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestParse(t *testing.T) {
require.ErrorIs(t, err, tt.err)
} else {
require.NoError(t, err)
d, err := newConnectionFromOptions(context.Background(), opts...)
d, err := driverFromOptions(context.Background(), opts...)
require.NoError(t, err)
exp := newConnector(tt.connectorOpts...)
act := newConnector(d.databaseSQLOptions...)
Expand Down
1 change: 1 addition & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option {
func WithSharedBalancer(parent *Driver) Option {
return func(ctx context.Context, d *Driver) error {
d.metaBalancer.balancer = parent.metaBalancer.balancer
d.metaBalancer.close = func(ctx context.Context) error { return nil }

return nil
}
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
)

//nolint:gocyclo
func TestConnection(sourceTest *testing.T) {
func TestDriver(sourceTest *testing.T) {
t := xtest.MakeSyncedTest(sourceTest)
const sumColumn = "sum"
var (
Expand Down Expand Up @@ -164,6 +164,19 @@ func TestConnection(sourceTest *testing.T) {
t.Fatalf("close failed: %+v", e)
}
}()
t.Run("With", func(t *testing.T) {
t.Run("WithSharedBalancer", func(t *testing.T) {
child, err := db.With(ctx, ydb.WithSharedBalancer(db))
require.NoError(t, err)
row, err := child.Query().QueryRow(ctx, `SELECT 1`)
require.NoError(t, err)
var result int32
err = row.Scan(&result)
require.NoError(t, err)
err = child.Close(ctx)
require.NoError(t, err)
})
})
t.Run("discovery.WhoAmI", func(t *testing.T) {
if err = retry.Retry(ctx, func(ctx context.Context) (err error) {
discoveryClient := Ydb_Discovery_V1.NewDiscoveryServiceClient(ydb.GRPCConn(db))
Expand Down
2 changes: 1 addition & 1 deletion with.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var nextID atomic.Uint64 //nolint:gochecknoglobals
func (d *Driver) with(ctx context.Context, opts ...Option) (*Driver, uint64, error) {
id := nextID.Add(1)

child, err := newConnectionFromOptions(
child, err := driverFromOptions(
ctx,
append(
append(
Expand Down
2 changes: 1 addition & 1 deletion with_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestWithCertificatesCached(t *testing.T) {
},
} {
t.Run(test.name, func(t *testing.T) {
db, err := newConnectionFromOptions(ctx,
db, err := driverFromOptions(ctx,
append(
test.options,
withConnPool(conn.NewPool(context.Background(), config.New())), //nolint:contextcheck
Expand Down

0 comments on commit 629b019

Please sign in to comment.