Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add brpop cmd #48

Merged
merged 22 commits into from
Feb 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,84 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) {
return subCmd->second.get();
}

void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
for (auto key : keys) {
kiwi::BlockKey blpop_key{client->GetCurrentDB(), key};

auto it = key_to_conns.find(blpop_key);
if (it == key_to_conns.end()) {
key_to_conns.emplace(blpop_key, std::make_unique<std::list<BlockedConnNode>>());
it = key_to_conns.find(blpop_key);
}
it->second->emplace_back(expire_time, client, type);
}
}
Comment on lines +109 to +123
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation for empty keys.

The function should validate that the keys vector is not empty before proceeding with the blocking operation.

Apply this diff to add input validation:

 void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
                                           std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
+  if (keys.empty()) {
+    return;
+  }
   std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
   auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
   for (auto key : keys) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
for (auto key : keys) {
kiwi::BlockKey blpop_key{client->GetCurrentDB(), key};
auto it = key_to_conns.find(blpop_key);
if (it == key_to_conns.end()) {
key_to_conns.emplace(blpop_key, std::make_unique<std::list<BlockedConnNode>>());
it = key_to_conns.find(blpop_key);
}
it->second->emplace_back(expire_time, client, type);
}
}
void BaseCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time,
std::shared_ptr<PClient> client, BlockedConnNode::Type type) {
if (keys.empty()) {
return;
}
std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
for (auto key : keys) {
kiwi::BlockKey blpop_key{client->GetCurrentDB(), key};
auto it = key_to_conns.find(blpop_key);
if (it == key_to_conns.end()) {
key_to_conns.emplace(blpop_key, std::make_unique<std::list<BlockedConnNode>>());
it = key_to_conns.find(blpop_key);
}
it->second->emplace_back(expire_time, client, type);
}
}


void BaseCmd::ServeAndUnblockConns(PClient* client) {
kiwi::BlockKey key{client->GetCurrentDB(), client->Key()};

std::lock_guard<std::shared_mutex> map_lock(g_kiwi->GetBlockMtx());
auto& key_to_conns = g_kiwi->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}

auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;

// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的迭代器 在哪里 ++ 的, 不在 for()的最后 ++conn_blocked 是出于什么考虑

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for循环中包含迭代器的erase操作,返回值赋值等同于++

auto BlockedClient = conn_blocked->GetBlockedClient();

Comment on lines +142 to +143
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add null check for BlockedClient pointer.

The function should validate that the BlockedClient pointer is not null before dereferencing it.

Apply this diff to add null check:

     auto BlockedClient = conn_blocked->GetBlockedClient();
+    if (!BlockedClient) {
+      conn_blocked = waitting_list->erase(conn_blocked);
+      continue;
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
auto BlockedClient = conn_blocked->GetBlockedClient();
auto BlockedClient = conn_blocked->GetBlockedClient();
if (!BlockedClient) {
conn_blocked = waitting_list->erase(conn_blocked);
continue;
}

if (BlockedClient->State() == ClientState::kClosed) {
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
continue;
}

switch (conn_blocked->GetCmdType()) {
case BlockedConnNode::Type::BLPop:
s = STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements);
break;
case BlockedConnNode::Type::BRPop:
s = STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements);
break;
}

if (s.ok()) {
BlockedClient->AppendArrayLen(2);
BlockedClient->AppendString(client->Key());
BlockedClient->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
BlockedClient->SetRes(CmdRes::kErrOther, s.ToString());
}
BlockedClient->SendPacket();
// remove this conn from current waiting list
conn_blocked = waitting_list->erase(conn_blocked);
g_kiwi->CleanBlockedNodes(BlockedClient);
}
}

bool BlockedConnNode::IsExpired(std::chrono::system_clock::time_point now) {
if (expire_time_ == 0) {
return false;
}
int64_t now_in_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

判断是否过期, 从调用 IsExpired 的地方传递一个 时间戳过来, 会不会更好,或者提供一个函数重载,可以通过外部传入时间戳, 避免在多次调用时,重复获取当前时间

if (expire_time_ <= now_in_ms) {
return true;
}
return false;
}

