diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index c5da36a7c4d286..55d81f449e7927 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -104,33 +104,14 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) { return Status::OK(); } -Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { - if (_closed) { - return Status::OK(); - } - RETURN_IF_ERROR(Base::close(state, exec_status)); - if (exec_status.ok()) { - DCHECK(_release_count || _exchanger == nullptr || - _exchanger->_running_source_operators == 0) - << "Do not finish correctly! " << debug_string(0) - << " state: { cancel = " << state->is_cancelled() << ", " - << state->cancel_reason().to_string() - << "} query ctx: { cancel = " << state->get_query_ctx()->is_cancelled() << ", " - << state->get_query_ctx()->exec_status().to_string() - << "} Exchanger: " << (void*)_exchanger; - } - return Status::OK(); -} - std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}", + "_running_sink_operators: {}, _running_source_operators: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators, _exchanger->_running_source_operators, - _release_count); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } @@ -143,13 +124,11 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // If all exchange sources ended due to limit reached, current task should also finish if (local_state._exchanger->_running_source_operators == 0) { - local_state._release_count = true; local_state._shared_state->sub_running_sink_operators(); return Status::EndOfFile("receiver eof"); } if (eos) { local_state._shared_state->sub_running_sink_operators(); - local_state._release_count = true; } return Status::OK(); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 7a98840b4b323e..fa9512677dc8d8 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -43,7 +43,6 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState dependencies() const override; @@ -69,7 +68,6 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState