From 629b0190cbc09c49eb4d04f531dac2817227f9ea Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 5 Nov 2024 20:25:09 +0300 Subject: [PATCH] fixed closing of child driver with shared balancer --- driver.go | 14 ++++++++++---- dsn_test.go | 2 +- options.go | 1 + ...ction_secure_test.go => driver_secure_test.go} | 0 .../{connection_test.go => driver_test.go} | 15 ++++++++++++++- with.go | 2 +- with_test.go | 2 +- 7 files changed, 28 insertions(+), 8 deletions(-) rename tests/integration/{connection_secure_test.go => driver_secure_test.go} (100%) rename tests/integration/{connection_test.go => driver_test.go} (96%) diff --git a/driver.go b/driver.go index 46a682aca..da2122a56 100644 --- a/driver.go +++ b/driver.go @@ -109,6 +109,7 @@ type ( balancerWithMeta struct { balancer *balancer.Balancer meta *meta.Meta + close func(ctx context.Context) error } ) @@ -134,6 +135,10 @@ func (b *balancerWithMeta) NewStream(ctx context.Context, desc *grpc.StreamDesc, return b.balancer.NewStream(metaCtx, desc, method, opts...) } +func (b *balancerWithMeta) Close(ctx context.Context) error { + return b.close(ctx) +} + // Close closes Driver and clear resources // //nolint:nonamedreturns @@ -176,7 +181,7 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) { d.query.Close, d.topic.Close, d.discovery.Close, - d.metaBalancer.balancer.Close, + d.metaBalancer.Close, d.pool.Release, ) @@ -278,7 +283,7 @@ func Open(ctx context.Context, dsn string, opts ...Option) (_ *Driver, _ error) } } - d, err := newConnectionFromOptions(ctx, opts...) + d, err := driverFromOptions(ctx, opts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -320,7 +325,7 @@ func MustOpen(ctx context.Context, dsn string, opts ...Option) *Driver { // Will be removed after Oct 2024. // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { //nolint:nonamedreturns - d, err := newConnectionFromOptions(ctx, opts...) + d, err := driverFromOptions(ctx, opts...) if err != nil { return nil, xerrors.WithStackTrace(err) } @@ -342,7 +347,7 @@ func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { //nolint: } //nolint:cyclop, nonamedreturns, funlen -func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) { +func driverFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) { ctx, driverCtxCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx)) defer func() { if err != nil { @@ -444,6 +449,7 @@ func (d *Driver) connect(ctx context.Context) (err error) { return xerrors.WithStackTrace(err) } d.metaBalancer.balancer = b + d.metaBalancer.close = b.Close } d.metaBalancer.meta = d.config.Meta() diff --git a/dsn_test.go b/dsn_test.go index cb241a9a5..a2fa0c789 100644 --- a/dsn_test.go +++ b/dsn_test.go @@ -178,7 +178,7 @@ func TestParse(t *testing.T) { require.ErrorIs(t, err, tt.err) } else { require.NoError(t, err) - d, err := newConnectionFromOptions(context.Background(), opts...) + d, err := driverFromOptions(context.Background(), opts...) require.NoError(t, err) exp := newConnector(tt.connectorOpts...) act := newConnector(d.databaseSQLOptions...) diff --git a/options.go b/options.go index ed1b02cfb..f8b923270 100644 --- a/options.go +++ b/options.go @@ -602,6 +602,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option { func WithSharedBalancer(parent *Driver) Option { return func(ctx context.Context, d *Driver) error { d.metaBalancer.balancer = parent.metaBalancer.balancer + d.metaBalancer.close = func(ctx context.Context) error { return nil } return nil } diff --git a/tests/integration/connection_secure_test.go b/tests/integration/driver_secure_test.go similarity index 100% rename from tests/integration/connection_secure_test.go rename to tests/integration/driver_secure_test.go diff --git a/tests/integration/connection_test.go b/tests/integration/driver_test.go similarity index 96% rename from tests/integration/connection_test.go rename to tests/integration/driver_test.go index 522adcc83..067ae5973 100644 --- a/tests/integration/connection_test.go +++ b/tests/integration/driver_test.go @@ -36,7 +36,7 @@ import ( ) //nolint:gocyclo -func TestConnection(sourceTest *testing.T) { +func TestDriver(sourceTest *testing.T) { t := xtest.MakeSyncedTest(sourceTest) const sumColumn = "sum" var ( @@ -164,6 +164,19 @@ func TestConnection(sourceTest *testing.T) { t.Fatalf("close failed: %+v", e) } }() + t.Run("With", func(t *testing.T) { + t.Run("WithSharedBalancer", func(t *testing.T) { + child, err := db.With(ctx, ydb.WithSharedBalancer(db)) + require.NoError(t, err) + row, err := child.Query().QueryRow(ctx, `SELECT 1`) + require.NoError(t, err) + var result int32 + err = row.Scan(&result) + require.NoError(t, err) + err = child.Close(ctx) + require.NoError(t, err) + }) + }) t.Run("discovery.WhoAmI", func(t *testing.T) { if err = retry.Retry(ctx, func(ctx context.Context) (err error) { discoveryClient := Ydb_Discovery_V1.NewDiscoveryServiceClient(ydb.GRPCConn(db)) diff --git a/with.go b/with.go index 1d004b81c..efb1349c3 100644 --- a/with.go +++ b/with.go @@ -14,7 +14,7 @@ var nextID atomic.Uint64 //nolint:gochecknoglobals func (d *Driver) with(ctx context.Context, opts ...Option) (*Driver, uint64, error) { id := nextID.Add(1) - child, err := newConnectionFromOptions( + child, err := driverFromOptions( ctx, append( append( diff --git a/with_test.go b/with_test.go index 26383e12c..6f356939d 100644 --- a/with_test.go +++ b/with_test.go @@ -128,7 +128,7 @@ func TestWithCertificatesCached(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - db, err := newConnectionFromOptions(ctx, + db, err := driverFromOptions(ctx, append( test.options, withConnPool(conn.NewPool(context.Background(), config.New())), //nolint:contextcheck