bool BaseCmdGroup::DoInitial(PClient* client) {
client->SetSubCmdName(client->argv_[1]);
if (!subCmds_.contains(client->SubCmdName())) {
Expand Down
37 changes: 37 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ const std::string kCmdNameRPush = "rpush";
const std::string kCmdNameRPushx = "rpushx";
const std::string kCmdNameLPop = "lpop";
const std::string kCmdNameRPop = "rpop";
const std::string kCmdNameBLPop = "blpop";
const std::string kCmdNameBRPop = "brpop";
const std::string kCmdNameLRem = "lrem";
const std::string kCmdNameLRange = "lrange";
const std::string kCmdNameLTrim = "ltrim";
Expand Down Expand Up @@ -219,6 +221,22 @@ enum AclCategory {
kAclCategoryRaft = (1 << 21),
};

class BlockedConnNode {
public:
enum Type { NotAny = 0, BLPop, BRPop };
virtual ~BlockedConnNode() = default;
BlockedConnNode(int64_t expire_time, std::shared_ptr<PClient> client, Type type)
: expire_time_(expire_time), client_(client), type_(type) {}
bool IsExpired(std::chrono::system_clock::time_point now = std::chrono::system_clock::now());
std::shared_ptr<PClient> GetBlockedClient() { return client_; }
Type GetCmdType() { return type_; }

private:
Type type_ = NotAny;
int64_t expire_time_ = 0;
std::shared_ptr<PClient> client_;
};

/**
* @brief Base class for all commands
* BaseCmd, as the base class for all commands, mainly implements some common functions
Expand Down Expand Up @@ -282,6 +300,11 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {

uint32_t GetCmdID() const;

void ServeAndUnblockConns(PClient* client);

void BlockThisClientToWaitLRPush(std::vector<std::string>& keys, int64_t expire_time, std::shared_ptr<PClient> client,
BlockedConnNode::Type type);

protected:
// Execute a specific command
virtual void DoCmd(PClient* client) = 0;
Expand Down Expand Up @@ -321,4 +344,18 @@ class BaseCmdGroup : public BaseCmd {
private:
std::map<std::string, std::unique_ptr<BaseCmd>> subCmds_;
};

struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi.
BlockKey(int db_id, const std::string& key) : db_id_(db_id), key_(key) {}

int db_id_ = -1;
std::string key_;
bool operator==(const BlockKey& p) const { return p.db_id_ == db_id_ && p.key_ == key_; }
};
struct BlockKeyHash {
std::size_t operator()(const BlockKey& k) const {
return std::hash<int>{}(k.db_id_) ^ std::hash<std::string>{}(k.key_);
}
};

} // namespace kiwi
2 changes: 2 additions & 0 deletions src/cmd_admin.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,8 @@ void SortCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(store_key_, ret_, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
client->SetKey(store_key_);
ServeAndUnblockConns(client);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
Expand Down
1 change: 1 addition & 0 deletions src/cmd_keys.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ void RenameCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->Rename(client->Key(), client->argv_[2]);
if (s.ok()) {
client->SetRes(CmdRes::kOK);
client->SetKey(client->argv_[2]);
} else if (s.IsNotFound()) {
client->SetRes(CmdRes::kNotFound, s.ToString());
} else {
Expand Down
62 changes: 58 additions & 4 deletions src/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/

#include "cmd_list.h"
#include "src/scope_record_lock.h"
#include "std_string.h"
#include "store.h"

Expand All @@ -27,6 +28,7 @@ void LPushCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->LPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down Expand Up @@ -60,10 +62,6 @@ RPoplpushCmd::RPoplpushCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool RPoplpushCmd::DoInitial(PClient* client) {
if (((arity_ > 0 && client->argv_.size() != arity_) || (arity_ < 0 && client->argv_.size() < -arity_))) {
client->SetRes(CmdRes::kWrongNum, kCmdNameRPoplpush);
return false;
}
source_ = client->argv_[1];
receiver_ = client->argv_[2];
return true;
Expand All @@ -75,6 +73,8 @@ void RPoplpushCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPoplpush(source_, receiver_, &value);
if (s.ok()) {
client->AppendString(value);
client->SetKey(receiver_);
ServeAndUnblockConns(client);
} else if (s.IsNotFound()) {
client->AppendString("");
} else if (s.IsInvalidArgument()) {
Expand All @@ -99,6 +99,7 @@ void RPushCmd::DoCmd(PClient* client) {
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(client->Key(), list_values, &reply_num);
if (s.ok()) {
client->AppendInteger(reply_num);
ServeAndUnblockConns(client);
} else if (s.IsInvalidArgument()) {
client->SetRes(CmdRes::kMultiKey);
} else {
Expand Down Expand Up @@ -172,6 +173,59 @@ void RPopCmd::DoCmd(PClient* client) {
}
}

BLPopCmd::BLPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool BLPopCmd::DoInitial(PClient* client) { return true; }

void BLPopCmd::DoCmd(PClient* client) {}
Comment on lines +176 to +181
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement BLPopCmd functionality.

The BLPopCmd has an empty implementation. It should mirror BRPopCmd's functionality but for the left side of the list.

Would you like me to generate the implementation for BLPopCmd?


BRPopCmd::BRPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool BRPopCmd::DoInitial(PClient* client) {
std::vector<std::string> keys(client->argv_.begin() + 1, client->argv_.end() - 1);
client->SetKey(keys);

int64_t timeout = 0;
if (!kstd::String2int(client->argv_.back(), &timeout)) {
client->SetRes(CmdRes::kInvalidInt);
return false;
}
if (timeout < 0) {
client->SetRes(CmdRes::kErrOther, "timeout can't be a negative value");
return false;
}
if (timeout > 0) {
auto now = std::chrono::system_clock::now();
expire_time_ =
std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count() + timeout * 1000;
}
return true;
}

void BRPopCmd::DoCmd(PClient* client) {
std::vector<std::string> elements;
std::vector<std::string> list_keys(client->Keys().begin(), client->Keys().end());
storage::MultiScopeRecordLock(STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys);
for (auto& list_key : list_keys) {
storage::Status s =
STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPopWithoutLock(list_key, 1, &elements);
if (s.ok()) {
client->AppendArrayLen(2);
client->AppendString(list_key);
client->AppendString(elements[0]);
return;
} else if (s.IsNotFound()) {
continue;
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
BlockThisClientToWaitLRPush(list_keys, expire_time_, client->shared_from_this(), BlockedConnNode::Type::BRPop);
}

LRangeCmd::LRangeCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategoryList) {}

Expand Down
25 changes: 25 additions & 0 deletions src/cmd_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ class RPopCmd : public BaseCmd {
private:
void DoCmd(PClient* client) override;
};

class BLPopCmd : public BaseCmd {
public:
BLPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class BRPopCmd : public BaseCmd {
public:
BRPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
int64_t expire_time_{0};
};

class LRangeCmd : public BaseCmd {
public:
LRangeCmd(const std::string& name, int16_t arity);
Expand Down
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ void CmdTableManager::InitCmdTable() {
ADD_COMMAND(LPush, -3);
ADD_COMMAND(RPush, -3);
ADD_COMMAND(RPop, 2);
ADD_COMMAND(BLPop, -3);
ADD_COMMAND(BRPop, -3);
ADD_COMMAND(LRem, 4);
ADD_COMMAND(LRange, 4);
ADD_COMMAND(LTrim, 4);
Expand Down
55 changes: 55 additions & 0 deletions src/kiwi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,57 @@ void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr<kiwi::PClient>& cl
ClientMap::getInstance().AddClient(client->GetUniqueID(), client);
}

void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;

std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}

for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}
Comment on lines +171 to +200
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for SendPacket in ScanEvictedBlockedConnsOfBlrpop.

The method should handle potential exceptions from SendPacket to ensure proper cleanup.

 void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
   std::vector<kiwi::BlockKey> keys_need_remove;
   std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
   auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
   for (auto& it : key_to_blocked_conns) {
     auto& conns_list = it.second;
     for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
       auto conn_ptr = conn_node->GetBlockedClient();
+      try {
         if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
           conn_node = conns_list->erase(conn_node);
           CleanBlockedNodes(conn_ptr);
         } else if (conn_node->IsExpired()) {
           conn_ptr->AppendString("");
           conn_ptr->SendPacket();
           conn_node = conns_list->erase(conn_node);
           CleanBlockedNodes(conn_ptr);
         } else {
           ++conn_node;
         }
+      } catch (...) {
+        conn_node = conns_list->erase(conn_node);
+        CleanBlockedNodes(conn_ptr);
+      }
     }
     if (conns_list->empty()) {
       keys_need_remove.push_back(it.first);
     }
   }
   for (auto& remove_key : keys_need_remove) {
     key_to_blocked_conns.erase(remove_key);
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;
std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}
for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}
void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() {
std::vector<kiwi::BlockKey> keys_need_remove;
std::lock_guard<std::shared_mutex> map_lock(block_mtx_);
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& it : key_to_blocked_conns) {
auto& conns_list = it.second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
auto conn_ptr = conn_node->GetBlockedClient();
try {
if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else if (conn_node->IsExpired()) {
conn_ptr->AppendString("");
conn_ptr->SendPacket();
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
} else {
++conn_node;
}
} catch (...) {
conn_node = conns_list->erase(conn_node);
CleanBlockedNodes(conn_ptr);
}
}
if (conns_list->empty()) {
keys_need_remove.push_back(it.first);
}
}
for (auto& remove_key : keys_need_remove) {
key_to_blocked_conns.erase(remove_key);
}
}


void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conns_list->erase(conn_node);
break;
}
}
}
}
}
Comment on lines +202 to +220
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential iterator invalidation in CleanBlockedNodes.

The method modifies the list while iterating, which could lead to iterator invalidation.

 void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
   std::vector<kiwi::BlockKey> blocked_keys;
   for (auto key : client->Keys()) {
     blocked_keys.emplace_back(client->GetCurrentDB(), key);
   }
   auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
   for (auto& blocked_key : blocked_keys) {
     const auto& it = key_to_blocked_conns.find(blocked_key);
     if (it != key_to_blocked_conns.end()) {
       auto& conns_list = it->second;
-      for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
+      for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
         if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
-          conns_list->erase(conn_node);
+          conn_node = conns_list->erase(conn_node);
           break;
+        } else {
+          ++conn_node;
         }
       }
     }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conns_list->erase(conn_node);
break;
}
}
}
}
}
void KiwiDB::CleanBlockedNodes(const std::shared_ptr<kiwi::PClient>& client) {
std::vector<kiwi::BlockKey> blocked_keys;
for (auto key : client->Keys()) {
blocked_keys.emplace_back(client->GetCurrentDB(), key);
}
auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns();
for (auto& blocked_key : blocked_keys) {
const auto& it = key_to_blocked_conns.find(blocked_key);
if (it != key_to_blocked_conns.end()) {
auto& conns_list = it->second;
for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) {
if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) {
conn_node = conns_list->erase(conn_node);
break;
} else {
++conn_node;
}
}
}
}
}


bool KiwiDB::Init() {
char runid[kRunidSize + 1] = "";
getRandomHexChars(runid, kRunidSize);
Expand Down Expand Up @@ -241,6 +292,10 @@ bool KiwiDB::Init() {
timerTask->SetCallback([]() { PREPL.Cron(); });
event_server_->AddTimerTask(timerTask);

auto BLRPopTimerTask = std::make_shared<net::CommonTimerTask>(250);
BLRPopTimerTask->SetCallback(std::bind(&KiwiDB::ScanEvictedBlockedConnsOfBlrpop, this));
event_server_->AddTimerTask(BLRPopTimerTask);

time(&start_time_s_);

return true;
Expand Down
Loading
Loading