diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index e06bb24752fd7e..0a487922a21b68 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -72,11 +71,25 @@ bool ExchangeSinkBuffer::can_write() const { return total_package_size <= max_package_size; } -bool ExchangeSinkBuffer::is_pending_finish() const { +bool ExchangeSinkBuffer::is_pending_finish() { + //note(wb) angly implementation here, because operator couples the scheduling logic + // graceful implementation maybe as follows: + // 1 make ExchangeSinkBuffer support try close which calls brpc::StartCancel + // 2 make BlockScheduler calls tryclose when query is cancel + bool need_cancel = _context->is_canceled(); + for (auto& pair : _instance_to_package_queue_mutex) { std::unique_lock lock(*(pair.second)); auto& id = pair.first; if (!_instance_to_sending_by_pipeline.at(id)) { + // when pending finish, we need check whether current query is cancelled + if (need_cancel && _instance_to_rpc_ctx.find(id) != _instance_to_rpc_ctx.end()) { + auto& rpc_ctx = _instance_to_rpc_ctx[id]; + if (!rpc_ctx.is_cancelled) { + brpc::StartCancel(rpc_ctx._closure->cntl.call_id()); + rpc_ctx.is_cancelled = true; + } + } return true; } } @@ -177,6 +190,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { brpc_request->set_allocated_block(request.block.get()); } auto* closure = request.channel->get_closure(id, request.eos, nullptr); + + ExchangeRpcContext rpc_ctx; + rpc_ctx._closure = closure; + rpc_ctx.is_cancelled = false; + _instance_to_rpc_ctx[id] = rpc_ctx; + closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); closure->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); @@ -221,6 +240,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { brpc_request->set_allocated_block(request.block_holder->get_block()); } auto* closure = request.channel->get_closure(id, request.eos, request.block_holder); + + ExchangeRpcContext rpc_ctx; + rpc_ctx._closure = closure; + rpc_ctx.is_cancelled = false; + _instance_to_rpc_ctx[id] = rpc_ctx; + closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); closure->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index dcea246f91367b..c4636563108320 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -153,6 +154,11 @@ class SelfDeleteClosure : public google::protobuf::Closure { vectorized::BroadcastPBlockHolder* _data; }; +struct ExchangeRpcContext { + SelfDeleteClosure* _closure = nullptr; + bool is_cancelled = false; +}; + // Each ExchangeSinkOperator have one ExchangeSinkBuffer class ExchangeSinkBuffer { public: @@ -162,7 +168,7 @@ class ExchangeSinkBuffer { Status add_block(TransmitInfo&& request); Status add_block(BroadcastTransmitInfo&& request); bool can_write() const; - bool is_pending_finish() const; + bool is_pending_finish(); void close(); void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); @@ -185,6 +191,7 @@ class ExchangeSinkBuffer { phmap::flat_hash_map _instance_to_sending_by_pipeline; phmap::flat_hash_map _instance_to_receiver_eof; phmap::flat_hash_map _instance_to_rpc_time; + phmap::flat_hash_map _instance_to_rpc_ctx; std::atomic _is_finishing; PUniqueId _query_id;