Skip to content

Commit

Permalink
[pipeline](task_queue) remove disable steal in task queue to speed up…
Browse files Browse the repository at this point in the history
… query (apache#21692)

TPCH Q9

before: 2.74s
after: 2.33s
  • Loading branch information
HappenLee authored Jul 10, 2023
1 parent f5641b5 commit 307149d
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 15 deletions.
8 changes: 1 addition & 7 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
public:
Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
: _complete_dependency(0),
_pipeline_id(pipeline_id),
_context(context),
_can_steal(true) {
: _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {
_init_profile();
}

Expand Down Expand Up @@ -84,8 +81,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {

RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }

void disable_task_steal() { _can_steal = false; }

private:
void _init_profile();
std::atomic<uint32_t> _complete_dependency;
Expand All @@ -98,7 +93,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {

PipelineId _pipeline_id;
std::weak_ptr<PipelineFragmentContext> _context;
bool _can_steal;
int _previous_schedule_id = -1;

std::unique_ptr<RuntimeProfile> _pipeline_profile;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,6 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
OperatorBuilderPtr join_sink =
std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(), join_node);
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
new_pipe->disable_task_steal();

RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr join_source = std::make_shared<HashJoinProbeOperatorBuilder>(
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState*
_sink(sink),
_prepared(false),
_opened(false),
_can_steal(pipeline->_can_steal),
_state(state),
_cur_state(PipelineTaskState::NOT_READY),
_data_state(SourceState::DEPEND_ON_SOURCE),
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ class PipelineTask {

bool sink_can_write() { return _sink->can_write(); }

bool can_steal() const { return _can_steal; }

Status finalize();

PipelineFragmentContext* fragment_context() { return _fragment_context; }
Expand Down Expand Up @@ -214,7 +212,6 @@ class PipelineTask {

bool _prepared;
bool _opened;
bool _can_steal;
RuntimeState* _state;
int _previous_schedule_id = -1;
uint32_t _schedule_time = 0;
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ PipelineTask* SubTaskQueue::try_take(bool is_steal) {
return nullptr;
}
auto task = _queue.front();
if (!task->can_steal() && is_steal) {
return nullptr;
}
_queue.pop();
return task;
}
Expand Down

0 comments on commit 307149d

Please sign in to comment.