Skip to content

Commit

Permalink
chore: print effective QPS of the server. (#3274)
Browse files Browse the repository at this point in the history
Also refactor ReceiveFB into multiple functions.
Finally, fix the memcached command in local monitoring stack.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jul 7, 2024
1 parent e213d60 commit 374a5f5
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 34 deletions.
66 changes: 36 additions & 30 deletions src/server/dfly_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ class Driver {
}

private:
void PopRequest(base::Histogram* dest);
void ReceiveFb(base::Histogram* dest);
void ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf, base::Histogram* dest);

struct Req {
uint64_t start;
Expand All @@ -183,7 +185,7 @@ class Driver {

unique_ptr<FiberSocketBase> socket_;
queue<Req> reqs_;
uint64_t hit_count_ = 0, hit_opportunities_ = 0;
uint64_t hit_count_ = 0, hit_opportunities_ = 0, num_resp_ = 0;
};

// Per thread client.
Expand Down Expand Up @@ -323,27 +325,26 @@ static string_view FindLine(io::Bytes buf) {
return {};
};

void Driver::PopRequest(base::Histogram* dest) {
uint64_t now = absl::GetCurrentTimeNanos();
uint64_t usec = (now - reqs_.front().start) / 1000;
dest->Add(usec);
hit_opportunities_ += reqs_.front().might_hit;

reqs_.pop();
++num_resp_;
}

void Driver::ReceiveFb(base::Histogram* dest) {
facade::RedisParser parser{1 << 16, false};
io::IoBuf io_buf{512};
unsigned num_resp = 0;

auto pop_req = [&] {
uint64_t now = absl::GetCurrentTimeNanos();
uint64_t usec = (now - reqs_.front().start) / 1000;
dest->Add(usec);
hit_opportunities_ += reqs_.front().might_hit;

reqs_.pop();
++num_resp;
};

unsigned blob_len = 0;

while (true) {
io_buf.EnsureCapacity(256);
auto buf = io_buf.AppendBuffer();
VLOG(2) << "Socket read: " << reqs_.size() << " " << num_resp;
VLOG(2) << "Socket read: " << reqs_.size() << " " << num_resp_;

::io::Result<size_t> recv_sz = socket_->Recv(buf);
if (!recv_sz && FiberSocketBase::IsConnClosed(recv_sz.error())) {
Expand All @@ -353,21 +354,7 @@ void Driver::ReceiveFb(base::Histogram* dest) {
io_buf.CommitWrite(*recv_sz);

if (protocol == RESP) {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec parse_args;

do {
result = parser.Parse(io_buf.InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
if (reqs_.front().might_hit && parse_args[0].type != facade::RespExpr::NIL) {
++hit_count_;
}
parse_args.clear();
pop_req();
}
io_buf.ConsumeInput(consumed);
} while (result == RedisParser::OK);
ParseRESP(&parser, &io_buf, dest);
} else {
// MC_TEXT
while (true) {
Expand All @@ -376,7 +363,7 @@ void Driver::ReceiveFb(base::Histogram* dest) {
break;
CHECK_EQ(line.back(), '\n');
if (line == "STORED\r\n" || line == "END\r\n") {
pop_req();
PopRequest(dest);
blob_len = 0;
} else if (absl::StartsWith(line, "VALUE")) {
// last token is a blob length.
Expand All @@ -403,6 +390,24 @@ void Driver::ReceiveFb(base::Histogram* dest) {
VLOG(1) << "ReceiveFb done";
}

void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf, base::Histogram* dest) {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec parse_args;

do {
result = parser->Parse(io_buf->InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
if (reqs_.front().might_hit && parse_args[0].type != facade::RespExpr::NIL) {
++hit_count_;
}
parse_args.clear();
PopRequest(dest);
}
io_buf->ConsumeInput(consumed);
} while (result == RedisParser::OK);
}

void TLocalClient::Connect(tcp::endpoint ep) {
VLOG(2) << "Connecting client...";
vector<fb2::Fiber> fbs(drivers_.size());
Expand Down Expand Up @@ -476,7 +481,7 @@ int main(int argc, char* argv[]) {

const uint32_t qps = GetFlag(FLAGS_qps);
const int64_t interval = 1000000000LL / qps;
uint32_t num_reqs = GetFlag(FLAGS_n);
uint64_t num_reqs = GetFlag(FLAGS_n);

CONSOLE_INFO << "Running all threads, sending " << num_reqs << " requests at a rate of "
<< GetFlag(FLAGS_qps) << " qps per connection, i.e. request every "
Expand Down Expand Up @@ -504,6 +509,7 @@ int main(int argc, char* argv[]) {
client.reset();
});

CONSOLE_INFO << "Effective QPS: " << num_reqs * 1000 / (duration / absl::Milliseconds(1));
CONSOLE_INFO << "Latency summary, all times are in usec:\n" << hist.ToString();
if (hit_opportunities) {
CONSOLE_INFO << "----------------------------------\nHit rate: "
Expand Down
5 changes: 1 addition & 4 deletions tools/local/monitoring/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ services:
restart: unless-stopped
ports:
- "11211:11211"
command:
- '-t 8'
- '-m 10000'
- '--pidfile=/memcached/memcached.pid'
command: "-t 8 -m 10000 -c 10000 --pidfile=/memcached/memcached.pid"
pid: host
volumes:
- memcached_data:/memcached
Expand Down

0 comments on commit 374a5f5

Please sign in to comment.