diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 676eb9efa19439..73b2c3850c5849 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -49,10 +49,7 @@ class Pipeline : public std::enable_shared_from_this { public: Pipeline() = delete; explicit Pipeline(PipelineId pipeline_id, std::weak_ptr context) - : _complete_dependency(0), - _pipeline_id(pipeline_id), - _context(context), - _can_steal(true) { + : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) { _init_profile(); } @@ -84,8 +81,6 @@ class Pipeline : public std::enable_shared_from_this { RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); } - void disable_task_steal() { _can_steal = false; } - private: void _init_profile(); std::atomic _complete_dependency; @@ -98,7 +93,6 @@ class Pipeline : public std::enable_shared_from_this { PipelineId _pipeline_id; std::weak_ptr _context; - bool _can_steal; int _previous_schedule_id = -1; std::unique_ptr _pipeline_profile; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 0d9745255f80ce..29f5b8d438b5f5 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -603,7 +603,6 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur OperatorBuilderPtr join_sink = std::make_shared(next_operator_builder_id(), join_node); RETURN_IF_ERROR(new_pipe->set_sink(join_sink)); - new_pipe->disable_task_steal(); RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe)); OperatorBuilderPtr join_source = std::make_shared( diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 11aeb620fa7e48..c44cec514c68a6 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -52,7 +52,6 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* _sink(sink), _prepared(false), _opened(false), - _can_steal(pipeline->_can_steal), _state(state), _cur_state(PipelineTaskState::NOT_READY), _data_state(SourceState::DEPEND_ON_SOURCE), diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 1652ead170f004..e08dabb47b06b3 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -144,8 +144,6 @@ class PipelineTask { bool sink_can_write() { return _sink->can_write(); } - bool can_steal() const { return _can_steal; } - Status finalize(); PipelineFragmentContext* fragment_context() { return _fragment_context; } @@ -214,7 +212,6 @@ class PipelineTask { bool _prepared; bool _opened; - bool _can_steal; RuntimeState* _state; int _previous_schedule_id = -1; uint32_t _schedule_time = 0; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index d3dc4440bb04d6..c9a73deae00b7d 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -34,9 +34,6 @@ PipelineTask* SubTaskQueue::try_take(bool is_steal) { return nullptr; } auto task = _queue.front(); - if (!task->can_steal() && is_steal) { - return nullptr; - } _queue.pop(); return task; }