From c6a3dbefb83e34a94d8338dfb9d8de4a9e5886d1 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 May 2024 16:06:53 +0300 Subject: [PATCH] * Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors) --- CHANGELOG.md | 1 + internal/table/client.go | 14 -------------- internal/table/transaction.go | 18 ++++++++++++++++++ retry/sql.go | 16 +++------------- 4 files changed, 22 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92b8a63b2..f98db979a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v3.67.1 * Fixed race of stop internal processes on close topic writer * Fixed goroutines leak within topic reader on network problems +* Skipped explicit `Rollback` of transaction on errors (server-side automatically rolled back transactions on errors) ## v3.67.0 * Added `ydb.WithNodeAddressMutator` experimental option for mutate node addresses from `discovery.ListEndpoints` response diff --git a/internal/table/client.go b/internal/table/client.go index 4965b8b6f..57e20a827 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -704,20 +704,6 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O return xerrors.WithStackTrace(err) } - defer func() { - if err != nil { - errRollback := tx.Rollback(ctx) - if errRollback != nil { - err = xerrors.NewWithIssues("", - xerrors.WithStackTrace(err), - xerrors.WithStackTrace(errRollback), - ) - } else { - err = xerrors.WithStackTrace(err) - } - } - }() - err = func() error { if panicCallback := c.config.PanicCallback(); panicCallback != nil { defer func() { diff --git a/internal/table/transaction.go b/internal/table/transaction.go index 3289d45cc..e4a98475e 100644 --- a/internal/table/transaction.go +++ b/internal/table/transaction.go @@ -22,6 +22,7 @@ import ( var ( errTxAlreadyCommitted = xerrors.Wrap(fmt.Errorf("transaction already committed")) errTxRollbackedEarly = xerrors.Wrap(fmt.Errorf("transaction rollbacked early")) + errTxFailedEarly = xerrors.Wrap(fmt.Errorf("transaction failed early")) ) type txState struct { @@ -42,6 +43,7 @@ const ( txStateInitialized txStateEnum = iota txStateCommitted txStateRollbacked + txStateFailed ) type transaction struct { @@ -73,11 +75,15 @@ func (tx *transaction) Execute( switch tx.state.Load() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) + case txStateFailed: + return nil, xerrors.WithStackTrace(errTxFailedEarly) case txStateRollbacked: return nil, xerrors.WithStackTrace(errTxRollbackedEarly) default: _, r, err = tx.s.Execute(ctx, tx.control, query, parameters, opts...) if err != nil { + tx.state.Store(txStateFailed) + return nil, xerrors.WithStackTrace(err) } @@ -115,11 +121,15 @@ func (tx *transaction) ExecuteStatement( switch tx.state.Load() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) + case txStateFailed: + return nil, xerrors.WithStackTrace(errTxFailedEarly) case txStateRollbacked: return nil, xerrors.WithStackTrace(errTxRollbackedEarly) default: _, r, err = stmt.Execute(ctx, tx.control, parameters, opts...) if err != nil { + tx.state.Store(txStateFailed) + return nil, xerrors.WithStackTrace(err) } @@ -148,6 +158,8 @@ func (tx *transaction) CommitTx( switch tx.state.Load() { case txStateCommitted: return nil, xerrors.WithStackTrace(errTxAlreadyCommitted) + case txStateFailed: + return nil, xerrors.WithStackTrace(errTxFailedEarly) case txStateRollbacked: return nil, xerrors.WithStackTrace(errTxRollbackedEarly) default: @@ -174,6 +186,8 @@ func (tx *transaction) CommitTx( response, err = tx.s.tableService.CommitTransaction(ctx, request) if err != nil { + tx.state.Store(txStateFailed) + return nil, xerrors.WithStackTrace(err) } @@ -206,6 +220,8 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) { switch tx.state.Load() { case txStateCommitted: return nil // nop for committed tx + case txStateFailed: + return xerrors.WithStackTrace(errTxFailedEarly) case txStateRollbacked: return xerrors.WithStackTrace(errTxRollbackedEarly) default: @@ -222,6 +238,8 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) { }, ) if err != nil { + tx.state.Store(txStateFailed) + return xerrors.WithStackTrace(err) } diff --git a/retry/sql.go b/retry/sql.go index 5bf974351..fca156431 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -161,26 +161,16 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err } err := Retry(ctx, func(ctx context.Context) (finalErr error) { attempts++ + tx, err := db.BeginTx(ctx, options.txOptions) if err != nil { return unwrapErrBadConn(xerrors.WithStackTrace(err)) } - defer func() { - if finalErr == nil { - return - } - errRollback := tx.Rollback() - if errRollback == nil { - return - } - finalErr = xerrors.NewWithIssues("", - xerrors.WithStackTrace(finalErr), - xerrors.WithStackTrace(fmt.Errorf("rollback failed: %w", errRollback)), - ) - }() + if err = op(xcontext.MarkRetryCall(ctx), tx); err != nil { return unwrapErrBadConn(xerrors.WithStackTrace(err)) } + if err = tx.Commit(); err != nil { return unwrapErrBadConn(xerrors.WithStackTrace(err)) }