Skip to content

Commit

Permalink
[fix](executor)cancel exchange buffer rpc when query is cancelled (ap…
Browse files Browse the repository at this point in the history
…ache#22226)

When a brpc client sends a request to a server and the server does not respond, and may never respond (for example, because the BE restarted), the query itself can be cancelled at once, but the ExchangeSinkBuffer cannot be cancelled until the RPC times out.
So when the query is cancelled, the ExchangeSinkBuffer should also be closed at once.
  • Loading branch information
wangbo authored Jul 27, 2023
1 parent 9a95d66 commit aa75f79
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
29 changes: 27 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <butil/iobuf_inl.h>
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <stddef.h>
Expand Down Expand Up @@ -72,11 +71,25 @@ bool ExchangeSinkBuffer::can_write() const {
return total_package_size <= max_package_size;
}

bool ExchangeSinkBuffer::is_pending_finish() const {
bool ExchangeSinkBuffer::is_pending_finish() {
// NOTE(wb): ugly implementation here, because the operator is coupled with the scheduling logic.
// A more graceful implementation might be as follows:
// 1. make ExchangeSinkBuffer support a try-close that calls brpc::StartCancel
// 2. make BlockScheduler call try-close when the query is cancelled
bool need_cancel = _context->is_canceled();

for (auto& pair : _instance_to_package_queue_mutex) {
std::unique_lock<std::mutex> lock(*(pair.second));
auto& id = pair.first;
if (!_instance_to_sending_by_pipeline.at(id)) {
// when pending finish, we need check whether current query is cancelled
if (need_cancel && _instance_to_rpc_ctx.find(id) != _instance_to_rpc_ctx.end()) {
auto& rpc_ctx = _instance_to_rpc_ctx[id];
if (!rpc_ctx.is_cancelled) {
brpc::StartCancel(rpc_ctx._closure->cntl.call_id());
rpc_ctx.is_cancelled = true;
}
}
return true;
}
}
Expand Down Expand Up @@ -177,6 +190,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
brpc_request->set_allocated_block(request.block.get());
}
auto* closure = request.channel->get_closure(id, request.eos, nullptr);

ExchangeRpcContext rpc_ctx;
rpc_ctx._closure = closure;
rpc_ctx.is_cancelled = false;
_instance_to_rpc_ctx[id] = rpc_ctx;

closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
closure->addFailedHandler(
[&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
Expand Down Expand Up @@ -221,6 +240,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
auto* closure = request.channel->get_closure(id, request.eos, request.block_holder);

ExchangeRpcContext rpc_ctx;
rpc_ctx._closure = closure;
rpc_ctx.is_cancelled = false;
_instance_to_rpc_ctx[id] = rpc_ctx;

closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
closure->addFailedHandler(
[&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
Expand Down
9 changes: 8 additions & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <brpc/controller.h>
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <parallel_hashmap/phmap.h>
#include <stdint.h>
Expand Down Expand Up @@ -153,6 +154,11 @@ class SelfDeleteClosure : public google::protobuf::Closure {
vectorized::BroadcastPBlockHolder* _data;
};

// Bookkeeping for the most recent exchange RPC issued to one destination
// instance. ExchangeSinkBuffer stores one of these per instance id so that
// is_pending_finish() can cancel the RPC once the query has been cancelled.
struct ExchangeRpcContext {
    // Closure handed to the RPC; its controller's call id is what gets passed
    // to brpc::StartCancel. Held as a raw pointer.
    // NOTE(review): the type name suggests the closure frees itself when the
    // RPC completes -- confirm this pointer cannot dangle when dereferenced.
    SelfDeleteClosure<PTransmitDataResult>* _closure {nullptr};

    // Set to true once a cancel has been requested for this RPC, so the
    // cancel request is issued at most once.
    bool is_cancelled {false};
};

// Each ExchangeSinkOperator has one ExchangeSinkBuffer
class ExchangeSinkBuffer {
public:
Expand All @@ -162,7 +168,7 @@ class ExchangeSinkBuffer {
Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
bool can_write() const;
bool is_pending_finish() const;
bool is_pending_finish();
void close();
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_profile(RuntimeProfile* profile);
Expand All @@ -185,6 +191,7 @@ class ExchangeSinkBuffer {
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> _instance_to_rpc_ctx;

std::atomic<bool> _is_finishing;
PUniqueId _query_id;
Expand Down

0 comments on commit aa75f79

Please sign in to comment.