From 0a3216dee8035aded142537fe5353a126fac213a Mon Sep 17 00:00:00 2001 From: HappenLee Date: Mon, 10 Jul 2023 17:04:39 +0800 Subject: [PATCH] [pipeline](task_queue) remove disable steal in task queue to speed up query --- be/src/pipeline/pipeline.h | 8 +------- be/src/pipeline/pipeline_fragment_context.cpp | 1 - be/src/pipeline/pipeline_task.h | 4 ---- be/src/pipeline/task_queue.cpp | 3 --- 4 files changed, 1 insertion(+), 15 deletions(-) diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 676eb9efa19439..73b2c3850c5849 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -49,10 +49,7 @@ class Pipeline : public std::enable_shared_from_this { public: Pipeline() = delete; explicit Pipeline(PipelineId pipeline_id, std::weak_ptr context) - : _complete_dependency(0), - _pipeline_id(pipeline_id), - _context(context), - _can_steal(true) { + : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) { _init_profile(); } @@ -84,8 +81,6 @@ class Pipeline : public std::enable_shared_from_this { RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); } - void disable_task_steal() { _can_steal = false; } - private: void _init_profile(); std::atomic _complete_dependency; @@ -98,7 +93,6 @@ class Pipeline : public std::enable_shared_from_this { PipelineId _pipeline_id; std::weak_ptr _context; - bool _can_steal; int _previous_schedule_id = -1; std::unique_ptr _pipeline_profile; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 73feac07ac3e8f..c697b11928dec0 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -600,7 +600,6 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur OperatorBuilderPtr join_sink = std::make_shared(next_operator_builder_id(), join_node); RETURN_IF_ERROR(new_pipe->set_sink(join_sink)); - new_pipe->disable_task_steal(); RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); OperatorBuilderPtr join_source = std::make_shared( diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 65e9ad83ed81de..acb5d1202f4ecc 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -121,7 +121,6 @@ class PipelineTask { _sink(sink), _prepared(false), _opened(false), - _can_steal(pipeline->_can_steal), _state(state), _cur_state(PipelineTaskState::NOT_READY), _data_state(SourceState::DEPEND_ON_SOURCE), @@ -159,8 +158,6 @@ class PipelineTask { bool sink_can_write() { return _sink->can_write(); } - bool can_steal() const { return _can_steal; } - Status finalize(); PipelineFragmentContext* fragment_context() { return _fragment_context; } @@ -229,7 +226,6 @@ class PipelineTask { bool _prepared; bool _opened; - bool _can_steal; RuntimeState* _state; int _previous_schedule_id = -1; uint32_t _schedule_time = 0; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 807a344cf923c6..2c09ac8edd7310 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -35,9 +35,6 @@ PipelineTask* SubTaskQueue::try_take(bool is_steal) { return nullptr; } auto task = _queue.front(); - if (!task->can_steal() && is_steal) { - return nullptr; - } _queue.pop(); return task; }