Skip to content

Commit

Permalink
Merge pull request #1476 from ydb-platform/limit-usage
Browse files Browse the repository at this point in the history
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
  • Loading branch information
asmyasnikov authored Sep 19, 2024
2 parents f4c4c8a + 2f69e79 commit a16161a
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 9 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
* refactored experimental topic iterators in topicsugar package
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
* Refactored experimental topic iterators in `topicsugar` package

## v3.80.9
* Fixed bug in experimental api: `ydb.ParamsBuilder().Param().Optional()` receive pointer and really produce optional value.
Expand Down
2 changes: 1 addition & 1 deletion internal/pool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func isRetriable(err error) bool {

switch {
case
xerrors.Is(err, errPoolIsOverflow),
xerrors.Is(err, errPoolIsOverflow, errItemIsNotAlive, errNoProgress),
xerrors.IsRetryableError(err),
xerrors.IsOperationError(err, Ydb.StatusIds_OVERLOADED),
xerrors.IsTransportError(
Expand Down
21 changes: 17 additions & 4 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ type (
closeTimeout time.Duration
closeItem func(ctx context.Context, item PT)
idleTimeToLive time.Duration
itemUsageLimit uint64
}
itemInfo[PT ItemConstraint[T], T any] struct {
idle *xlist.Element[PT]
lastUsage time.Time
idle *xlist.Element[PT]
lastUsage time.Time
useCounter *uint64
}
waitChPool[PT ItemConstraint[T], T any] interface {
GetOrNew() *chan PT
Expand Down Expand Up @@ -93,6 +95,12 @@ func WithLimit[PT ItemConstraint[T], T any](size int) Option[PT, T] {
}
}

func WithItemUsageLimit[PT ItemConstraint[T], T any](itemUsageLimit uint64) Option[PT, T] {
return func(c *Config[PT, T]) {
c.itemUsageLimit = itemUsageLimit
}
}

func WithTrace[PT ItemConstraint[T], T any](t *Trace) Option[PT, T] {
return func(c *Config[PT, T]) {
c.trace = t
Expand Down Expand Up @@ -217,8 +225,10 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
newItem, err := p.config.createItem(createCtx)
if newItem != nil {
p.mu.WithLock(func() {
var useCounter uint64
p.index[newItem] = itemInfo[PT, T]{
lastUsage: p.config.clock.Now(),
lastUsage: p.config.clock.Now(),
useCounter: &useCounter,
}
})
}
Expand Down Expand Up @@ -592,10 +602,13 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
panic("no index for item")
}

*info.useCounter++

return info
})

if p.config.idleTimeToLive > 0 && p.config.clock.Since(info.lastUsage) > p.config.idleTimeToLive {
if (p.config.itemUsageLimit > 0 && *info.useCounter > p.config.itemUsageLimit) ||
(p.config.idleTimeToLive > 0 && p.config.clock.Since(info.lastUsage) > p.config.idleTimeToLive) {
p.closeItem(ctx, item)
p.mu.WithLock(func() {
p.changeState(func() Stats {
Expand Down
29 changes: 28 additions & 1 deletion internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func mustClose(t testing.TB, pool closer.Closer) {
}
}

func TestPool(t *testing.T) {
func TestPool(t *testing.T) { //nolint:gocyclo
rootCtx := xtest.Context(t)
t.Run("New", func(t *testing.T) {
t.Run("Default", func(t *testing.T) {
Expand All @@ -177,6 +177,33 @@ func TestPool(t *testing.T) {
)
require.EqualValues(t, 1, p.config.limit)
})
t.Run("WithItemUsageLimit", func(t *testing.T) {
var newCounter int64
p := New[*testItem, testItem](rootCtx,
WithLimit[*testItem, testItem](1),
WithItemUsageLimit[*testItem, testItem](5),
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
WithCreateItemFunc(func(context.Context) (*testItem, error) {
atomic.AddInt64(&newCounter, 1)

var v testItem

return &v, nil
}),
)
require.EqualValues(t, 1, p.config.limit)
var lambdaCounter int64
err := p.With(rootCtx, func(ctx context.Context, item *testItem) error {
if atomic.AddInt64(&lambdaCounter, 1) < 10 {
return xerrors.Retryable(errors.New("test"))
}

return nil
})
require.NoError(t, err)
require.EqualValues(t, 2, newCounter)
})
t.Run("WithCreateItemFunc", func(t *testing.T) {
var newCounter int64
p := New(rootCtx,
Expand Down
1 change: 1 addition & 0 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
done: make(chan struct{}),
pool: pool.New(ctx,
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
pool.WithItemUsageLimit[*Session, Session](cfg.PoolSessionUsageLimit()),
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
Expand Down
7 changes: 6 additions & 1 deletion internal/query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const (
type Config struct {
config.Common

poolLimit int
poolLimit int
poolSessionUsageLimit uint64

sessionCreateTimeout time.Duration
sessionDeleteTimeout time.Duration
Expand Down Expand Up @@ -60,6 +61,10 @@ func (c *Config) PoolLimit() int {
return c.poolLimit
}

func (c *Config) PoolSessionUsageLimit() uint64 {
return c.poolSessionUsageLimit
}

// SessionCreateTimeout limits maximum time spent on Create session request
func (c *Config) SessionCreateTimeout() time.Duration {
return c.sessionCreateTimeout
Expand Down
6 changes: 6 additions & 0 deletions internal/query/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ func WithPoolLimit(size int) Option {
}
}

func WithPoolSessionUsageLimit(sessionUsageLimit uint64) Option {
return func(c *Config) {
c.poolSessionUsageLimit = sessionUsageLimit
}
}

// WithSessionCreateTimeout limits maximum time spent on Create session request
// If sessionCreateTimeout is less than or equal to zero then no used timeout on create session request
func WithSessionCreateTimeout(createSessionTimeout time.Duration) Option {
Expand Down
1 change: 1 addition & 0 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
},
pool: pool.New[*session, session](ctx,
pool.WithLimit[*session, session](config.SizeLimit()),
pool.WithItemUsageLimit[*session, session](config.SessionUsageLimit()),
pool.WithIdleTimeToLive[*session, session](config.IdleThreshold()),
pool.WithCreateItemTimeout[*session, session](config.CreateSessionTimeout()),
pool.WithCloseItemTimeout[*session, session](config.DeleteTimeout()),
Expand Down
13 changes: 12 additions & 1 deletion internal/table/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func WithSizeLimit(sizeLimit int) Option {
}
}

func WithPoolSessionUsageLimit(sessionUsageLimit uint64) Option {
return func(c *Config) {
c.sessionUsageLimit = sessionUsageLimit
}
}

// WithKeepAliveMinSize defines lower bound for sessions in the pool. If there are more sessions open, then
// the excess idle ones will be closed and removed after IdleKeepAliveThreshold is reached for each of them.
// If keepAliveMinSize is less than zero, then no sessions will be preserved
Expand Down Expand Up @@ -159,7 +165,8 @@ func WithClock(clock clockwork.Clock) Option {
type Config struct {
config.Common

sizeLimit int
sizeLimit int
sessionUsageLimit uint64

createSessionTimeout time.Duration
deleteTimeout time.Duration
Expand Down Expand Up @@ -189,6 +196,10 @@ func (c *Config) SizeLimit() int {
return c.sizeLimit
}

func (c *Config) SessionUsageLimit() uint64 {
return c.sessionUsageLimit
}

// KeepAliveMinSize is a lower bound for sessions in the pool. If there are more sessions open, then
// the excess idle ones will be closed and removed after IdleKeepAliveThreshold is reached for each of them.
// If KeepAliveMinSize is less than zero, then no sessions will be preserved
Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,16 @@ func WithSessionPoolSizeLimit(sizeLimit int) Option {
}
}

// WithSessionPoolSessionUsageLimit set max count for use session
func WithSessionPoolSessionUsageLimit(sessionUsageLimit uint64) Option {
return func(ctx context.Context, d *Driver) error {
d.tableOptions = append(d.tableOptions, tableConfig.WithPoolSessionUsageLimit(sessionUsageLimit))
d.queryOptions = append(d.queryOptions, queryConfig.WithPoolSessionUsageLimit(sessionUsageLimit))

return nil
}
}

func WithLazyTx(lazyTx bool) Option {
return func(ctx context.Context, d *Driver) error {
d.queryOptions = append(d.queryOptions, queryConfig.WithLazyTx(lazyTx))
Expand Down

0 comments on commit a16161a

Please sign in to comment.