Skip to content

Commit

Permalink
Merge pull request #1260 from ydb-platform/retry-with-result
Browse files Browse the repository at this point in the history
Added `retry.DoWithResult` and `retry.DoTxWithResult`
  • Loading branch information
asmyasnikov authored Jun 6, 2024
2 parents eedd068 + 527bfcd commit 9927a47
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 62 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added experimental `retry.DoWithResult` and `retry.DoTxWithResult` helpers for retry lambda and return value from lambda

## v3.72.0
* Excluded `Query()` method from interface `ydb.Connection`. Method `Query()` remains accessible from `ydb.Driver`

Expand Down
20 changes: 16 additions & 4 deletions internal/query/scanner/named.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,25 @@ type (
NamedScanner struct {
data *data
}
NamedDestination struct {
namedDestination struct {
name string
ref interface{}
}
NamedDestination interface {
Name() string
Ref() interface{}
}
)

func NamedRef(columnName string, destinationValueReference interface{}) (dst NamedDestination) {
func (dst namedDestination) Name() string {
return dst.name
}

func (dst namedDestination) Ref() interface{} {
return dst.ref
}

func NamedRef(columnName string, destinationValueReference interface{}) (dst namedDestination) {
if columnName == "" {
panic("columnName must be not empty")
}
Expand All @@ -40,11 +52,11 @@ func Named(data *data) NamedScanner {

func (s NamedScanner) ScanNamed(dst ...NamedDestination) (err error) {
for i := range dst {
v, err := s.data.seekByName(dst[i].name)
v, err := s.data.seekByName(dst[i].Name())
if err != nil {
return xerrors.WithStackTrace(err)
}
if err = value.CastTo(v, dst[i].ref); err != nil {
if err = value.CastTo(v, dst[i].Ref()); err != nil {
return xerrors.WithStackTrace(err)
}
}
Expand Down
10 changes: 5 additions & 5 deletions internal/query/scanner/named_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,25 +618,25 @@ func TestNamedRef(t *testing.T) {
{
name: "",
ref: nil,
dst: NamedDestination{},
dst: namedDestination{},
panic: true,
},
{
name: "nil_ref",
ref: nil,
dst: NamedDestination{},
dst: namedDestination{},
panic: true,
},
{
name: "not_ref",
ref: 123,
dst: NamedDestination{},
dst: namedDestination{},
panic: true,
},
{
name: "int_ptr",
ref: func(v int) *int { return &v }(123),
dst: NamedDestination{
dst: namedDestination{
name: "int_ptr",
ref: func(v int) *int { return &v }(123),
},
Expand All @@ -649,7 +649,7 @@ func TestNamedRef(t *testing.T) {

return &vv
}(123),
dst: NamedDestination{
dst: namedDestination{
name: "int_dbl_ptr",
ref: func(v int) **int {
vv := &v
Expand Down
17 changes: 10 additions & 7 deletions query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,31 @@ type (
}
ResultSet interface {
Columns() []string
ColumnTypes() []types.Type
ColumnTypes() []Type
NextRow(ctx context.Context) (Row, error)
}
Row interface {
Scan(dst ...interface{}) error
ScanNamed(dst ...scanner.NamedDestination) error
ScanStruct(dst interface{}, opts ...scanner.ScanStructOption) error
ScanNamed(dst ...NamedDestination) error
ScanStruct(dst interface{}, opts ...ScanStructOption) error
}
Type = types.Type
NamedDestination = scanner.NamedDestination
ScanStructOption = scanner.ScanStructOption
)

func Named(columnName string, destinationValueReference interface{}) (dst scanner.NamedDestination) {
func Named(columnName string, destinationValueReference interface{}) (dst NamedDestination) {
return scanner.NamedRef(columnName, destinationValueReference)
}

func WithScanStructTagName(name string) scanner.ScanStructOption {
func WithScanStructTagName(name string) ScanStructOption {
return scanner.WithTagName(name)
}

func WithScanStructAllowMissingColumnsFromSelect() scanner.ScanStructOption {
func WithScanStructAllowMissingColumnsFromSelect() ScanStructOption {
return scanner.WithAllowMissingColumnsFromSelect()
}

func WithScanStructAllowMissingFieldsInStruct() scanner.ScanStructOption {
func WithScanStructAllowMissingFieldsInStruct() ScanStructOption {
return scanner.WithAllowMissingFieldsInStruct()
}
56 changes: 31 additions & 25 deletions retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,8 @@ func WithPanicCallback(panicCallback func(e interface{})) panicCallbackOption {
// Warning: if context without deadline or cancellation func was passed, Retry will work infinitely.
//
// # If you need to retry your op func on some logic errors - you must return RetryableError() from retryOperation
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr error) {
_, err := RetryWithResult[struct{}](ctx, func(ctx context.Context) (*struct{}, error) {
_, err := RetryWithResult[*struct{}](ctx, func(ctx context.Context) (*struct{}, error) {
err := op(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand All @@ -285,19 +283,22 @@ func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr err
//
// Warning: if context without deadline or cancellation func was passed, RetryWithResult will work infinitely.
//
// If you need to retry your op func on some logic errors - you must return RetryableError() from retryOperation
// # If you need to retry your op func on some logic errors - you must return RetryableError() from retryOperation
//
//nolint:funlen
func RetryWithResult[T any](ctx context.Context, //nolint:revive
op func(context.Context) (*T, error), opts ...Option,
) (v *T, finalErr error) {
options := &retryOptions{
call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/retry.RetryWithResult"),
trace: &trace.Retry{},
budget: budget.Limited(-1),
fastBackoff: backoff.Fast,
slowBackoff: backoff.Slow,
}
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func RetryWithResult[T any](ctx context.Context, //nolint:revive,funlen
op func(context.Context) (T, error), opts ...Option,
) (_ T, finalErr error) {
var (
zeroValue T
options = &retryOptions{
call: stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/retry.RetryWithResult"),
trace: &trace.Retry{},
budget: budget.Limited(-1),
fastBackoff: backoff.Fast,
slowBackoff: backoff.Slow,
}
)
for _, opt := range opts {
if opt != nil {
opt.ApplyRetryOption(options)
Expand Down Expand Up @@ -332,13 +333,12 @@ func RetryWithResult[T any](ctx context.Context, //nolint:revive
attempts++
select {
case <-ctx.Done():
return nil, xerrors.WithStackTrace(
return zeroValue, xerrors.WithStackTrace(
fmt.Errorf("retry failed on attempt No.%d: %w", attempts, ctx.Err()),
)

default:
var err error
v, err = opWithRecover(ctx, options, op)
v, err := opWithRecover(ctx, options, op)

if err == nil {
return v, nil
Expand All @@ -353,7 +353,7 @@ func RetryWithResult[T any](ctx context.Context, //nolint:revive
code = m.StatusCode()

if !m.MustRetry(options.idempotent) {
return nil, xerrors.WithStackTrace(
return zeroValue, xerrors.WithStackTrace(
fmt.Errorf("non-retryable error occurred on attempt No.%d (idempotent=%v): %w",
attempts, options.idempotent, err),
)
Expand All @@ -368,7 +368,7 @@ func RetryWithResult[T any](ctx context.Context, //nolint:revive
case <-ctx.Done():
t.Stop()

return nil, xerrors.WithStackTrace(
return zeroValue, xerrors.WithStackTrace(
xerrors.Join(
fmt.Errorf("attempt No.%d: %w", attempts, ctx.Err()),
err,
Expand All @@ -378,7 +378,7 @@ func RetryWithResult[T any](ctx context.Context, //nolint:revive
t.Stop()

if acquireErr := options.budget.Acquire(ctx); acquireErr != nil {
return nil, xerrors.WithStackTrace(
return zeroValue, xerrors.WithStackTrace(
xerrors.Join(
fmt.Errorf("attempt No.%d: %w", attempts, budget.ErrNoQuota),
acquireErr,
Expand All @@ -392,20 +392,26 @@ func RetryWithResult[T any](ctx context.Context, //nolint:revive
}

func opWithRecover[T any](ctx context.Context,
options *retryOptions, op func(context.Context) (*T, error),
) (_ *T, err error) {
options *retryOptions, op func(context.Context) (T, error),
) (_ T, finalErr error) {
var zeroValue T
if options.panicCallback != nil {
defer func() {
if e := recover(); e != nil {
options.panicCallback(e)
err = xerrors.WithStackTrace(
finalErr = xerrors.WithStackTrace(
fmt.Errorf("panic recovered: %v", e),
)
}
}()
}

return op(ctx)
v, err := op(ctx)
if err != nil {
return zeroValue, xerrors.WithStackTrace(err)
}

return v, nil
}

// Check returns retry mode for queryErr.
Expand Down
Loading

0 comments on commit 9927a47

Please sign in to comment.