Skip to content

Commit

Permalink
* Removed ydb.WithSessionPoolSizeLimit() option
Browse files Browse the repository at this point in the history
* Added async put session into pool if external context is done
  • Loading branch information
asmyasnikov committed Mar 20, 2024
1 parent 302f738 commit 8e40869
Show file tree
Hide file tree
Showing 39 changed files with 443 additions and 282 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* Fixed logic of query session pool
* Changed initialization of internal driver clients to lazy
* Disabled the logic of background grpc-connection parking
* Removed `ydb.WithSessionPoolSizeLimit()` option
* Added async put session into pool if external context is done

## v3.58.2
* Added `trace.Query.OnSessionBegin` event
Expand Down
18 changes: 9 additions & 9 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func New(ctx context.Context, opts ...Option) (_ *Driver, err error) {

//nolint:cyclop, nonamedreturns
func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) {
ctx, driverCtxCancel := xcontext.WithCancel(xcontext.WithoutDeadline(ctx))
ctx, driverCtxCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
defer func() {
if err != nil {
driverCtxCancel()
Expand Down Expand Up @@ -401,7 +401,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
}

d.table = xsync.OnceValue(func() *internalTable.Client {
return internalTable.New(xcontext.WithoutDeadline(ctx),
return internalTable.New(xcontext.ValueOnly(ctx),
d.balancer,
tableConfig.New(
append(
Expand All @@ -416,7 +416,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
})

d.query = xsync.OnceValue(func() *internalQuery.Client {
return internalQuery.New(xcontext.WithoutDeadline(ctx),
return internalQuery.New(xcontext.ValueOnly(ctx),
d.balancer,
queryConfig.New(
append(
Expand All @@ -434,7 +434,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
}

d.scheme = xsync.OnceValue(func() *internalScheme.Client {
return internalScheme.New(xcontext.WithoutDeadline(ctx),
return internalScheme.New(xcontext.ValueOnly(ctx),
d.balancer,
schemeConfig.New(
append(
Expand All @@ -450,7 +450,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
})

d.coordination = xsync.OnceValue(func() *internalCoordination.Client {
return internalCoordination.New(xcontext.WithoutDeadline(ctx),
return internalCoordination.New(xcontext.ValueOnly(ctx),
d.balancer,
coordinationConfig.New(
append(
Expand All @@ -465,7 +465,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
})

d.ratelimiter = xsync.OnceValue(func() *internalRatelimiter.Client {
return internalRatelimiter.New(xcontext.WithoutDeadline(ctx),
return internalRatelimiter.New(xcontext.ValueOnly(ctx),
d.balancer,
ratelimiterConfig.New(
append(
Expand All @@ -480,7 +480,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
})

d.discovery = xsync.OnceValue(func() *internalDiscovery.Client {
return internalDiscovery.New(xcontext.WithoutDeadline(ctx),
return internalDiscovery.New(xcontext.ValueOnly(ctx),
d.pool.Get(endpoint.New(d.config.Endpoint())),
discoveryConfig.New(
append(
Expand All @@ -499,7 +499,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
})

d.scripting = xsync.OnceValue(func() *internalScripting.Client {
return internalScripting.New(xcontext.WithoutDeadline(ctx),
return internalScripting.New(xcontext.ValueOnly(ctx),
d.balancer,
scriptingConfig.New(
append(
Expand All @@ -514,7 +514,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
})

d.topic = xsync.OnceValue(func() *topicclientinternal.Client {
return topicclientinternal.New(xcontext.WithoutDeadline(ctx),
return topicclientinternal.New(xcontext.ValueOnly(ctx),
d.balancer,
d.config.Credentials(),
append(
Expand Down
2 changes: 1 addition & 1 deletion internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func New(
}
// run background discovering
if d := discoveryConfig.Interval(); d > 0 {
b.discoveryRepeater = repeater.New(xcontext.WithoutDeadline(ctx),
b.discoveryRepeater = repeater.New(xcontext.ValueOnly(ctx),
d, b.clusterDiscoveryAttempt,
repeater.WithName("discovery"),
repeater.WithTrace(b.driverConfig.Trace()),
Expand Down
2 changes: 1 addition & 1 deletion internal/conn/grpc_client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {

err = s.ClientStream.RecvMsg(m)

if err != nil {
if err != nil { //nolint:nestif
if xerrors.IsContextError(err) {
return xerrors.WithStackTrace(err)
}
Expand Down
30 changes: 15 additions & 15 deletions internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,21 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
if !xerrors.IsTransportError(cause,
grpcCodes.ResourceExhausted,
grpcCodes.Unavailable,
//grpcCodes.OK,
//grpcCodes.Canceled,
//grpcCodes.Unknown,
//grpcCodes.InvalidArgument,
//grpcCodes.DeadlineExceeded,
//grpcCodes.NotFound,
//grpcCodes.AlreadyExists,
//grpcCodes.PermissionDenied,
//grpcCodes.FailedPrecondition,
//grpcCodes.Aborted,
//grpcCodes.OutOfRange,
//grpcCodes.Unimplemented,
//grpcCodes.Internal,
//grpcCodes.DataLoss,
//grpcCodes.Unauthenticated,
// grpcCodes.OK,
// grpcCodes.Canceled,
// grpcCodes.Unknown,
// grpcCodes.InvalidArgument,
// grpcCodes.DeadlineExceeded,
// grpcCodes.NotFound,
// grpcCodes.AlreadyExists,
// grpcCodes.PermissionDenied,
// grpcCodes.FailedPrecondition,
// grpcCodes.Aborted,
// grpcCodes.OutOfRange,
// grpcCodes.Unimplemented,
// grpcCodes.Internal,
// grpcCodes.DataLoss,
// grpcCodes.Unauthenticated,
) {
return
}
Expand Down
Loading

0 comments on commit 8e40869

Please sign in to comment.