Skip to content

Commit

Permalink
feat: add brpop cmd (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks authored Feb 22, 2025
2 parents e4b39dc + e5e1ba6 commit 87bf491
Show file tree
Hide file tree
Showing 15 changed files with 350 additions and 9 deletions.
78 changes: 78 additions & 0 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,84 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) {
return subCmd->second.get();
}

void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
for (auto key : keys) {
kiwi::BlockKey blpop_key{client->GetCurrentDB(), key};

auto it = key_to_conns.find(blpop_key);
if (it == key_to_conns.end()) {
key_to_conns.emplace(blpop_key, std::make_unique<std::list<BlockedConnNode>>());
it = key_to_conns.find(blpop_key);
}
it->second->emplace_back(expire_time, client, type);
}
}

void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};

std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}

auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;

// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
auto BlockedClient = conn_blocked->GetBlockedClient();

if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
continue;
}

switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}

if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
}
BlockedClient->SendPacket();
// remove this conn from current waiting list
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
}
}

bool BlockedConnNode::IsExpired(std::chrono::system_clock::time_point now) {
if (expire_time_ == 0) {
return false;
}
int64_t now_in_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
if (expire_time_ <= now_in_ms) {
return true;
}
return false;
}

bool BaseCmdGroup::DoInitial(PClient* client) {
client->SetSubCmdName(client->argv_[1]);
if (!subCmds_.contains(client->SubCmdName())) {
Expand Down
37 changes: 37 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ const std::string kCmdNameRPush = "rpush";
const std::string kCmdNameRPushx = "rpushx";
const std::string kCmdNameLPop = "lpop";
const std::string kCmdNameRPop = "rpop";
const std::string kCmdNameBLPop = "blpop";
const std::string kCmdNameBRPop = "brpop";
const std::string kCmdNameLRem = "lrem";
const std::string kCmdNameLRange = "lrange";
const std::string kCmdNameLTrim = "ltrim";
Expand Down Expand Up @@ -219,6 +221,22 @@ enum AclCategory {
kAclCategoryRaft = (1 << 21),
};

class BlockedConnNode {
public:
enum Type { NotAny = 0, BLPop, BRPop };
virtual ~BlockedConnNode() = default;
BlockedConnNode(int64_t expire_time, std::shared_ptr<PClient> client, Type type)
: expire_time_(expire_time), client_(client), type_(type) {}
bool IsExpired(std::chrono::system_clock::time_point now = std::chrono::system_clock::now());
std::shared_ptr<PClient> GetBlockedClient() { return client_; }
Type GetCmdType() { return type_; }

private:
Type type_ = NotAny;
int64_t expire_time_ = 0;
std::shared_ptr<PClient> client_;
};

/**
* @brief Base class for all commands
* BaseCmd, as the base class for all commands, mainly implements some common functions
Expand Down Expand Up @@ -282,6 +300,11 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {

uint32_t GetCmdID() const;

void ServeAndUnblockConns(PClient* client);

void BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, std::shared_ptr<PClient> client,
BlockedConnNode::Type type);

protected:
// Execute a specific command
virtual void DoCmd(PClient* client) = 0;
Expand Down Expand Up @@ -321,4 +344,18 @@ class BaseCmdGroup : public BaseCmd {
private:
std::map<std::string, std::unique_ptr<BaseCmd>> subCmds_;
};

struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi.
BlockKey(int db_id, const std::string& key) : db_id_(db_id), key_(key) {}

int db_id_ = -1;
std::string key_;
bool operator==(const BlockKey& p) const { return p.db_id_ == db_id_ && p.key_ == key_; }
};
struct BlockKeyHash {
std::size_t operator()(const BlockKey& k) const {
return std::hash<int>{}(k.db_id_) ^ std::hash<std::string>{}(k.key_);
}
};

} // namespace kiwi
2 changes: 2 additions & 0 deletions src/cmd_admin.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,8 @@ void SortCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(store_key_, ret_, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
client->SetKey(store_key_);
ServeAndUnblockConns(client);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
Expand Down
1 change: 1 addition & 0 deletions src/cmd_keys.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ void RenameCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->Rename(client->Key(), client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
client->SetKey(client->argv_[2]);
} else if (s.IsNotFound()) {
client->SetRes(CmdRes::kNotFound, s.ToString());
} else {
Expand Down
62 changes: 58 additions & 4 deletions src/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/

#include "cmd_list.h"
#include "src/scope_record_lock.h"
#include "std_string.h"
#include "store.h"

Expand All @@ -27,6 +28,7 @@ void LPushCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->LPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down Expand Up @@ -60,10 +62,6 @@ RPoplpushCmd::RPoplpushCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool RPoplpushCmd::DoInitial(PClient* client) {
if (((arity_ > 0 && client->argv_.size() != arity_) || (arity_ < 0 && client->argv_.size() < -arity_))) {
client->SetRes(CmdRes::kWrongNum, kCmdNameRPoplpush);
return false;
}
source_ = client->argv_[1];
receiver_ = client->argv_[2];
return true;
Expand All @@ -75,6 +73,8 @@ void RPoplpushCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPoplpush(source_, receiver_, &value);
if (s.ok()) {
client->AppendString(value);
client->SetKey(receiver_);
ServeAndUnblockConns(client);
} else if (s.IsNotFound()) {
client->AppendString("");
} else if (s.IsInvalidArgument()) {
Expand All @@ -99,6 +99,7 @@ void RPushCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down Expand Up @@ -172,6 +173,59 @@ void RPopCmd::DoCmd(PClient* client) {
}
}

BLPopCmd::BLPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool BLPopCmd::DoInitial(PClient* client) { return true; }

void BLPopCmd::DoCmd(PClient* client) {}

BRPopCmd::BRPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool BRPopCmd::DoInitial(PClient* client) {
std::vector<std::string> keys(client->argv_.begin() + 1, client->argv_.end() - 1);
client->SetKey(keys);

int64_t timeout = 0;
if (!kstd::String2int(client->argv_.back(), &timeout)) {
client->SetRes(CmdRes::kInvalidInt);
return false;
}
if (timeout < 0) {
client->SetRes(CmdRes::kErrOther, "timeout can't be a negative value");
return false;
}
if (timeout > 0) {
auto now = std::chrono::system_clock::now();
expire_time_ =
std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count() + timeout * 1000;
}
return true;
}

void BRPopCmd::DoCmd(PClient* client) {
std::vector<std::string> elements;
std::vector<std::string> list_keys(client->Keys().begin(), client->Keys().end());
storage::MultiScopeRecordLock(STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys);
for (auto& list_key : list_keys) {
storage::Status s =
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPopWithoutLock(list_key, 1, &elements);
if (s.ok()) {
client->AppendArrayLen(2);
client->AppendString(list_key);
client->AppendString(elements[0]);
return;
} else if (s.IsNotFound()) {
continue;
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
BlockThisClientToWaitLRPush(list_keys, expire_time_, client->shared_from_this(), BlockedConnNode::Type::BRPop);
}

LRangeCmd::LRangeCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategoryList) {}

Expand Down
25 changes: 25 additions & 0 deletions src/cmd_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ class RPopCmd : public BaseCmd {
private:
void DoCmd(PClient* client) override;
};

class BLPopCmd : public BaseCmd {
public:
BLPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class BRPopCmd : public BaseCmd {
public:
BRPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class LRangeCmd : public BaseCmd {
public:
LRangeCmd(const std::string& name, int16_t arity);
Expand Down
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ void CmdTableManager::InitCmdTable() {
ADD_COMMAND(LPush, -3);
ADD_COMMAND(RPush, -3);
ADD_COMMAND(RPop, 2);
ADD_COMMAND(BLPop, -3);
ADD_COMMAND(BRPop, -3);
ADD_COMMAND(LRem, 4);
ADD_COMMAND(LRange, 4);
ADD_COMMAND(LTrim, 4);
Expand Down
55 changes: 55 additions & 0 deletions src/kiwi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,57 @@ void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr<kiwi::PClient>& cl
ClientMap::getInstance().AddClient(client->GetUniqueID(), client);
}

void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;

std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}

for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}

void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conns_list->erase(conn_node);
break;
}
}
}
}
}

bool KiwiDB::Init() {
char runid[kRunidSize + 1] = "";
getRandomHexChars(runid, kRunidSize);
Expand Down Expand Up @@ -241,6 +292,10 @@ bool KiwiDB::Init() {
timerTask->SetCallback([]() { PREPL.Cron(); });
event_server_->AddTimerTask(timerTask);

auto BLRPopTimerTask = std::make_shared<net::CommonTimerTask>(250);
BLRPopTimerTask->SetCallback(std::bind(&KiwiDB::ScanEvictedBlockedConnsOfBlrpop, this));
event_server_->AddTimerTask(BLRPopTimerTask);

time(&start_time_s_);

return true;
Expand Down
Loading

0 comments on commit 87bf491

Please sign in to comment.