Skip to content

Commit

Permalink
* Removed ydb.WithSessionPoolSizeLimit() option
Browse files Browse the repository at this point in the history
* Added async put session into pool if external context is done
  • Loading branch information
asmyasnikov committed Mar 20, 2024
1 parent 302f738 commit 6c8a4ec
Show file tree
Hide file tree
Showing 18 changed files with 209 additions and 118 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* Fixed logic of query session pool
* Changed initialization of internal driver clients to lazy
* Disabled the logic of background grpc-connection parking
* Removed `ydb.WithSessionPoolSizeLimit()` option
* Added async put session into pool if external context is done

## v3.58.2
* Added `trace.Query.OnSessionBegin` event
Expand Down
2 changes: 1 addition & 1 deletion internal/conn/grpc_client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {

err = s.ClientStream.RecvMsg(m)

if err != nil {
if err != nil { //nolint:nestif
if xerrors.IsContextError(err) {
return xerrors.WithStackTrace(err)
}
Expand Down
30 changes: 15 additions & 15 deletions internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,21 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
if !xerrors.IsTransportError(cause,
grpcCodes.ResourceExhausted,
grpcCodes.Unavailable,
//grpcCodes.OK,
//grpcCodes.Canceled,
//grpcCodes.Unknown,
//grpcCodes.InvalidArgument,
//grpcCodes.DeadlineExceeded,
//grpcCodes.NotFound,
//grpcCodes.AlreadyExists,
//grpcCodes.PermissionDenied,
//grpcCodes.FailedPrecondition,
//grpcCodes.Aborted,
//grpcCodes.OutOfRange,
//grpcCodes.Unimplemented,
//grpcCodes.Internal,
//grpcCodes.DataLoss,
//grpcCodes.Unauthenticated,
// grpcCodes.OK,
// grpcCodes.Canceled,
// grpcCodes.Unknown,
// grpcCodes.InvalidArgument,
// grpcCodes.DeadlineExceeded,
// grpcCodes.NotFound,
// grpcCodes.AlreadyExists,
// grpcCodes.PermissionDenied,
// grpcCodes.FailedPrecondition,
// grpcCodes.Aborted,
// grpcCodes.OutOfRange,
// grpcCodes.Unimplemented,
// grpcCodes.Internal,
// grpcCodes.DataLoss,
// grpcCodes.Unauthenticated,
) {
return
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (
var (
errClosedPool = errors.New("closed pool")
errPoolOverflow = xerrors.Retryable(errors.New("pool overflow"))
errItemIsNotAlive = errors.New("item is not alive")
errItemIsNotAlive = xerrors.Retryable(errors.New("item is not alive"), xerrors.InvalidRetryObject())
)
211 changes: 152 additions & 59 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package pool

import (
"context"
"sync/atomic"
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool/stats"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
Expand All @@ -26,13 +27,14 @@ type (
trace *Trace
limit int

create func(ctx context.Context) (PT, error)
create func(ctx context.Context) (PT, error)
createTimeout time.Duration
closeTimeout time.Duration

mu xsync.Mutex
idle []PT
index map[PT]struct{}

done atomic.Bool
done chan struct{}

stats *Stats
}
Expand All @@ -53,6 +55,18 @@ func WithCreateFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error))
}
}

func WithCreateItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] {
return func(p *Pool[PT, T]) {
p.createTimeout = t
}
}

func WithCloseItemTimeout[PT Item[T], T any](t time.Duration) option[PT, T] {
return func(p *Pool[PT, T]) {
p.closeTimeout = t
}
}

func WithLimit[PT Item[T], T any](size int) option[PT, T] {
return func(p *Pool[PT, T]) {
p.limit = size
Expand All @@ -77,6 +91,7 @@ func New[PT Item[T], T any](

return &item, nil
},
done: make(chan struct{}),
}

for _, opt := range opts {
Expand All @@ -96,6 +111,78 @@ func New[PT Item[T], T any](
})
}()

create := p.create

p.create = func(ctx context.Context) (PT, error) {
var (
createCtx = xcontext.WithoutDeadline(ctx)
cancelCreate context.CancelFunc
)
if d := p.createTimeout; d > 0 {
createCtx, cancelCreate = xcontext.WithTimeout(createCtx, d)
} else {
createCtx, cancelCreate = xcontext.WithCancel(createCtx)
}
defer cancelCreate()

var (
ch = make(chan PT)
createErr error
)
go func() {
defer close(ch)
createErr = func() error {
newItem, err := create(createCtx)
if err != nil {
return xerrors.WithStackTrace(err)
}

needCloseItem := true
defer func() {
if needCloseItem {
_ = newItem.Close(createCtx)
}
}()

select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)

case <-ctx.Done():
p.mu.Lock()
defer p.mu.Unlock()

if len(p.index) >= p.limit {
return xerrors.WithStackTrace(errPoolOverflow)
}

p.idle = append(p.idle, newItem)
p.index[newItem] = struct{}{}
needCloseItem = false

return xerrors.WithStackTrace(ctx.Err())

case ch <- newItem:
needCloseItem = false

return nil
}
}()
}()

