diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 8115b87c8fe5ac..ccd8757aba7f28 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -166,12 +166,10 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu SCOPED_TIMER(_runtime_filter_compute_timer); _runtime_filter_slots->insert(block); } - } else if (p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) { - throw Exception(ErrorCode::INTERNAL_ERROR, - "build_sink::close meet error state, shared_hash_table_signaled: {}, " - "complete_build_stage: {}", - p._shared_hash_table_context->signaled, - p._shared_hash_table_context->complete_build_stage); + } else if ((p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) || + (p._shared_hash_table_context && + !p._shared_hash_table_context->complete_build_stage)) { + throw Exception(ErrorCode::INTERNAL_ERROR, "build_sink::close meet error state"); } SCOPED_TIMER(_publish_runtime_filter_timer); @@ -179,9 +177,12 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } catch (Exception& e) { return Status::InternalError( "rf process meet error: {}, wake_up_by_downstream: {}, should_build_hash_table: " - "{}, _finish_dependency: {}", + "{}, _finish_dependency: {}, complete_build_stage: {}, shared_hash_table_signaled: " + "{}", e.to_string(), state->get_task()->wake_up_by_downstream(), _should_build_hash_table, - _finish_dependency->debug_string()); + _finish_dependency->debug_string(), + p._shared_hash_table_context && !p._shared_hash_table_context->complete_build_stage, + p._shared_hashtable_controller && !p._shared_hash_table_context->signaled); } return Base::close(state, exec_status); } diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 96da754daa5d98..e4678b7dcf3a83 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -112,7 +112,12 @@ void Pipeline::make_all_runnable() { if (_sink->count_down_destination()) { for (auto* task : _tasks) { if (task) { - task->clear_blocking_state(true); + task->set_wake_up_by_downstream(); + } + } + for (auto* task : _tasks) { + if (task) { + task->clear_blocking_state(); } } } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 3b4627f589dc54..4bb062122c0c08 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -135,11 +135,12 @@ class PipelineTask { int task_id() const { return _index; }; bool is_finalized() const { return _finalized; } - void clear_blocking_state(bool wake_up_by_downstream = false) { + void set_wake_up_by_downstream() { _wake_up_by_downstream = true; } + + void clear_blocking_state() { _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock lc(_dependency_lock); - _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; if (!_finalized) { _execution_dep->set_always_ready(); for (auto* dep : _filter_dependencies) {