From dfd1addd975055f04fee842f50bddd288fd3d72d Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sat, 6 Apr 2024 11:42:05 +0800 Subject: [PATCH] Support start KeepWrite bthread urgent --- src/brpc/controller.cpp | 1 + src/brpc/controller.h | 6 ++++++ src/brpc/policy/baidu_rpc_protocol.cpp | 1 + src/brpc/policy/http_rpc_protocol.cpp | 1 + src/brpc/policy/hulu_pbrpc_protocol.cpp | 1 + src/brpc/policy/mongo_protocol.cpp | 1 + src/brpc/policy/nshead_protocol.cpp | 1 + src/brpc/policy/sofa_pbrpc_protocol.cpp | 1 + src/brpc/socket.cpp | 8 ++++++-- src/brpc/socket.h | 7 ++++++- 10 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 1d9b1bb968..70275457bc 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1201,6 +1201,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { wopt.auth_flags = _auth_flags; wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED); wopt.write_in_background = write_to_socket_in_background(); + wopt.keep_write_urgent = keep_write_urgent(); int rc; size_t packet_size = 0; if (user_packet_guard) { diff --git a/src/brpc/controller.h b/src/brpc/controller.h index d3ffb99f8c..601887c285 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -148,6 +148,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20); static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21); static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22); + static const uint32_t FLAGS_KEEP_WRITE_URGENT = (1 << 23); public: struct Inheritable { @@ -388,6 +389,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); } bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); } + // Create a KEEPWRITE bthread to write to socket for + // requests or responses of RPCs. + void set_keep_write_urgent(bool f) { set_flag(FLAGS_KEEP_WRITE_URGENT, f); } + bool keep_write_urgent() const { return has_flag(FLAGS_KEEP_WRITE_URGENT); } + // ------------------------------------------------------------------------ // Server-side methods. // These calls shall be made from the server side only. Their results are diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 6fa17d6ca2..e98dd1a0e3 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -275,6 +275,7 @@ void SendRpcResponse(int64_t correlation_id, // users to set max_concurrency. Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + wopt.keep_write_urgent = cntl->keep_write_urgent(); if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 2587cf3f04..3f8c1f5c9a 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -926,6 +926,7 @@ HttpResponseSender::~HttpResponseSender() { // users to set max_concurrency. Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + wopt.keep_write_urgent = _cntl->keep_write_urgent(); if (is_http2) { if (is_grpc) { // Append compressed and length before body diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp b/src/brpc/policy/hulu_pbrpc_protocol.cpp index 2b63189eac..bfcea44433 100644 --- a/src/brpc/policy/hulu_pbrpc_protocol.cpp +++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp @@ -306,6 +306,7 @@ static void SendHuluResponse(int64_t correlation_id, // users to set max_concurrency. Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + wopt.keep_write_urgent = cntl->keep_write_urgent(); if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; diff --git a/src/brpc/policy/mongo_protocol.cpp b/src/brpc/policy/mongo_protocol.cpp index 82bb3e0b36..a01f980043 100644 --- a/src/brpc/policy/mongo_protocol.cpp +++ b/src/brpc/policy/mongo_protocol.cpp @@ -100,6 +100,7 @@ void SendMongoResponse::Run() { // users to set max_concurrency. Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + wopt.keep_write_urgent = cntl.keep_write_urgent(); if (socket->Write(&res_buf, &wopt) != 0) { PLOG(WARNING) << "Fail to write into " << *socket; return; diff --git a/src/brpc/policy/nshead_protocol.cpp b/src/brpc/policy/nshead_protocol.cpp index e51be36149..c3610acea5 100644 --- a/src/brpc/policy/nshead_protocol.cpp +++ b/src/brpc/policy/nshead_protocol.cpp @@ -114,6 +114,7 @@ void NsheadClosure::Run() { // users to set max_concurrency. Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + wopt.keep_write_urgent = _controller.keep_write_urgent(); if (sock->Write(&write_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp b/src/brpc/policy/sofa_pbrpc_protocol.cpp index 7584f79bd4..ab263364e3 100644 --- a/src/brpc/policy/sofa_pbrpc_protocol.cpp +++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp @@ -283,6 +283,7 @@ static void SendSofaResponse(int64_t correlation_id, // users to set max_concurrency. Socket::WriteOptions wopt; wopt.ignore_eovercrowded = true; + wopt.keep_write_urgent = cntl->keep_write_urgent(); if (sock->Write(&res_buf, &wopt) != 0) { const int errcode = errno; PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 447bac5fe7..f9175109e3 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -1787,8 +1787,12 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { KEEPWRITE_IN_BACKGROUND: ReAddress(&ptr_for_keep_write); req->set_socket(ptr_for_keep_write.release()); - if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, - KeepWrite, req) != 0) { + if (opt.keep_write_urgent) { + ret = bthread_start_urgent(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req); + } else { + ret = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req); + } + if (ret != 0) { LOG(FATAL) << "Fail to start KeepWrite"; KeepWrite(req); } diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 97ce568522..06036bbad5 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -371,6 +371,10 @@ friend class policy::H2GlobalStreamCreator; // Default: false bool shutdown_write; + // KeepWrite with `bthread_start_urgent' or `bthread_start_background'. + // Default: false + bool keep_write_urgent; + WriteOptions() : id_wait(INVALID_BTHREAD_ID) , notify_on_success(false) @@ -379,7 +383,8 @@ friend class policy::H2GlobalStreamCreator; , auth_flags(0) , ignore_eovercrowded(false) , write_in_background(false) - , shutdown_write(false) {} + , shutdown_write(false) + , keep_write_urgent(false) {} }; // True if write of socket is shutdown.