select {
case <-p.done:
return nil, xerrors.WithStackTrace(errClosedPool)
case <-ctx.Done():
return nil, xerrors.WithStackTrace(ctx.Err())
case item, has := <-ch:
if !has {
return nil, xerrors.WithStackTrace(createErr)
}

return item, nil
}
}
p.idle = make([]PT, 0, p.limit)
p.index = make(map[PT]struct{}, p.limit)
p.stats = &Stats{
Expand Down Expand Up @@ -127,57 +214,62 @@ func (p *Pool[PT, T]) get(ctx context.Context) (_ PT, finalErr error) {
return nil, xerrors.WithStackTrace(err)
}

if p.done.Load() {
select {
case <-p.done:
return nil, xerrors.WithStackTrace(errClosedPool)
}
default:
var item PT
p.mu.WithLock(func() {
if len(p.idle) > 0 {
item, p.idle = p.idle[0], p.idle[1:]
p.stats.Change(func(v stats.Stats) stats.Stats {
v.Idle--

var item PT
p.mu.WithLock(func() {
if len(p.idle) > 0 {
item, p.idle = p.idle[0], p.idle[1:]
return v
})
}
})

if item != nil {
if item.IsAlive() {
return item, nil
}
_ = item.Close(ctx)
p.mu.WithLock(func() {
delete(p.index, item)
})
p.stats.Change(func(v stats.Stats) stats.Stats {
v.Idle--
v.Index--

return v
})
}
})

if item != nil {
if item.IsAlive() {
return item, nil
p.mu.Lock()
defer p.mu.Unlock()

if len(p.index) >= p.limit {
return nil, xerrors.WithStackTrace(errPoolOverflow)
}
_ = item.Close(ctx)
p.mu.WithLock(func() {
delete(p.index, item)
})

item, err := p.create(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

if item == nil {
panic("")
}

p.index[item] = struct{}{}
p.stats.Change(func(v stats.Stats) stats.Stats {
v.Index--
v.Index++

return v
})
}

p.mu.Lock()
defer p.mu.Unlock()

if len(p.index) == p.limit {
return nil, xerrors.WithStackTrace(errPoolOverflow)
return item, nil
}

item, err := p.create(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

p.index[item] = struct{}{}
p.stats.Change(func(v stats.Stats) stats.Stats {
v.Index++

return v
})

return item, nil
}

func (p *Pool[PT, T]) put(ctx context.Context, item PT) (finalErr error) {
Expand All @@ -195,35 +287,36 @@ func (p *Pool[PT, T]) put(ctx context.Context, item PT) (finalErr error) {
return xerrors.WithStackTrace(err)
}

if p.done.Load() {
select {
case <-p.done:
return xerrors.WithStackTrace(errClosedPool)
}
default:
if !item.IsAlive() {
_ = item.Close(ctx)

p.mu.WithLock(func() {
delete(p.index, item)
})
p.stats.Change(func(v stats.Stats) stats.Stats {
v.Index--

if !item.IsAlive() {
_ = item.Close(ctx)
return v
})

return xerrors.WithStackTrace(errItemIsNotAlive)
}

p.mu.WithLock(func() {
delete(p.index, item)
p.idle = append(p.idle, item)
})
p.stats.Change(func(v stats.Stats) stats.Stats {
v.Index--
v.Idle++

return v
})

return xerrors.WithStackTrace(errItemIsNotAlive)
return nil
}

p.mu.WithLock(func() {
p.idle = append(p.idle, item)
})
p.stats.Change(func(v stats.Stats) stats.Stats {
v.Idle++

return v
})

return nil
}

func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) {
Expand Down Expand Up @@ -320,7 +413,7 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
})
}()

p.done.Store(true)
close(p.done)

p.mu.Lock()
defer p.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestPool(t *testing.T) {
var (
newItems int64
deleteItems int64
expErr = xerrors.Retryable(errors.New("expected error"), xerrors.WithDeleteSession())
expErr = xerrors.Retryable(errors.New("expected error"), xerrors.InvalidRetryObject())
)
p := New(rootCtx,
WithLimit[*testItem, testItem](1),
Expand Down
13 changes: 8 additions & 5 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,18 @@ func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client {
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
pool.WithCreateFunc(func(ctx context.Context) (_ *Session, err error) {
var cancel context.CancelFunc
var (
createSessionCtx context.Context
cancelCreateSession context.CancelFunc
)
if d := cfg.SessionCreateTimeout(); d > 0 {
ctx, cancel = xcontext.WithTimeout(ctx, d)
createSessionCtx, cancelCreateSession = xcontext.WithTimeout(ctx, d)
} else {
ctx, cancel = xcontext.WithCancel(ctx)
createSessionCtx, cancelCreateSession = xcontext.WithCancel(ctx)
}
defer cancel()
defer cancelCreateSession()

s, err := createSession(ctx,
s, err := createSession(createSessionCtx,
client.grpcClient,
withSessionTrace(cfg.Trace()),
withSessionCheck(func(s *Session) bool {
Expand Down
Loading

0 comments on commit 6c8a4ec

Please sign in to comment.