Skip to content

Commit

Permalink
coprocessor: fix performance regression when execute DML with lite-co…
Browse files Browse the repository at this point in the history
…p-worker meet region error (#58659)

close #58658
  • Loading branch information
crazycs520 authored Jan 13, 2025
1 parent b74eb0f commit 39912af
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 5 deletions.
3 changes: 3 additions & 0 deletions pkg/distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
if r.selectResp == nil {
return nil
}
failpoint.Inject("mockConsumeSelectRespSlow", func(val failpoint.Value) {
time.Sleep(time.Duration(val.(int) * int(time.Millisecond)))
})
}
// TODO(Shenghui Wu): add metrics
encodeType := r.selectResp.GetEncodeType()
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/copr_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 4,
shard_count = 5,
deps = [
"//pkg/config",
"//pkg/kv",
Expand Down
27 changes: 27 additions & 0 deletions pkg/store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,3 +323,30 @@ func TestQueryWithConcurrentSmallCop(t *testing.T) {
require.Less(t, time.Since(start), time.Millisecond*150)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowCop"))
}

// TestDMLWithLiteCopWorker is the regression test for
// https://github.com/pingcap/tidb/issues/58658. It runs an UPDATE over a table
// that has been split into several regions while coprocessor RPCs and
// select-response consumption are artificially slowed down, and asserts that
// the statement still finishes within a fixed time budget. Afterwards it
// checks that a plain SELECT over a freshly split table returns rows in order.
func TestDMLWithLiteCopWorker(t *testing.T) {
	const (
		slowCopFP     = "github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowCop"
		slowConsumeFP = "github.com/pingcap/tidb/pkg/distsql/mockConsumeSelectRespSlow"
	)

	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t1 (id bigint auto_increment key, b int);")
	tk.MustExec("insert into t1 (b) values (1),(2),(3),(4),(5),(6),(7),(8);")
	// Double the row count 8 times: 8 * 2^8 = 2048 rows.
	for i := 0; i < 8; i++ {
		tk.MustExec("insert into t1 (b) select b from t1;")
	}
	tk.MustQuery("select count(*) from t1").Check(testkit.Rows("2048"))
	tk.MustQuery("split table t1 by (1025);").Check(testkit.Rows("1 1"))
	tk.MustExec("set @@tidb_enable_paging = off")

	// enableFailpoint turns the failpoint on and returns the function that
	// turns it off again. The disable step reports with t.Errorf rather than
	// require so that it is safe to run from a deferred call.
	enableFailpoint := func(name, expr string) (disable func()) {
		require.NoError(t, failpoint.Enable(name, expr))
		return func() {
			if err := failpoint.Disable(name); err != nil {
				t.Errorf("disable failpoint %s: %v", name, err)
			}
		}
	}

	// Run the timed phase in its own function so the failpoints are always
	// disabled when it ends, even if an assertion aborts the test via FailNow.
	// Otherwise a failed timing check would leave the failpoints enabled for
	// every later test in this binary.
	func() {
		defer enableFailpoint(slowCopFP, `return(200)`)()
		defer enableFailpoint(slowConsumeFP, `return(100)`)()

		start := time.Now()
		tk.MustExec("update t1 set b=b+1 where id >= 0;")
		require.Less(t, time.Since(start), time.Millisecond*800) // 3 * 200ms + 1 * 100ms
	}()

	// Test select after split table.
	tk.MustExec("truncate table t1;")
	tk.MustExec("insert into t1 (b) values (1),(2),(3),(4),(5),(6),(7),(8);")
	tk.MustQuery("split table t1 by (3), (6), (9);").Check(testkit.Rows("3 1"))
	tk.MustQuery("select b from t1 order by id").Check(testkit.Rows("1", "2", "3", "4", "5", "6", "7", "8"))
}
40 changes: 36 additions & 4 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,10 +1110,18 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
failpoint.InjectCall("CtxCancelBeforeReceive", ctx)
if it.liteWorker != nil {
resp = it.liteWorker.liteSendReq(ctx, it)
// After the lite worker handles one task, reset tryCopLiteWorker to 0 so that future requests can reuse copLiteWorker.
it.liteWorker.tryCopLiteWorker.CompareAndSwap(1, 0)
if resp == nil {
it.actionOnExceed.close()
return nil, nil
}
if len(it.tasks) > 0 && len(it.liteWorker.batchCopRespList) == 0 && resp.err == nil {
// If there are remaining tasks to process, run the worker concurrently to avoid blocking.
// See https://github.com/pingcap/tidb/issues/58658 and TestDMLWithLiteCopWorker for more detail.
it.liteWorker.runWorkerConcurrently(it)
it.liteWorker = nil
}
it.actionOnExceed.destroyTokenIfNeeded(func() {})
memTrackerConsumeResp(it.memTracker, resp)
} else if it.respChan != nil {
Expand Down Expand Up @@ -1198,10 +1206,6 @@ func (w *liteCopIteratorWorker) liteSendReq(ctx context.Context, it *copIterator
} else {
it.tasks = it.tasks[1:]
}
if len(it.tasks) == 0 {
// if all tasks are finished, reset tryCopLiteWorker to 0 to make future request can reuse copLiteWorker.
w.tryCopLiteWorker.Store(0)
}
if result != nil {
if result.resp != nil {
w.batchCopRespList = result.batchRespList
Expand All @@ -1217,6 +1221,34 @@ func (w *liteCopIteratorWorker) liteSendReq(ctx context.Context, it *copIterator
return nil
}

// runWorkerConcurrently switches the remaining tasks of it from being handled
// inline to being handled in the background: it starts the wrapped worker in
// its own goroutine, fed through a fresh task channel, and starts a task
// sender goroutine that is given it.tasks.
func (w *liteCopIteratorWorker) runWorkerConcurrently(it *copIterator) {
	feed := make(chan *copTask, 1)

	bgWorker := w.worker
	bgWorker.taskCh = feed
	// Register the worker goroutine on the iterator's wait group before
	// launching it; the sender below is handed the same wait group.
	it.wg.Add(1)
	go bgWorker.run(w.ctx)

	// If it.respChan is nil, responses are read from each task's own respChan.
	// A task's respChan may be nil when the cop task was rebuilt, so make sure
	// every pending task has one.
	if it.respChan == nil {
		for idx := range it.tasks {
			if it.tasks[idx].respChan != nil {
				continue
			}
			it.tasks[idx].respChan = make(chan *copResponse, 2)
		}
	}

	sender := &copIteratorTaskSender{
		tasks:    it.tasks,
		taskCh:   feed,
		respChan: it.respChan,
		finishCh: it.finishCh,
		sendRate: it.sendRate,
		wg:       &it.wg,
	}
	go sender.run(it.req.ConnID, it.req.RunawayChecker)
}

// HasUnconsumedCopRuntimeStats indicates whether there are unconsumed CopRuntimeStats.
type HasUnconsumedCopRuntimeStats interface {
// CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.
Expand Down

0 comments on commit 39912af

Please sign in to comment.