From c6a3dbefb83e34a94d8338dfb9d8de4a9e5886d1 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 May 2024 16:06:53 +0300 Subject: [PATCH 1/3] * Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors) --- CHANGELOG.md | 1 + internal/table/client.go | 14 -------------- internal/table/transaction.go | 18 ++++++++++++++++++ retry/sql.go | 16 +++------------- 4 files changed, 22 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92b8a63b2..f98db979a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v3.67.1 * Fixed race of stop internal processes on close topic writer * Fixed goroutines leak within topic reader on network problems +* Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors) ## v3.67.0 * Added `ydb.WithNodeAddressMutator` experimental option for mutate node addresses from `discovery.ListEndpoints` response diff --git a/internal/table/client.go b/internal/table/client.go index 4965b8b6f..57e20a827 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -704,20 +704,6 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O return xerrors.WithStackTrace(err) } - defer func() { - if err != nil { - errRollback := tx.Rollback(ctx) - if errRollback != nil { - err = xerrors.NewWithIssues("", - xerrors.WithStackTrace(err), - xerrors.WithStackTrace(errRollback), - ) - } else { - err = xerrors.WithStackTrace(err) - } - } - }() - err = func() error { if panicCallback := c.config.PanicCallback(); panicCallback != nil { defer func() { diff --git a/internal/table/transaction.go b/internal/table/transaction.go index 3289d45cc..e4a98475e 100644 --- a/internal/table/transaction.go +++ b/internal/table/transaction.go @@ -22,6 +22,7 @@ import ( var ( errTxAlreadyCommitted = xerrors.Wrap(fmt.Errorf("transaction already committed")) errTxRollbackedEarly = xerrors.Wrap(fmt.Errorf("transaction rollbacked early")) + errTxFailedEarly = xerrors.Wrap(fmt.Errorf("transaction failed early")) ) type txState struct { @@ -42,6 +43,7 @@ const ( txStateInitialized txStateEnum = iota txStateCommitted txStateRollbacked + txStateFailed ) type transaction struct { @@ -73,11 +75,15 @@ func (tx *transaction) Execute( switch tx.state.Load() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) + case txStateFailed: + return nil, xerrors.WithStackTrace(errTxFailedEarly) case txStateRollbacked: return nil, xerrors.WithStackTrace(errTxRollbackedEarly) default: _, r, err = tx.s.Execute(ctx, tx.control, query, parameters, opts...) if err != nil { + tx.state.Store(txStateFailed) + return nil, xerrors.WithStackTrace(err) } @@ -115,11 +121,15 @@ func (tx *transaction) ExecuteStatement( switch tx.state.Load() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) + case txStateFailed: + return nil, xerrors.WithStackTrace(errTxFailedEarly) case txStateRollbacked: return nil, xerrors.WithStackTrace(errTxRollbackedEarly) default: _, r, err = stmt.Execute(ctx, tx.control, parameters, opts...) if err != nil { + tx.state.Store(txStateFailed) + return nil, xerrors.WithStackTrace(err) } @@ -148,6 +158,8 @@ func (tx *transaction) CommitTx( switch tx.state.Load() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) + case txStateFailed: + return nil, xerrors.WithStackTrace(errTxFailedEarly) case txStateRollbacked: return nil, xerrors.WithStackTrace(errTxRollbackedEarly) default: @@ -174,6 +186,8 @@ func (tx *transaction) CommitTx( response, err = tx.s.tableService.CommitTransaction(ctx, request) if err != nil { + tx.state.Store(txStateFailed) + return nil, xerrors.WithStackTrace(err) } @@ -206,6 +220,8 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) { switch tx.state.Load() { case txStateCommitted: return nil // nop for committed tx + case txStateFailed: + return xerrors.WithStackTrace(errTxFailedEarly) case txStateRollbacked: return xerrors.WithStackTrace(errTxRollbackedEarly) default: @@ -222,6 +238,8 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) { }, ) if err != nil { + tx.state.Store(txStateFailed) + return xerrors.WithStackTrace(err) } diff --git a/retry/sql.go b/retry/sql.go index 5bf974351..fca156431 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -161,26 +161,16 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err } err := Retry(ctx, func(ctx context.Context) (finalErr error) { attempts++ + tx, err := db.BeginTx(ctx, options.txOptions) if err != nil { return unwrapErrBadConn(xerrors.WithStackTrace(err)) } - defer func() { - if finalErr == nil { - return - } - errRollback := tx.Rollback() - if errRollback == nil { - return - } - finalErr = xerrors.NewWithIssues("", - xerrors.WithStackTrace(finalErr), - xerrors.WithStackTrace(fmt.Errorf("rollback failed: %w", errRollback)), - ) - }() + if err = op(xcontext.MarkRetryCall(ctx), tx); err != nil { return unwrapErrBadConn(xerrors.WithStackTrace(err)) } + if err = tx.Commit(); err != nil { return unwrapErrBadConn(xerrors.WithStackTrace(err)) } From aac82162355cfc71e8766d0f45debc50f35b0a9e Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Fri, 17 May 2024 20:54:10 +0300 Subject: [PATCH 2/3] fixes --- internal/table/client.go | 41 +++++++++++++++++++++++++++++++----- internal/table/retry.go | 40 ----------------------------------- internal/table/retry_test.go | 32 ++++++---------------------- retry/sql.go | 5 +++++ 4 files changed, 47 insertions(+), 71 deletions(-) diff --git a/internal/table/client.go b/internal/table/client.go index 57e20a827..90bedc239 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -666,9 +666,30 @@ func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Optio onDone(attempts, finalErr) }() - err := do(ctx, c, c.config, op, func(err error) { - attempts++ - }, config.RetryOptions...) + err := retryBackoff(ctx, c, + func(ctx context.Context, s table.Session) (err error) { + attempts++ + + err = func() error { + if panicCallback := c.config.PanicCallback(); panicCallback != nil { + defer func() { + if e := recover(); e != nil { + panicCallback(e) + } + }() + } + + return op(xcontext.MarkRetryCall(ctx), s) + }() + + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil + }, + config.RetryOptions..., + ) if err != nil { return xerrors.WithStackTrace(err) } @@ -695,14 +716,19 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O onDone(attempts, finalErr) }() - return retryBackoff(ctx, c, - func(ctx context.Context, s table.Session) (err error) { + err := retryBackoff(ctx, c, + func(ctx context.Context, s table.Session) (finalErr error) { attempts++ tx, err := s.BeginTransaction(ctx, config.TxSettings) if err != nil { return xerrors.WithStackTrace(err) } + defer func() { + if finalErr != nil { + _ = tx.Rollback(ctx) + } + }() err = func() error { if panicCallback := c.config.PanicCallback(); panicCallback != nil { @@ -729,6 +755,11 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O }, config.RetryOptions..., ) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil } func (c *Client) internalPoolGCTick(ctx context.Context, idleThreshold time.Duration) { diff --git a/internal/table/retry.go b/internal/table/retry.go index e2b522b45..92d95bacf 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -3,8 +3,6 @@ package table import ( "context" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/table" @@ -22,44 +20,6 @@ type SessionProvider interface { Put(ctx context.Context, s *session) (err error) } -func do( - ctx context.Context, - c SessionProvider, - config *config.Config, - op table.Operation, - onAttempt func(err error), - opts ...retry.Option, -) (err error) { - return retryBackoff(ctx, c, - func(ctx context.Context, s table.Session) (err error) { - defer func() { - if onAttempt != nil { - onAttempt(err) - } - }() - - err = func() error { - if panicCallback := config.PanicCallback(); panicCallback != nil { - defer func() { - if e := recover(); e != nil { - panicCallback(e) - } - }() - } - - return op(xcontext.MarkRetryCall(ctx), s) - }() - - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - }, - opts..., - ) -} - func retryBackoff( ctx context.Context, p SessionProvider, diff --git a/internal/table/retry_test.go b/internal/table/retry_test.go index 008633786..332126360 100644 --- a/internal/table/retry_test.go +++ b/internal/table/retry_test.go @@ -11,7 +11,6 @@ import ( grpcCodes "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand" @@ -41,12 +40,10 @@ func TestRetryerBackoffRetryCancelation(t *testing.T) { ctx, cancel := xcontext.WithCancel(context.Background()) results := make(chan error) go func() { - err := do(ctx, p, - config.New(), + err := retryBackoff(ctx, p, func(ctx context.Context, _ table.Session) error { return testErr }, - nil, retry.WithFastBackoff( testutil.BackoffFunc(func(n int) <-chan time.Time { ch := make(chan time.Time) @@ -103,7 +100,7 @@ func TestRetryerBadSession(t *testing.T) { sessions []table.Session ) ctx, cancel := xcontext.WithCancel(context.Background()) - err := do(ctx, p, config.New(), + err := retryBackoff(ctx, p, func(ctx context.Context, s table.Session) error { sessions = append(sessions, s) i++ @@ -115,7 +112,6 @@ func TestRetryerBadSession(t *testing.T) { xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION), ) }, - func(err error) {}, ) if !xerrors.Is(err, context.Canceled) { t.Errorf("unexpected error: %v", err) @@ -154,17 +150,13 @@ func TestRetryerSessionClosing(t *testing.T) { } var sessions []table.Session for i := 0; i < 1000; i++ { - err := do( - context.Background(), - p, - config.New(), + err := retryBackoff(context.Background(), p, func(ctx context.Context, s table.Session) error { sessions = append(sessions, s) s.(*session).SetStatus(table.SessionClosing) return nil }, - nil, ) if err != nil { t.Errorf("unexpected error: %v", err) @@ -208,14 +200,10 @@ func TestRetryerImmediateReturn(t *testing.T) { p := SingleSession( simpleSession(t), ) - err := do( - context.Background(), - p, - config.New(), + err := retryBackoff(context.Background(), p, func(ctx context.Context, _ table.Session) error { return testErr }, - nil, retry.WithFastBackoff( testutil.BackoffFunc(func(n int) <-chan time.Time { panic("this code will not be called") @@ -341,10 +329,7 @@ func TestRetryContextDeadline(t *testing.T) { t.Run(fmt.Sprintf("Timeout=%v,Sleep=%v", timeout, sleep), func(t *testing.T) { ctx, cancel := xcontext.WithTimeout(context.Background(), timeout) defer cancel() - _ = do( - ctx, - p, - config.New(), + _ = retryBackoff(ctx, p, func(ctx context.Context, _ table.Session) error { select { case <-ctx.Done(): @@ -353,7 +338,6 @@ func TestRetryContextDeadline(t *testing.T) { return errs[r.Int(len(errs))] } }, - nil, ) }) } @@ -442,10 +426,7 @@ func TestRetryWithCustomErrors(t *testing.T) { i = 0 sessions = make(map[table.Session]int) ) - err := do( - ctx, - p, - config.New(), + err := retryBackoff(ctx, p, func(ctx context.Context, s table.Session) (err error) { sessions[s]++ i++ @@ -455,7 +436,6 @@ func TestRetryWithCustomErrors(t *testing.T) { return nil }, - nil, ) //nolint:nestif if test.retriable { diff --git a/retry/sql.go b/retry/sql.go index fca156431..aa263cee6 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -166,6 +166,11 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err if err != nil { return unwrapErrBadConn(xerrors.WithStackTrace(err)) } + defer func() { + if finalErr != nil { + _ = tx.Rollback() + } + }() if err = op(xcontext.MarkRetryCall(ctx), tx); err != nil { return unwrapErrBadConn(xerrors.WithStackTrace(err)) From f02913a26dff2f0f7f3026a4bba1341483d075dc Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Fri, 17 May 2024 22:36:32 +0300 Subject: [PATCH 3/3] Apply suggestions from code review --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f98db979a..5f8a46620 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ +* Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors) + ## v3.67.1 * Fixed race of stop internal processes on close topic writer * Fixed goroutines leak within topic reader on network problems -* Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors) ## v3.67.0 * Added `ydb.WithNodeAddressMutator` experimental option for mutate node addresses from `discovery.ListEndpoints` response