implement rollback for pipelined dml #1235

Merged (11 commits, Apr 9, 2024)
33 changes: 32 additions & 1 deletion integration_tests/pipelined_memdb_test.go
@@ -31,6 +31,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)

const (
@@ -241,7 +242,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()
bo := retry.NewNoopBackoff(ctx)
- committer.ResolveFlushedLocks(bo, []byte("1"), []byte("99"))
+ committer.ResolveFlushedLocks(bo, []byte("1"), []byte("99"), true)
close(done)
}()
// should be done within 10 seconds.
@@ -262,6 +263,36 @@
}
}

func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
startTS := txn.StartTS()
s.Nil(err)
keys := make([][]byte, 0, 100)
for i := 0; i < 100; i++ {
key := []byte(strconv.Itoa(i))
value := key
txn.Set(key, value)
keys = append(keys, key)
}
txn.GetMemBuffer().Flush(true)
s.Nil(txn.GetMemBuffer().FlushWait())
s.Nil(txn.Rollback())
s.Eventuallyf(func() bool {
txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB())
s.Nil(err)
defer func() { s.Nil(txn.Rollback()) }()
storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier)
s.Nil(err)
return len(storageBufferedValues) == 0
}, 10*time.Second, 10*time.Millisecond, "rollback should cleanup locks in time")
txn, err = s.store.Begin()
s.Nil(err)
defer func() { s.Nil(txn.Rollback()) }()
storageValues, err := txn.GetSnapshot().BatchGet(context.Background(), keys)
s.Nil(err)
s.Len(storageValues, 0)
}

func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
defer failpoint.Disable("tikvclient/beforeSendReqToRegion")
50 changes: 35 additions & 15 deletions txnkv/transaction/pipelined_flush.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/docker/go-units"
"github.com/golang/protobuf/proto" //nolint:staticcheck
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -314,7 +315,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {

// async resolve the rest locks.
commitBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
- go c.resolveFlushedLocks(commitBo, c.pipelinedCommitInfo.pipelinedStart, c.pipelinedCommitInfo.pipelinedEnd)
+ c.resolveFlushedLocks(commitBo, c.pipelinedCommitInfo.pipelinedStart, c.pipelinedCommitInfo.pipelinedEnd, true)
return nil
}

@@ -334,6 +335,18 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
maxBackOff = CommitSecondaryMaxBackoff
}
regionCache := c.store.GetRegionCache()
// the handler function runs in a different goroutine; copy the required values beforehand to avoid data races.
Contributor: As discussed in the comments above, this may not be a perfect solution; we still need to find a way to avoid the race completely.

Contributor (author): Maybe set txn.committer to nil, which would disable further access to the rolled-back committer.
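
The fix that follows snapshots the mutable committer fields into a single kvrpcpb.Context before any resolve goroutine starts, and proto.Clone then gives each request its own copy. The same capture-then-copy pattern in a minimal, self-contained sketch (a plain struct copy stands in for proto.Clone; all types and names below are invented for the example, not client-go APIs):

```go
package main

import (
	"fmt"
	"sync"
)

// requestContext stands in for kvrpcpb.Context in this sketch.
type requestContext struct {
	Priority      int
	RequestSource string
}

type committer struct {
	priority int // mutable committer state; must not be read by the handlers
}

// buildHandler snapshots the fields it needs before returning, so the
// returned closure never touches the live committer and cannot race with
// later mutations of it.
func (c *committer) buildHandler() func(task int) {
	template := requestContext{Priority: c.priority, RequestSource: "pipelined"}
	return func(task int) {
		req := template // each call works on its own copy of the snapshot
		fmt.Printf("task %d sent with priority %d (%s)\n", task, req.Priority, req.RequestSource)
	}
}

func main() {
	c := &committer{priority: 1}
	handler := c.buildHandler()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) { defer wg.Done(); handler(i) }(i)
	}
	c.priority = 2 // mutated after the handlers started; they still see the snapshot
	wg.Wait()
}
```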

kvContext := &kvrpcpb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
ResourceGroupTag: c.resourceGroupTag,
DiskFullOpt: c.txn.diskFullOpt,
TxnSource: c.txn.txnSource,
RequestSource: PipelinedRequestSource,
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: c.resourceGroupName,
},
}
return func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
start := r.StartKey
res := rangetask.TaskStat{}
@@ -342,9 +355,7 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
StartVersion: c.startTS,
CommitVersion: commitVersion,
}
- req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{
- RequestSource: PipelinedRequestSource,
- })
+ req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, *proto.Clone(kvContext).(*kvrpcpb.Context))
bo := retry.NewBackoffer(ctx, maxBackOff)
loc, err := regionCache.LocateKey(bo, start)
if err != nil {
@@ -392,22 +403,31 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
}, nil
}

- func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte) {
- // TODO: implement cleanup.
+ // resolveFlushedLocks resolves all locks in the given range [start, end) with the given status.
+ // The resolution runs in a separate goroutine, so this function does not block.
+ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
const RESOLVE_CONCURRENCY = 8
var resolved atomic.Uint64
Contributor (author): I'm not sure whether we should choose a lower concurrency for rollback.

- handler, err := c.buildPipelinedResolveHandler(true, &resolved)
+ handler, err := c.buildPipelinedResolveHandler(commit, &resolved)
if err != nil {
logutil.Logger(bo.GetCtx()).Error("[pipelined dml] build buildPipelinedResolveHandler error", zap.Error(err))
return
}
- runner := rangetask.NewRangeTaskRunner("pipelined-dml-commit", c.store, RESOLVE_CONCURRENCY, handler)
- if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil {
- logutil.Logger(bo.GetCtx()).Error("[pipelined dml] commit transaction secondaries failed",
- zap.Uint64("resolved regions", resolved.Load()),
- zap.Error(err))
- } else {
- logutil.BgLogger().Info("[pipelined dml] commit transaction secondaries done",
- zap.Uint64("resolved regions", resolved.Load()))
+ status := "rollback"
+ if commit {
+ status = "commit"
+ }
+ runner := rangetask.NewRangeTaskRunner("pipelined-dml-"+status, c.store, RESOLVE_CONCURRENCY, handler)
+ go func() {
+ if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil {
+ logutil.Logger(bo.GetCtx()).Error("[pipelined dml] resolve flushed locks failed",
+ zap.String("txn-status", status),
+ zap.Uint64("resolved regions", resolved.Load()),
+ zap.Error(err))
+ } else {
+ logutil.BgLogger().Info("[pipelined dml] resolve flushed locks done",
+ zap.String("txn-status", status),
+ zap.Uint64("resolved regions", resolved.Load()))
+ }
+ }()
}
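
Conceptually, resolveFlushedLocks now builds the handler, then fires a goroutine that walks [start, end) on a fixed-size pool and logs the resolved-region count when done. A rough, self-contained analogue of that shape (a hand-rolled worker pool standing in for rangetask.RangeTaskRunner; every name below is invented for the sketch):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

const resolveConcurrency = 8 // mirrors RESOLVE_CONCURRENCY in the PR

// resolveRange stands in for one ResolveLock round over a single region.
func resolveRange(ctx context.Context, r [2]string) error {
	fmt.Printf("resolved [%s, %s)\n", r[0], r[1])
	return nil
}

// resolveFlushedLocks fans subranges out to a bounded worker pool and
// returns immediately; the resolution itself happens in the background.
func resolveFlushedLocks(ctx context.Context, subranges [][2]string, done func(resolved uint64)) {
	var resolved atomic.Uint64
	tasks := make(chan [2]string)
	go func() {
		var wg sync.WaitGroup
		for i := 0; i < resolveConcurrency; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for r := range tasks {
					if err := resolveRange(ctx, r); err == nil {
						resolved.Add(1)
					}
				}
			}()
		}
		for _, r := range subranges {
			tasks <- r
		}
		close(tasks)
		wg.Wait()
		done(resolved.Load()) // e.g. log "resolve flushed locks done"
	}()
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	resolveFlushedLocks(context.Background(),
		[][2]string{{"a", "m"}, {"m", "z"}},
		func(n uint64) { fmt.Println("resolved subranges:", n); wg.Done() })
	wg.Wait() // only so the demo doesn't exit before the background work finishes
}
```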
4 changes: 2 additions & 2 deletions txnkv/transaction/test_probe.go
@@ -370,8 +370,8 @@ func (c CommitterProbe) CleanupMutations(ctx context.Context) error {
}

// ResolveFlushedLocks exports resolveFlushedLocks
- func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []byte) {
- c.resolveFlushedLocks(bo, start, end)
+ func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
+ c.resolveFlushedLocks(bo, start, end, commit)
}

// SendTxnHeartBeat renews a txn's ttl.
6 changes: 6 additions & 0 deletions txnkv/transaction/txn.go
@@ -764,6 +764,12 @@ func (txn *KVTxn) Rollback() error {
txn.pipelinedCancel()
txn.GetMemBuffer().FlushWait()
txn.committer.ttlManager.close()
// no need to clean up locks when no flush triggered.
pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd
if len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 {
rollbackBo := retry.NewBackofferWithVars(txn.store.Ctx(), CommitSecondaryMaxBackoff, txn.vars)
txn.committer.resolveFlushedLocks(rollbackBo, pipelinedStart, pipelinedEnd, false)
}
}
txn.close()
logutil.BgLogger().Debug("[kv] rollback txn", zap.Uint64("txnStartTS", txn.StartTS()))
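
The guard added to Rollback can be read in isolation as the following sketch (stub type and helper invented for illustration): pipelinedStart and pipelinedEnd stay empty until the first flush actually writes locks to TiKV, so an unflushed transaction schedules no resolution work.

```go
package main

import "fmt"

// Stub of the committer's pipelined commit info, for illustration only.
type pipelinedCommitInfo struct{ pipelinedStart, pipelinedEnd []byte }

// needsLockCleanup mirrors the guard added to Rollback: both bounds stay
// empty until the first flush writes locks to TiKV, so a transaction that
// never flushed has nothing to clean up.
func needsLockCleanup(info pipelinedCommitInfo) bool {
	return len(info.pipelinedStart) != 0 && len(info.pipelinedEnd) != 0
}

func main() {
	fmt.Println(needsLockCleanup(pipelinedCommitInfo{}))                         // false: never flushed
	fmt.Println(needsLockCleanup(pipelinedCommitInfo{[]byte("a"), []byte("z")})) // true: flushed range [a, z)
}
```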