From aa75f79fad87ac845bb7552fd81ad83df6ab4759 Mon Sep 17 00:00:00 2001 From: wangbo Date: Thu, 27 Jul 2023 14:38:25 +0800 Subject: [PATCH] [fix](executor)cancel exchange buffer rpc when query is cancelled (#22226) When a brpc client makes a request to a server and the server does not respond, and may never respond (for example, because the BE restarted), the query itself can be cancelled at once, but the ExchangeSinkBuffer cannot be cancelled until the RPC times out. With this change, when the query is cancelled, the ExchangeSinkBuffer's in-flight RPC is cancelled as well, so the buffer can be closed at once. --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 29 +++++++++++++++++-- be/src/pipeline/exec/exchange_sink_buffer.h | 9 +++++- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index e06bb24752fd7e..0a487922a21b68 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -72,11 +71,25 @@ bool ExchangeSinkBuffer::can_write() const { return total_package_size <= max_package_size; } -bool ExchangeSinkBuffer::is_pending_finish() const { +bool ExchangeSinkBuffer::is_pending_finish() { + //note(wb) angly implementation here, because operator couples the scheduling logic + // graceful implementation maybe as follows: + // 1 make ExchangeSinkBuffer support try close which calls brpc::StartCancel + // 2 make BlockScheduler calls tryclose when query is cancel + bool need_cancel = _context->is_canceled(); + for (auto& pair : _instance_to_package_queue_mutex) { std::unique_lock lock(*(pair.second)); auto& id = pair.first; if (!_instance_to_sending_by_pipeline.at(id)) { + // when pending finish, we need check whether current query is cancelled + if (need_cancel && _instance_to_rpc_ctx.find(id) != _instance_to_rpc_ctx.end()) { + auto& rpc_ctx = _instance_to_rpc_ctx[id]; + if (!rpc_ctx.is_cancelled) { + 
brpc::StartCancel(rpc_ctx._closure->cntl.call_id()); + rpc_ctx.is_cancelled = true; + } + } return true; } } @@ -177,6 +190,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { brpc_request->set_allocated_block(request.block.get()); } auto* closure = request.channel->get_closure(id, request.eos, nullptr); + + ExchangeRpcContext rpc_ctx; + rpc_ctx._closure = closure; + rpc_ctx.is_cancelled = false; + _instance_to_rpc_ctx[id] = rpc_ctx; + closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); closure->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); @@ -221,6 +240,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { brpc_request->set_allocated_block(request.block_holder->get_block()); } auto* closure = request.channel->get_closure(id, request.eos, request.block_holder); + + ExchangeRpcContext rpc_ctx; + rpc_ctx._closure = closure; + rpc_ctx.is_cancelled = false; + _instance_to_rpc_ctx[id] = rpc_ctx; + closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); closure->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index dcea246f91367b..c4636563108320 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -153,6 +154,11 @@ class SelfDeleteClosure : public google::protobuf::Closure { vectorized::BroadcastPBlockHolder* _data; }; +struct ExchangeRpcContext { + SelfDeleteClosure* _closure = nullptr; + bool is_cancelled = false; +}; + // Each ExchangeSinkOperator have one ExchangeSinkBuffer class ExchangeSinkBuffer { public: @@ -162,7 +168,7 @@ class ExchangeSinkBuffer { Status add_block(TransmitInfo&& request); Status add_block(BroadcastTransmitInfo&& request); bool can_write() const; - bool is_pending_finish() const; + 
bool is_pending_finish(); void close(); void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); @@ -185,6 +191,7 @@ class ExchangeSinkBuffer { phmap::flat_hash_map _instance_to_sending_by_pipeline; phmap::flat_hash_map _instance_to_receiver_eof; phmap::flat_hash_map _instance_to_rpc_time; + phmap::flat_hash_map _instance_to_rpc_ctx; std::atomic _is_finishing; PUniqueId _query_id;