diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go
index f2cd3f2a4..ac38d59dc 100644
--- a/integration_tests/pipelined_memdb_test.go
+++ b/integration_tests/pipelined_memdb_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
 	"github.com/tikv/client-go/v2/txnkv/transaction"
+	"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
 )
 
 const (
@@ -241,7 +242,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
 		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
 		defer cancel()
 		bo := retry.NewNoopBackoff(ctx)
-		committer.ResolveFlushedLocks(bo, []byte("1"), []byte("99"))
+		committer.ResolveFlushedLocks(bo, []byte("1"), []byte("99"), true)
 		close(done)
 	}()
 	// should be done within 10 seconds.
@@ -262,6 +263,36 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
 	}
 }
 
+func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
+	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	s.Nil(err)
+	startTS := txn.StartTS()
+	keys := make([][]byte, 0, 100)
+	for i := 0; i < 100; i++ {
+		key := []byte(strconv.Itoa(i))
+		value := key
+		txn.Set(key, value)
+		keys = append(keys, key)
+	}
+	txn.GetMemBuffer().Flush(true)
+	s.Nil(txn.GetMemBuffer().FlushWait())
+	s.Nil(txn.Rollback())
+	s.Eventuallyf(func() bool {
+		txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB())
+		s.Nil(err)
+		defer func() { s.Nil(txn.Rollback()) }()
+		storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier)
+		s.Nil(err)
+		return len(storageBufferedValues) == 0
+	}, 10*time.Second, 10*time.Millisecond, "rollback should cleanup locks in time")
+	txn, err = s.store.Begin()
+	s.Nil(err)
+	defer func() { s.Nil(txn.Rollback()) }()
+	storageValues, err := txn.GetSnapshot().BatchGet(context.Background(), keys)
+	s.Nil(err)
+	s.Len(storageValues, 0)
+}
+
 func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
 	failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
 	defer failpoint.Disable("tikvclient/beforeSendReqToRegion")
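The new TestPipelinedRollback covers the write/flush/rollback path end to end. For readers who want the flow outside the test harness, here is a minimal sketch, assuming an already constructed *tikv.KVStore named store and a hypothetical helper name; it only illustrates the API exercised above and is not part of the change set:

package example // hypothetical illustration package

import "github.com/tikv/client-go/v2/tikv"

// writeFlushRollback mirrors the scenario in TestPipelinedRollback: mutations are
// flushed to TiKV as pipelined locks, then Rollback must trigger their cleanup.
func writeFlushRollback(store *tikv.KVStore) error {
	txn, err := store.Begin(tikv.WithPipelinedMemDB())
	if err != nil {
		return err
	}
	if err := txn.Set([]byte("k"), []byte("v")); err != nil {
		return err
	}
	// Force a flush so the mutation reaches TiKV as a lock before the rollback.
	txn.GetMemBuffer().Flush(true)
	if err := txn.GetMemBuffer().FlushWait(); err != nil {
		return err
	}
	// Rollback now schedules resolveFlushedLocks(..., commit=false) in the background.
	return txn.Rollback()
}

Rollback returns once the flush has drained; the lock cleanup itself runs asynchronously, which is why the test polls with Eventuallyf instead of asserting immediately.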
diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go
index a8a55da88..78338b992 100644
--- a/txnkv/transaction/pipelined_flush.go
+++ b/txnkv/transaction/pipelined_flush.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/docker/go-units"
+	"github.com/golang/protobuf/proto" //nolint:staticcheck
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -335,7 +336,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
 
 	// async resolve the rest locks.
 	commitBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
-	go c.resolveFlushedLocks(commitBo, c.pipelinedCommitInfo.pipelinedStart, c.pipelinedCommitInfo.pipelinedEnd)
+	c.resolveFlushedLocks(commitBo, c.pipelinedCommitInfo.pipelinedStart, c.pipelinedCommitInfo.pipelinedEnd, true)
 	return nil
 }
 
@@ -355,6 +356,18 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
 		maxBackOff = CommitSecondaryMaxBackoff
 	}
 	regionCache := c.store.GetRegionCache()
+	// the handler function runs in a different goroutine; copy the required values beforehand to avoid data races.
+	kvContext := &kvrpcpb.Context{
+		Priority:         c.priority,
+		SyncLog:          c.syncLog,
+		ResourceGroupTag: c.resourceGroupTag,
+		DiskFullOpt:      c.txn.diskFullOpt,
+		TxnSource:        c.txn.txnSource,
+		RequestSource:    PipelinedRequestSource,
+		ResourceControlContext: &kvrpcpb.ResourceControlContext{
+			ResourceGroupName: c.resourceGroupName,
+		},
+	}
 	return func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
 		start := r.StartKey
 		res := rangetask.TaskStat{}
@@ -363,9 +376,7 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
 				StartVersion:  c.startTS,
 				CommitVersion: commitVersion,
 			}
-			req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{
-				RequestSource: PipelinedRequestSource,
-			})
+			req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, *proto.Clone(kvContext).(*kvrpcpb.Context))
 			bo := retry.NewBackoffer(ctx, maxBackOff)
 			loc, err := regionCache.LocateKey(bo, start)
 			if err != nil {
@@ -413,11 +424,12 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
 	}, nil
 }
 
-func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte) {
-	// TODO: implement cleanup.
+// resolveFlushedLocks resolves all locks in the given range [start, end), committing or rolling them back according to commit.
+// The resolution runs in a background goroutine, so this function does not block.
+func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
 	const RESOLVE_CONCURRENCY = 8
 	var resolved atomic.Uint64
-	handler, err := c.buildPipelinedResolveHandler(true, &resolved)
+	handler, err := c.buildPipelinedResolveHandler(commit, &resolved)
 	if err != nil {
 		logutil.Logger(bo.GetCtx()).Error(
 			"[pipelined dml] build buildPipelinedResolveHandler error",
@@ -428,26 +440,38 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
 		)
 		return
 	}
+
+	status := "rollback"
+	if commit {
+		status = "commit"
+	}
+
 	runner := rangetask.NewRangeTaskRunnerWithID(
-		"pipelined-dml-commit",
-		fmt.Sprintf("pipelined-dml-commit-%d", c.startTS),
+		fmt.Sprintf("pipelined-dml-%s", status),
+		fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS),
 		c.store,
 		RESOLVE_CONCURRENCY,
 		handler,
 	)
-	if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil {
-		logutil.Logger(bo.GetCtx()).Error("[pipelined dml] commit transaction secondaries failed",
-			zap.Uint64("resolved regions", resolved.Load()),
-			zap.Uint64("startTS", c.startTS),
-			zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
-			zap.Error(err),
-		)
-	} else {
-		logutil.Logger(bo.GetCtx()).Info(
-			"[pipelined dml] commit transaction secondaries done",
-			zap.Uint64("resolved regions", resolved.Load()),
-			zap.Uint64("startTS", c.startTS),
-			zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
-		)
-	}
+
+	go func() {
+		if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil {
+			logutil.Logger(bo.GetCtx()).Error("[pipelined dml] resolve flushed locks failed",
+				zap.String("txn-status", status),
+				zap.Uint64("resolved regions", resolved.Load()),
+				zap.Uint64("startTS", c.startTS),
+				zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
+				zap.Uint64("session", c.sessionID),
+				zap.Error(err),
+			)
+		} else {
+			logutil.Logger(bo.GetCtx()).Info("[pipelined dml] resolve flushed locks done",
+				zap.String("txn-status", status),
+				zap.Uint64("resolved regions", resolved.Load()),
+				zap.Uint64("startTS", c.startTS),
+				zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
+				zap.Uint64("session", c.sessionID),
+			)
+		}
+	}()
 }
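A note on the proto.Clone above: the resolve handler is shared by RESOLVE_CONCURRENCY workers, and the kvrpcpb.Context attached to a tikvrpc request can be mutated as the request is routed to a region, so every request should get its own copy of the template built from the committer fields. A minimal sketch of that clone-per-request pattern, using a hypothetical helper name (the real code inlines the clone at the NewRequest call site):

package example // hypothetical illustration package

import (
	"github.com/golang/protobuf/proto" //nolint:staticcheck
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// cloneRequestContext hands every resolve request its own kvrpcpb.Context value,
// so concurrent workers never share (and never race on) the same template.
func cloneRequestContext(template *kvrpcpb.Context) kvrpcpb.Context {
	return *proto.Clone(template).(*kvrpcpb.Context)
}

Building the struct from scratch inside the closure would also work; cloning a prepared template keeps the field list in one place.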
diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go
index 08077aae4..713502db4 100644
--- a/txnkv/transaction/test_probe.go
+++ b/txnkv/transaction/test_probe.go
@@ -370,8 +370,8 @@ func (c CommitterProbe) CleanupMutations(ctx context.Context) error {
 }
 
 // ResolveFlushedLocks exports resolveFlushedLocks
-func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []byte) {
-	c.resolveFlushedLocks(bo, start, end)
+func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
+	c.resolveFlushedLocks(bo, start, end, commit)
 }
 
 // SendTxnHeartBeat renews a txn's ttl.
diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go
index 2546e35e2..28c712bce 100644
--- a/txnkv/transaction/txn.go
+++ b/txnkv/transaction/txn.go
@@ -790,6 +790,12 @@ func (txn *KVTxn) Rollback() error {
 		txn.pipelinedCancel()
 		txn.GetMemBuffer().FlushWait()
 		txn.committer.ttlManager.close()
+		// no need to clean up locks if no flush has been triggered.
+		pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd
+		if len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 {
+			rollbackBo := retry.NewBackofferWithVars(txn.store.Ctx(), CommitSecondaryMaxBackoff, txn.vars)
+			txn.committer.resolveFlushedLocks(rollbackBo, pipelinedStart, pipelinedEnd, false)
+		}
 	}
 	txn.close()
 	logutil.BgLogger().Debug("[kv] rollback txn", zap.Uint64("txnStartTS", txn.StartTS()))
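For context on the commit flag: when the flushed locks are resolved, the flag ultimately determines the CommitVersion carried by each kvrpcpb.ResolveLockRequest, where zero asks TiKV to roll the locks back and a non-zero value commits them at that timestamp. A simplified sketch of that mapping, using a hypothetical helper name rather than the actual buildPipelinedResolveHandler body:

package example // hypothetical illustration package

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// resolveLockRequestFor shows how commit=true/false maps onto the ResolveLock RPC:
// CommitVersion == 0 rolls the transaction's locks back, while a non-zero
// CommitVersion (the transaction's commitTS) commits them.
func resolveLockRequestFor(startTS, commitTS uint64, commit bool) *kvrpcpb.ResolveLockRequest {
	var commitVersion uint64 // zero value means "roll back"
	if commit {
		commitVersion = commitTS
	}
	return &kvrpcpb.ResolveLockRequest{
		StartVersion:  startTS,
		CommitVersion: commitVersion,
	}
}

This is why the rollback path in Rollback can reuse the same resolveFlushedLocks machinery as commit: only the flag, and hence the CommitVersion, differs.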