Skip to content

Commit

Permalink
replaced table client pool to internal/pool
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Sep 6, 2024
1 parent 2b869c5 commit f36b8f5
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 1,847 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Replaced internal table client pool entities to `internal/pool`

## v3.79.2
* Enabled by default usage of `internal/pool` in `internal/query.Client`

Expand Down
6 changes: 4 additions & 2 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ func WithCreateItemFunc[PT Item[T], T any](f func(ctx context.Context) (PT, erro
}
}

func withCloseItemFunc[PT Item[T], T any](f func(ctx context.Context, item PT)) option[PT, T] {
func WithSyncCloseItem[PT Item[T], T any]() option[PT, T] {
return func(c *Config[PT, T]) {
c.closeItem = f
c.closeItem = func(ctx context.Context, item PT) {
_ = item.Close(ctx)
}
}
}

Expand Down
96 changes: 66 additions & 30 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
)

type (
Expand Down Expand Up @@ -229,9 +231,7 @@ func TestPool(t *testing.T) {
return &v, nil
}),
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
WithTrace[*testItem, testItem](defaultTrace),
)

Expand Down Expand Up @@ -301,9 +301,7 @@ func TestPool(t *testing.T) {
}
p := New[*testItem, testItem](rootCtx,
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
WithLimit[*testItem, testItem](1),
WithTrace[*testItem, testItem](&Trace{
onWait: func() func(item any, err error) {
Expand Down Expand Up @@ -394,9 +392,7 @@ func TestPool(t *testing.T) {
}),
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
// replace default async closer for sync testing
withCloseItemFunc[*testItem, testItem](func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
WithClock[*testItem, testItem](fakeClock),
WithIdleThreshold[*testItem, testItem](idleThreshold),
WithTrace[*testItem, testItem](defaultTrace),
Expand Down Expand Up @@ -560,9 +556,7 @@ func TestPool(t *testing.T) {
defer cancel()
p := New[*testItem, testItem](rootCtx,
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
)
defer func() {
_ = p.Close(context.Background())
Expand Down Expand Up @@ -618,6 +612,60 @@ func TestPool(t *testing.T) {
})
})
})
t.Run("DoBackoffRetryCancelation", func(t *testing.T) {
for _, testErr := range []error{
// Errors leading to Wait repeat.
xerrors.Transport(
grpcStatus.Error(grpcCodes.ResourceExhausted, ""),
),
fmt.Errorf("wrap transport error: %w", xerrors.Transport(
grpcStatus.Error(grpcCodes.ResourceExhausted, ""),
)),
xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_OVERLOADED)),
fmt.Errorf("wrap op error: %w", xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_OVERLOADED))),
} {
t.Run("", func(t *testing.T) {
backoff := make(chan chan time.Time)
ctx, cancel := xcontext.WithCancel(context.Background())
p := New[*testItem, testItem](ctx, WithLimit[*testItem, testItem](1))

results := make(chan error)
go func() {
err := p.With(ctx,
func(ctx context.Context, item *testItem) error {
return testErr
},
retry.WithFastBackoff(
testutil.BackoffFunc(func(n int) <-chan time.Time {
ch := make(chan time.Time)
backoff <- ch

return ch
}),
),
retry.WithSlowBackoff(
testutil.BackoffFunc(func(n int) <-chan time.Time {
ch := make(chan time.Time)
backoff <- ch

return ch
}),
),
)
results <- err
}()

select {
case <-backoff:
t.Logf("expected result")
case res := <-results:
t.Fatalf("unexpected result: %v", res)
}

cancel()
})
}
})
})
t.Run("Item", func(t *testing.T) {
t.Run("Close", func(t *testing.T) {
Expand All @@ -642,9 +690,7 @@ func TestPool(t *testing.T) {
return v, nil
}),
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
)
err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
return nil
Expand Down Expand Up @@ -684,9 +730,7 @@ func TestPool(t *testing.T) {
return v, nil
}),
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
)
err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
if newItems.Load() < 10 {
Expand Down Expand Up @@ -744,9 +788,7 @@ func TestPool(t *testing.T) {
return &v, nil
}),
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
)
defer func() {
_ = p.Close(context.Background())
Expand Down Expand Up @@ -778,9 +820,7 @@ func TestPool(t *testing.T) {
p := New[*testItem, testItem](rootCtx,
WithTrace[*testItem, testItem](trace),
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
)
r := xrand.New(xrand.WithLock())
var wg sync.WaitGroup
Expand Down Expand Up @@ -848,9 +888,7 @@ func TestPool(t *testing.T) {
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
)
item := mustGetItem(t, p)
if err := p.putItem(context.Background(), item); err != nil {
Expand All @@ -867,9 +905,7 @@ func TestPool(t *testing.T) {
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
// replace default async closer for sync testing
withCloseItemFunc(func(ctx context.Context, item *testItem) {
_ = item.Close(ctx)
}),
WithSyncCloseItem[*testItem, testItem](),
)
item := mustGetItem(t, p)
mustPutItem(t, p, item)
Expand Down
Loading

0 comments on commit f36b8f5

Please sign in to comment.