Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support start KeepWrite bthread urgent #2591

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
wopt.auth_flags = _auth_flags;
wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
wopt.write_in_background = write_to_socket_in_background();
wopt.keep_write_urgent = keep_write_urgent();
int rc;
size_t packet_size = 0;
if (user_packet_guard) {
Expand Down
6 changes: 6 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
static const uint32_t FLAGS_KEEP_WRITE_URGENT = (1 << 23);

public:
struct Inheritable {
Expand Down Expand Up @@ -388,6 +389,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); }
bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); }

// Create a KEEPWRITE bthread to write to socket for
// requests or responses of RPCs.
void set_keep_write_urgent(bool f) { set_flag(FLAGS_KEEP_WRITE_URGENT, f); }
bool keep_write_urgent() const { return has_flag(FLAGS_KEEP_WRITE_URGENT); }

// ------------------------------------------------------------------------
// Server-side methods.
// These calls shall be made from the server side only. Their results are
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ void SendRpcResponse(int64_t correlation_id,
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = cntl->keep_write_urgent();
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ HttpResponseSender::~HttpResponseSender() {
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = _cntl->keep_write_urgent();
if (is_http2) {
if (is_grpc) {
// Append compressed and length before body
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/hulu_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ static void SendHuluResponse(int64_t correlation_id,
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = cntl->keep_write_urgent();
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/mongo_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ void SendMongoResponse::Run() {
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = cntl.keep_write_urgent();
if (socket->Write(&res_buf, &wopt) != 0) {
PLOG(WARNING) << "Fail to write into " << *socket;
return;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/nshead_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void NsheadClosure::Run() {
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = _controller.keep_write_urgent();
if (sock->Write(&write_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/sofa_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ static void SendSofaResponse(int64_t correlation_id,
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
wopt.keep_write_urgent = cntl->keep_write_urgent();
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
Expand Down
8 changes: 6 additions & 2 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1787,8 +1787,12 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
KEEPWRITE_IN_BACKGROUND:
ReAddress(&ptr_for_keep_write);
req->set_socket(ptr_for_keep_write.release());
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
KeepWrite, req) != 0) {
if (opt.keep_write_urgent) {
ret = bthread_start_urgent(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req);
} else {
ret = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, KeepWrite, req);
}
if (ret != 0) {
LOG(FATAL) << "Fail to start KeepWrite";
KeepWrite(req);
}
Expand Down
7 changes: 6 additions & 1 deletion src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,10 @@ friend class policy::H2GlobalStreamCreator;
// Default: false
bool shutdown_write;

// KeepWrite with `bthread_start_urgent' or `bthread_start_background'.
// Default: false
bool keep_write_urgent;

WriteOptions()
: id_wait(INVALID_BTHREAD_ID)
, notify_on_success(false)
Expand All @@ -379,7 +383,8 @@ friend class policy::H2GlobalStreamCreator;
, auth_flags(0)
, ignore_eovercrowded(false)
, write_in_background(false)
, shutdown_write(false) {}
, shutdown_write(false)
, keep_write_urgent(false) {}
};

// True if write of socket is shutdown.
Expand Down
Loading