Skip to content

Commit

Permalink
[Bug](runtime-filter) set all task's wake_up_by_downstream before set…
Browse files Browse the repository at this point in the history
… dependency ready (apache#45003)

### What problem does this PR solve?
should_build_hash_table's instance close early and signal this instance
but function make_all_runnable still running and not set this instance's
wake_up_by_downstream to true
  • Loading branch information
BiteTheDDDDt authored Dec 5, 2024
1 parent 6a8ae77 commit 61df4e6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
17 changes: 9 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,23 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
} else if (p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"build_sink::close meet error state, shared_hash_table_signaled: {}, "
"complete_build_stage: {}",
p._shared_hash_table_context->signaled,
p._shared_hash_table_context->complete_build_stage);
} else if ((p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) ||
(p._shared_hash_table_context &&
!p._shared_hash_table_context->complete_build_stage)) {
throw Exception(ErrorCode::INTERNAL_ERROR, "build_sink::close meet error state");
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
} catch (Exception& e) {
return Status::InternalError(
"rf process meet error: {}, wake_up_by_downstream: {}, should_build_hash_table: "
"{}, _finish_dependency: {}",
"{}, _finish_dependency: {}, complete_build_stage: {}, shared_hash_table_signaled: "
"{}",
e.to_string(), state->get_task()->wake_up_by_downstream(), _should_build_hash_table,
_finish_dependency->debug_string());
_finish_dependency->debug_string(),
p._shared_hash_table_context && !p._shared_hash_table_context->complete_build_stage,
p._shared_hashtable_controller && !p._shared_hash_table_context->signaled);
}
return Base::close(state, exec_status);
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ void Pipeline::make_all_runnable() {
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
task->set_wake_up_by_downstream();
}
}
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state();
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,12 @@ class PipelineTask {
int task_id() const { return _index; };
bool is_finalized() const { return _finalized; }

void clear_blocking_state(bool wake_up_by_downstream = false) {
void set_wake_up_by_downstream() { _wake_up_by_downstream = true; }

void clear_blocking_state() {
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
if (!_finalized) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
Expand Down

0 comments on commit 61df4e6

Please sign in to comment.