Skip to content

Commit

Permalink
Add CONFIG RESETSTAT command. Start working on RPOPLPUSH
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Apr 27, 2022
1 parent 72e90bb commit d3764ef
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) {
stats->io_write_bytes += builder->io_write_bytes();

for (const auto& k_v : builder->err_count()) {
stats->err_count[k_v.first] += k_v.second;
stats->err_count_map[k_v.first] += k_v.second;
}
builder->reset_io_stats();
}
Expand Down
8 changes: 4 additions & 4 deletions src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(num_replicas);
ADD(num_blocked_clients);

for (const auto& k_v : o.err_count) {
err_count[k_v.first] += k_v.second;
for (const auto& k_v : o.err_count_map) {
err_count_map[k_v.first] += k_v.second;
}

for (const auto& k_v : o.cmd_count) {
cmd_count[k_v.first] += k_v.second;
for (const auto& k_v : o.cmd_count_map) {
cmd_count_map[k_v.first] += k_v.second;
}

return *this;
Expand Down
4 changes: 2 additions & 2 deletions src/facade/facade_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ using CmdArgVec = std::vector<MutableSlice>;


struct ConnectionStats {
absl::flat_hash_map<std::string, uint64_t> err_count;
absl::flat_hash_map<std::string, uint64_t> cmd_count;
absl::flat_hash_map<std::string, uint64_t> err_count_map;
absl::flat_hash_map<std::string, uint64_t> cmd_count_map;

size_t read_buf_capacity = 0;
size_t io_read_cnt = 0;
Expand Down
46 changes: 45 additions & 1 deletion src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ extern "C" {
#include <absl/strings/numbers.h>

#include "base/logging.h"

#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
Expand Down Expand Up @@ -238,6 +237,36 @@ void ListFamily::RPop(CmdArgList args, ConnectionContext* cntx) {
return PopGeneric(ListDir::RIGHT, std::move(args), cntx);
}

void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
string_view src = ArgS(args, 1);
string_view dest = ArgS(args, 2);

OpResult<string> result;
if (dest == src) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRPopLPushSingleKey(OpArgs{shard, t->db_index()}, src);
};

result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
return (*cntx)->SendError("tbd: not_implemented");
}

if (result) {
return (*cntx)->SendBulkString(*result);
}

switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
(*cntx)->SendNull();
break;

default:
(*cntx)->SendError(result.status());
break;
}
}

void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) {
auto key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) {
Expand Down Expand Up @@ -782,6 +811,20 @@ OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view
return str_vec;
}

OpResult<string> ListFamily::OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();

PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(op_args.db_ind, it);
string val = ListPop(ListDir::RIGHT, ql);
quicklistPushHead(ql, val.data(), val.size());
return val;
}

using CI = CommandId;

#define HFUNC(x) SetHandler(&ListFamily::x)
Expand All @@ -793,6 +836,7 @@ void ListFamily::Register(CommandRegistry* registry) {
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush)
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX)
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop)
<< CI{"RPOPLPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, 3, 1, 2, 1}.HFUNC(RPopLPush)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
Expand Down
3 changes: 3 additions & 0 deletions src/server/list_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ListFamily {
static void LRange(CmdArgList args, ConnectionContext* cntx);
static void LRem(CmdArgList args, ConnectionContext* cntx);
static void LSet(CmdArgList args, ConnectionContext* cntx);
static void RPopLPush(CmdArgList args, ConnectionContext* cntx);

static void PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx);
static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args,
Expand All @@ -61,6 +62,8 @@ class ListFamily {

static OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key, long start,
long end);

static OpResult<std::string> OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key);
};

} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)

std::move(multi_error).Cancel();

etl.connection_stats.cmd_count[cmd_name]++;
etl.connection_stats.cmd_count_map[cmd_name]++;

if (dfly_cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd) {
// TODO: protect against aggregating huge transactions.
Expand Down
13 changes: 11 additions & 2 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,15 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
string_view res[2] = {param, "tbd"};

return (*cntx)->SendStringArr(res);
} else if (sub_cmd == "RESETSTAT") {
ess_.pool()->Await([](auto*) {
auto* stats = ServerState::tl_connection_stats();
stats->cmd_count_map.clear();
stats->err_count_map.clear();
stats->command_cnt = 0;
stats->async_writes_cnt = 0;
});
return (*cntx)->SendOk();
} else {
string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
"'. Try CONFIG HELP.");
Expand Down Expand Up @@ -561,14 +570,14 @@ tcp_port:)";
append(StrCat("unknown_", k_v.first, ":"), k_v.second);
}

for (const auto& k_v : m.conn_stats.cmd_count) {
for (const auto& k_v : m.conn_stats.cmd_count_map) {
append(StrCat("cmd_", k_v.first, ":"), k_v.second);
}
}

if (should_enter("ERRORSTATS", true)) {
ADD_HEADER("# Errorstats");
for (const auto& k_v : m.conn_stats.err_count) {
for (const auto& k_v : m.conn_stats.err_count_map) {
append(StrCat(k_v.first, ":"), k_v.second);
}
}
Expand Down
6 changes: 6 additions & 0 deletions tests/gen_sets.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash

memtier_benchmark -p 6379 --command "sadd __key__ __data__" -n 20 --threads=4 \
-c 10 --command-key-pattern=R --distinct-client-seed -c 30 --data-size=64 \
--key-prefix="key:" --hide-histogram --random-data --key-maximum=10000

0 comments on commit d3764ef

Please sign in to comment.