Skip to content

Commit 9a7e2b6

Browse files
committed
fail pipelined dml when max ttl exceeded
Signed-off-by: you06 <[email protected]>
1 parent 31a502b commit 9a7e2b6

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

integration_tests/pipelined_memdb_test.go

+40
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/pingcap/kvproto/pkg/kvrpcpb"
2727
"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
2828
"github.com/stretchr/testify/suite"
29+
"github.com/tikv/client-go/v2/config"
2930
"github.com/tikv/client-go/v2/config/retry"
3031
tikverr "github.com/tikv/client-go/v2/error"
3132
"github.com/tikv/client-go/v2/oracle"
@@ -466,3 +467,42 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() {
466467
s.NotNil(err)
467468
s.ErrorContains(err, "ttl manager is closed")
468469
}
470+
471+
func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() {
472+
originManagedTTLVal := atomic.LoadUint64(&transaction.ManagedLockTTL)
473+
originMaxPipelinedTxnTTL := atomic.LoadUint64(&transaction.MaxPipelinedTxnTTL)
474+
atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms
475+
atomic.StoreUint64(&transaction.MaxPipelinedTxnTTL, 200) // set to 200ms
476+
updateGlobalConfig(func(conf *config.Config) {
477+
conf.MaxTxnTTL = 200 // set to 200ms
478+
})
479+
defer func() {
480+
atomic.StoreUint64(&transaction.ManagedLockTTL, originManagedTTLVal)
481+
atomic.StoreUint64(&transaction.MaxPipelinedTxnTTL, originMaxPipelinedTxnTTL)
482+
restoreGlobalConfFunc()
483+
}()
484+
485+
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
486+
s.Nil(err)
487+
txn.Set([]byte("key1"), []byte("value1"))
488+
txnProbe := transaction.TxnProbe{KVTxn: txn}
489+
flushed, err := txnProbe.GetMemBuffer().Flush(true)
490+
s.Nil(err)
491+
s.True(flushed)
492+
s.Nil(txn.GetMemBuffer().FlushWait())
493+
s.Equal(txnProbe.GetCommitter().GetPrimaryKey(), []byte("key1"))
494+
495+
s.True(txnProbe.GetCommitter().IsTTLRunning())
496+
497+
s.Eventuallyf(func() bool {
498+
return !txnProbe.GetCommitter().IsTTLRunning()
499+
}, 5*time.Second, 100*time.Millisecond, "ttl manager should stop after max ttl")
500+
501+
txn.Set([]byte("key2"), []byte("value2"))
502+
flushed, err = txn.GetMemBuffer().Flush(true)
503+
s.Nil(err)
504+
s.True(flushed)
505+
err = txn.GetMemBuffer().FlushWait()
506+
s.NotNil(err)
507+
s.ErrorContains(err, "ttl manager is closed")
508+
}

txnkv/transaction/2pc.go

+5
Original file line numberDiff line numberDiff line change
@@ -1229,6 +1229,11 @@ func keepAlive(
12291229
if c.isPessimistic && lockCtx != nil && lockCtx.LockExpired != nil {
12301230
atomic.StoreUint32(lockCtx.LockExpired, 1)
12311231
}
1232+
if isPipelinedTxn {
1233+
// the pipelined txn can last a long time after max ttl exceeded.
1234+
// if we don't stop it, it may fail when committing the primary key with high probability.
1235+
tm.close()
1236+
}
12321237
return
12331238
}
12341239

0 commit comments

Comments
 (0)