diff --git a/pkg/distsql/select_result.go b/pkg/distsql/select_result.go index e21b556b43f93..cb1b187da06c6 100644 --- a/pkg/distsql/select_result.go +++ b/pkg/distsql/select_result.go @@ -386,6 +386,9 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { if r.selectResp == nil { return nil } + failpoint.Inject("mockConsumeSelectRespSlow", func(val failpoint.Value) { + time.Sleep(time.Duration(val.(int) * int(time.Millisecond))) + }) } // TODO(Shenghui Wu): add metrics encodeType := r.selectResp.GetEncodeType() diff --git a/pkg/store/copr/copr_test/BUILD.bazel b/pkg/store/copr/copr_test/BUILD.bazel index 0903eec3f3c74..aa9ba312622e3 100644 --- a/pkg/store/copr/copr_test/BUILD.bazel +++ b/pkg/store/copr/copr_test/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 4, + shard_count = 5, deps = [ "//pkg/config", "//pkg/kv", diff --git a/pkg/store/copr/copr_test/coprocessor_test.go b/pkg/store/copr/copr_test/coprocessor_test.go index 9b40a77d9ceef..5a9a005704c06 100644 --- a/pkg/store/copr/copr_test/coprocessor_test.go +++ b/pkg/store/copr/copr_test/coprocessor_test.go @@ -323,3 +323,30 @@ func TestQueryWithConcurrentSmallCop(t *testing.T) { require.Less(t, time.Since(start), time.Millisecond*150) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowCop")) } + +func TestDMLWithLiteCopWorker(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (id bigint auto_increment key, b int);") + tk.MustExec("insert into t1 (b) values (1),(2),(3),(4),(5),(6),(7),(8);") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t1 (b) select b from t1;") + } + tk.MustQuery("select count(*) from t1").Check(testkit.Rows("2048")) + tk.MustQuery("split table t1 by (1025);").Check(testkit.Rows("1 1")) + tk.MustExec("set @@tidb_enable_paging = off") + require.NoError(t, 
failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowCop", `return(200)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/mockConsumeSelectRespSlow", `return(100)`)) + start := time.Now() + tk.MustExec("update t1 set b=b+1 where id >= 0;") + require.Less(t, time.Since(start), time.Millisecond*800) // 3 * 200ms + 1 * 100ms + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowCop")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/mockConsumeSelectRespSlow")) + + // Test select after split table. + tk.MustExec("truncate table t1;") + tk.MustExec("insert into t1 (b) values (1),(2),(3),(4),(5),(6),(7),(8);") + tk.MustQuery("split table t1 by (3), (6), (9);").Check(testkit.Rows("3 1")) + tk.MustQuery("select b from t1 order by id").Check(testkit.Rows("1", "2", "3", "4", "5", "6", "7", "8")) +} diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index ebb4a9ca892b5..4796d6822d730 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -1110,10 +1110,18 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { failpoint.InjectCall("CtxCancelBeforeReceive", ctx) if it.liteWorker != nil { resp = it.liteWorker.liteSendReq(ctx, it) + // After the lite worker handles one task, reset tryCopLiteWorker to 0 so that future requests can reuse copLiteWorker. + it.liteWorker.tryCopLiteWorker.CompareAndSwap(1, 0) if resp == nil { it.actionOnExceed.close() return nil, nil } + if len(it.tasks) > 0 && len(it.liteWorker.batchCopRespList) == 0 && resp.err == nil { + // If there are remaining tasks to be processed, we need to run the worker concurrently to avoid blocking. + // See more details in https://github.com/pingcap/tidb/issues/58658 and TestDMLWithLiteCopWorker. 
+ it.liteWorker.runWorkerConcurrently(it) + it.liteWorker = nil + } it.actionOnExceed.destroyTokenIfNeeded(func() {}) memTrackerConsumeResp(it.memTracker, resp) } else if it.respChan != nil { @@ -1198,10 +1206,6 @@ func (w *liteCopIteratorWorker) liteSendReq(ctx context.Context, it *copIterator } else { it.tasks = it.tasks[1:] } - if len(it.tasks) == 0 { - // if all tasks are finished, reset tryCopLiteWorker to 0 to make future request can reuse copLiteWorker. - w.tryCopLiteWorker.Store(0) - } if result != nil { if result.resp != nil { w.batchCopRespList = result.batchRespList @@ -1217,6 +1221,34 @@ func (w *liteCopIteratorWorker) liteSendReq(ctx context.Context, it *copIterator return nil } +func (w *liteCopIteratorWorker) runWorkerConcurrently(it *copIterator) { + taskCh := make(chan *copTask, 1) + worker := w.worker + worker.taskCh = taskCh + it.wg.Add(1) + go worker.run(w.ctx) + + if it.respChan == nil { + // If it.respChan is nil, the response will be read from task.respChan, + // but task.respChan may be nil when rebuilding the cop task, so we need to create respChan for the task. + for i := range it.tasks { + if it.tasks[i].respChan == nil { + it.tasks[i].respChan = make(chan *copResponse, 2) + } + } + } + + taskSender := &copIteratorTaskSender{ + taskCh: taskCh, + wg: &it.wg, + tasks: it.tasks, + finishCh: it.finishCh, + sendRate: it.sendRate, + respChan: it.respChan, + } + go taskSender.run(it.req.ConnID, it.req.RunawayChecker) +} + // HasUnconsumedCopRuntimeStats indicate whether has unconsumed CopRuntimeStats. type HasUnconsumedCopRuntimeStats interface { // CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.