From d525c40df8239cf79738ef0279f0a86a0dc5850d Mon Sep 17 00:00:00 2001 From: SamuelSze1 Date: Sat, 21 Sep 2024 16:47:23 +0800 Subject: [PATCH 01/18] feat: add bpop cmd --- src/base_cmd.cc | 87 +++++++++++++++++++++++++++++++++++++++- src/base_cmd.h | 36 +++++++++++++++++ src/cmd_admin.cc | 2 + src/cmd_keys.cc | 2 + src/cmd_list.cc | 4 ++ src/cmd_list.h | 25 ++++++++++++ src/cmd_table_manager.cc | 2 + src/kiwi.cc | 26 +++++++++++- src/kiwi.h | 27 +++++++++++-- 9 files changed, 206 insertions(+), 5 deletions(-) mode change 100644 => 100755 src/cmd_admin.cc mode change 100644 => 100755 src/cmd_keys.cc mode change 100644 => 100755 src/cmd_list.cc mode change 100644 => 100755 src/cmd_table_manager.cc diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 6ed3715d..30324519 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -15,8 +15,8 @@ #include "common.h" #include "config.h" -#include "log.h" #include "kiwi.h" +#include "log.h" #include "praft/praft.h" namespace kiwi { @@ -106,6 +106,91 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) { return subCmd->second.get(); } +void BaseCmd::BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, PClient* client, + BlockedConnNode::Type type) { + std::unique_lock latch(g_kiwi->GetBlockMtx()); + auto& key_to_conns = g_kiwi->GetMapFromKeyToConns(); + std::shared_ptr> is_done = std::make_shared>(false); + for (auto key : keys) { + kiwi::BlockKey blpop_key{client->GetCurrentDB(), key}; + + auto it = key_to_conns.find(blpop_key); + if (it == key_to_conns.end()) { + key_to_conns.emplace(blpop_key, std::make_unique>()); + it = key_to_conns.find(blpop_key); + } + auto& wait_list_of_this_key = it->second; + wait_list_of_this_key->emplace_back(expire_time, client, type, is_done); + } +} + +void BaseCmd::ServeAndUnblockConns(PClient* client) { + kiwi::BlockKey key{client->GetCurrentDB(), client->Key()}; + + std::shared_lock read_latch(g_kiwi->GetBlockMtx()); + auto& key_to_conns = g_kiwi->GetMapFromKeyToConns(); + auto it = key_to_conns.find(key); + if (it == key_to_conns.end()) { + // no client is waitting for this key + return; + } + read_latch.unlock(); + + std::unique_lock write_lock(g_kiwi->GetBlockMtx()); + auto& waitting_list = it->second; + std::vector elements; + storage::Status s; + + // traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“ + for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) { + if (conn_blocked->is_done_->exchange(true)) { + conn_blocked = waitting_list->erase(conn_blocked); + continue; + } + + PClient* BlockedClient = (*conn_blocked).GetBlockedClient(); + + if (BlockedClient->State() == ClientState::kClosed) { + conn_blocked = waitting_list->erase(conn_blocked); + continue; + } + + switch (conn_blocked->GetCmdType()) { + case BlockedConnNode::Type::BLPop: + s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements); + break; + case BlockedConnNode::Type::BRPop: + s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements); + break; + } + + if (s.ok()) { + BlockedClient->AppendArrayLen(2); + BlockedClient->AppendString(client->Key()); + BlockedClient->AppendString(elements[0]); + } else if (s.IsNotFound()) { + // this key has no more elements to serve more blocked conn. + break; + } else { + BlockedClient->SetRes(CmdRes::kErrOther, s.ToString()); + } + BlockedClient->SendPacket(); + conn_blocked = waitting_list->erase(conn_blocked); // remove this conn from current waiting list + } +} + +bool BlockedConnNode::IsExpired() { + if (expire_time_ == 0) { + return false; + } + auto now = std::chrono::system_clock::now(); + int64_t now_in_ms = std::chrono::time_point_cast(now).time_since_epoch().count(); + if (expire_time_ <= now_in_ms) { + return true; + } + return false; +} + bool BaseCmdGroup::DoInitial(PClient* client) { client->SetSubCmdName(client->argv_[1]); if (!subCmds_.contains(client->SubCmdName())) { diff --git a/src/base_cmd.h b/src/base_cmd.h index 53295835..626708f5 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -135,6 +135,8 @@ const std::string kCmdNameRPush = "rpush"; const std::string kCmdNameRPushx = "rpushx"; const std::string kCmdNameLPop = "lpop"; const std::string kCmdNameRPop = "rpop"; +const std::string kCmdNameBLPop = "blpop"; +const std::string kCmdNameBRPop = "brpop"; const std::string kCmdNameLRem = "lrem"; const std::string kCmdNameLRange = "lrange"; const std::string kCmdNameLTrim = "ltrim"; @@ -210,6 +212,23 @@ enum AclCategory { kAclCategoryRaft = (1 << 21), }; +class BlockedConnNode { + public: + enum Type { BLPop = 0, BRPop }; + virtual ~BlockedConnNode() {} + BlockedConnNode(int64_t expire_time, PClient* client, Type type, std::shared_ptr> is_done) + : expire_time_(expire_time), client_(client), type_(type), is_done_(is_done) {} + bool IsExpired(); + PClient* GetBlockedClient() { return client_; } + std::shared_ptr> is_done_; + Type GetCmdType() { return type_; } + + private: + Type type_; + int64_t expire_time_; + PClient* client_; +}; + /** * @brief Base class for all commands * BaseCmd, as the base class for all commands, mainly implements some common functions @@ -273,6 +292,11 @@ class BaseCmd : public std::enable_shared_from_this { uint32_t GetCmdID() const; + void ServeAndUnblockConns(PClient* client); + + void BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, PClient* client, + BlockedConnNode::Type type); + protected: // Execute a specific command virtual void DoCmd(PClient* client) = 0; @@ -312,4 +336,16 @@ class BaseCmdGroup : public BaseCmd { private: std::map> subCmds_; }; + +struct BlockKey { // this data struct is made for the scenario of multi dbs in pika. + int db_id; + std::string key; + bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; } +}; +struct BlockKeyHash { + std::size_t operator()(const BlockKey& k) const { + return std::hash{}(k.db_id) ^ std::hash{}(k.key); + } +}; + } // namespace kiwi diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc old mode 100644 new mode 100755 index 3e214d63..12da8441 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -592,6 +592,8 @@ void SortCmd::DoCmd(PClient* client) { storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(store_key_, ret_, &reply_num); if (s.ok()) { client->AppendInteger(reply_num); + client->SetKey(store_key_); + ServeAndUnblockConns(client); } else { client->SetRes(CmdRes::kErrOther, s.ToString()); } diff --git a/src/cmd_keys.cc b/src/cmd_keys.cc old mode 100644 new mode 100755 index 9003e639..bcbc763e --- a/src/cmd_keys.cc +++ b/src/cmd_keys.cc @@ -250,6 +250,8 @@ void RenameCmd::DoCmd(PClient* client) { storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->Rename(client->Key(), client->argv_[2]); if (s.ok()) { client->SetRes(CmdRes::kOK); + client->SetKey(client->argv_[2]); + ServeAndUnblockConns(client); } else if (s.IsNotFound()) { client->SetRes(CmdRes::kNotFound, s.ToString()); } else { diff --git a/src/cmd_list.cc b/src/cmd_list.cc old mode 100644 new mode 100755 index 64a4dbe1..383d514b --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -27,6 +27,7 @@ void LPushCmd::DoCmd(PClient* client) { PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPush(client->Key(), list_values, &reply_num); if (s.ok()) { client->AppendInteger(reply_num); + ServeAndUnblockConns(client); } else if (s.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { @@ -74,6 +75,8 @@ void RPoplpushCmd::DoCmd(PClient* client) { storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPoplpush(source_, receiver_, &value); if (s.ok()) { client->AppendString(value); + client->SetKey(receiver_); + ServeAndUnblockConns(client); } else if (s.IsNotFound()) { client->AppendStringLen(-1); } else if (s.IsInvalidArgument()) { @@ -98,6 +101,7 @@ void RPushCmd::DoCmd(PClient* client) { PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPush(client->Key(), list_values, &reply_num); if (s.ok()) { client->AppendInteger(reply_num); + ServeAndUnblockConns(client); } else if (s.IsInvalidArgument()) { client->SetRes(CmdRes::kMultiKey); } else { diff --git a/src/cmd_list.h b/src/cmd_list.h index 409681f6..906090cc 100644 --- a/src/cmd_list.h +++ b/src/cmd_list.h @@ -43,6 +43,31 @@ class RPopCmd : public BaseCmd { private: void DoCmd(PClient* client) override; }; + +class BLPopCmd : public BaseCmd { + public: + BLPopCmd(const std::string& name, int16_t arity); + + protected: + bool DoInitial(PClient* client) override; + + private: + void DoCmd(PClient* client) override; + int64_t expire_time_{0}; +}; + +class BRPopCmd : public BaseCmd { + public: + BRPopCmd(const std::string& name, int16_t arity); + + protected: + bool DoInitial(PClient* client) override; + + private: + void DoCmd(PClient* client) override; + int64_t expire_time_{0}; +}; + class LRangeCmd : public BaseCmd { public: LRangeCmd(const std::string& name, int16_t arity); diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc old mode 100644 new mode 100755 index 589ccd90..02b91d2d --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -154,6 +154,8 @@ void CmdTableManager::InitCmdTable() { ADD_COMMAND(LPush, -3); ADD_COMMAND(RPush, -3); ADD_COMMAND(RPop, 2); + ADD_COMMAND(BLPop, -3); + ADD_COMMAND(BRPop, -3); ADD_COMMAND(LRem, 4); ADD_COMMAND(LRange, 4); ADD_COMMAND(LTrim, 4); diff --git a/src/kiwi.cc b/src/kiwi.cc index b9f6353f..04a87771 100644 --- a/src/kiwi.cc +++ b/src/kiwi.cc @@ -165,6 +165,26 @@ void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr& cl client->OnConnect(); } +void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() { + std::unique_lock latch(block_mtx_); + auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns(); + for (auto& it : key_to_blocked_conns) { + auto& conns_list = it.second; + for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) { + if (conn_node->is_done_->exchange(true) || conn_node->GetBlockedClient()->State() == ClientState::kClosed) { + conn_node = conns_list->erase(conn_node); + } else if (conn_node->IsExpired()) { + PClient* conn_ptr = conn_node->GetBlockedClient(); + conn_ptr->AppendString(""); + conn_ptr->SendPacket(); + conn_node = conns_list->erase(conn_node); + } else { + conn_node++; + } + } + } +} + bool KiwiDB::Init() { char runid[kRunidSize + 1] = ""; getRandomHexChars(runid, kRunidSize); @@ -201,7 +221,7 @@ bool KiwiDB::Init() { PREPL.SetMasterAddr(g_config.master_ip.ToString().c_str(), g_config.master_port.load()); } - event_server_ =std::make_unique>>(num); + event_server_ = std::make_unique>>(num); event_server_->SetRwSeparation(true); @@ -232,6 +252,10 @@ bool KiwiDB::Init() { timerTask->SetCallback([]() { PREPL.Cron(); }); event_server_->AddTimerTask(timerTask); + auto BLRPopTimerTask = std::make_shared(250); + BLRPopTimerTask->SetCallback(std::bind(&KiwiDB::ScanEvictedBlockedConnsOfBlrpop, this)); + event_server_->AddTimerTask(BLRPopTimerTask); + time(&start_time_s_); return true; diff --git a/src/kiwi.h b/src/kiwi.h index 6921d84c..3fd68c6f 100644 --- a/src/kiwi.h +++ b/src/kiwi.h @@ -54,13 +54,19 @@ class KiwiDB final { event_server_->SendPacket(client, std::move(msg)); } + std::unordered_map>, kiwi::BlockKeyHash>& + GetMapFromKeyToConns() { + return key_to_blocked_conns_; + } + + std::shared_mutex& GetBlockMtx() { return block_mtx_; }; + + void ScanEvictedBlockedConnsOfBlrpop(); inline void SendPacket2Client(const std::shared_ptr& client, std::string&& msg) { event_server_->SendPacket(client, std::move(msg)); } - inline void CloseConnection(const std::shared_ptr& client) { - event_server_->CloseConnection(client); - } + inline void CloseConnection(const std::shared_ptr& client) { event_server_->CloseConnection(client); } void TCPConnect( const net::SocketAddr& addr, @@ -88,6 +94,21 @@ class KiwiDB final { uint32_t cmd_id_ = 0; time_t start_time_s_ = 0; + + /* + * Blpop/BRpop used + */ + /* key_to_blocked_conns_: + * mapping from key to a list that stored the nodes of client-connections that + * were blocked by command blpop/brpop with key. + */ + std::unordered_map>, kiwi::BlockKeyHash> + key_to_blocked_conns_; + + /* + * latch of above map. + */ + std::shared_mutex block_mtx_; }; extern std::unique_ptr g_kiwi; From c5d9c3fbe8db648b03041d3ee5c2096cde41c74a Mon Sep 17 00:00:00 2001 From: luky116 Date: Sat, 16 Nov 2024 19:06:23 +0800 Subject: [PATCH 02/18] reforamt code --- src/proto_parser.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) mode change 100755 => 100644 src/proto_parser.cc diff --git a/src/proto_parser.cc b/src/proto_parser.cc old mode 100755 new mode 100644 index c0e78446..b13d6e3c --- a/src/proto_parser.cc +++ b/src/proto_parser.cc @@ -26,7 +26,6 @@ void PProtoParser::Reset() { numOfParam_ = 0; params_.clear(); - } PParseResult PProtoParser::ParseRequest(const char*& ptr, const char* end) { @@ -96,7 +95,7 @@ PParseResult PProtoParser::parseStrval(const char*& ptr, const char* end, PStrin assert(paramLen_ >= 0); if (static_cast(end - ptr) < paramLen_ + 2) { - paramLen_-=(end-ptr); + paramLen_ -= (end - ptr); result.append(ptr, end - ptr); return PParseResult::kWait; } From ea7da1b06f540b89a8330f7c76e90e483eefb80b Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Thu, 5 Dec 2024 22:12:22 +0800 Subject: [PATCH 03/18] fixed some potential bugs --- src/base_cmd.cc | 21 ++++++++------------- src/base_cmd.h | 11 +++++------ src/kiwi.cc | 37 ++++++++++++++++++++++++++++++++++--- src/kiwi.h | 2 ++ 4 files changed, 49 insertions(+), 22 deletions(-) diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 30324519..1d8d64a2 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -108,9 +108,8 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) { void BaseCmd::BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, PClient* client, BlockedConnNode::Type type) { - std::unique_lock latch(g_kiwi->GetBlockMtx()); + std::lock_guard map_lock(g_kiwi->GetBlockMtx()); auto& key_to_conns = g_kiwi->GetMapFromKeyToConns(); - std::shared_ptr> is_done = std::make_shared>(false); for (auto key : keys) { kiwi::BlockKey blpop_key{client->GetCurrentDB(), key}; @@ -120,38 +119,32 @@ void BaseCmd::BlockThisClientToWaitLRPush(std::vector& keys, int64_ it = key_to_conns.find(blpop_key); } auto& wait_list_of_this_key = it->second; - wait_list_of_this_key->emplace_back(expire_time, client, type, is_done); + wait_list_of_this_key->emplace_back(expire_time, client, type); } } void BaseCmd::ServeAndUnblockConns(PClient* client) { kiwi::BlockKey key{client->GetCurrentDB(), client->Key()}; - std::shared_lock read_latch(g_kiwi->GetBlockMtx()); + std::lock_guard map_lock(g_kiwi->GetBlockMtx()); auto& key_to_conns = g_kiwi->GetMapFromKeyToConns(); auto it = key_to_conns.find(key); if (it == key_to_conns.end()) { // no client is waitting for this key return; } - read_latch.unlock(); - std::unique_lock write_lock(g_kiwi->GetBlockMtx()); auto& waitting_list = it->second; std::vector elements; storage::Status s; // traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“ for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) { - if (conn_blocked->is_done_->exchange(true)) { - conn_blocked = waitting_list->erase(conn_blocked); - continue; - } - - PClient* BlockedClient = (*conn_blocked).GetBlockedClient(); + auto BlockedClient = conn_blocked->GetBlockedClient(); if (BlockedClient->State() == ClientState::kClosed) { conn_blocked = waitting_list->erase(conn_blocked); + CleanBlockedNodes(BlockedClient); continue; } @@ -175,7 +168,9 @@ void BaseCmd::ServeAndUnblockConns(PClient* client) { BlockedClient->SetRes(CmdRes::kErrOther, s.ToString()); } BlockedClient->SendPacket(); - conn_blocked = waitting_list->erase(conn_blocked); // remove this conn from current waiting list + // remove this conn from current waiting list + conn_blocked = waitting_list->erase(conn_blocked); + CleanBlockedNodes(BlockedClient); } } diff --git a/src/base_cmd.h b/src/base_cmd.h index 626708f5..10f5b777 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -216,17 +216,16 @@ class BlockedConnNode { public: enum Type { BLPop = 0, BRPop }; virtual ~BlockedConnNode() {} - BlockedConnNode(int64_t expire_time, PClient* client, Type type, std::shared_ptr> is_done) - : expire_time_(expire_time), client_(client), type_(type), is_done_(is_done) {} + BlockedConnNode(int64_t expire_time, PClient* client, Type type) + : expire_time_(expire_time), client_(client), type_(type) {} bool IsExpired(); - PClient* GetBlockedClient() { return client_; } - std::shared_ptr> is_done_; + std::shared_ptr GetBlockedClient() { return client_; } Type GetCmdType() { return type_; } private: Type type_; int64_t expire_time_; - PClient* client_; + std::shared_ptr client_; }; /** @@ -337,7 +336,7 @@ class BaseCmdGroup : public BaseCmd { std::map> subCmds_; }; -struct BlockKey { // this data struct is made for the scenario of multi dbs in pika. +struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi. int db_id; std::string key; bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; } diff --git a/src/kiwi.cc b/src/kiwi.cc index e094f341..005a27ee 100644 --- a/src/kiwi.cc +++ b/src/kiwi.cc @@ -158,22 +158,53 @@ void KiwiDB::OnNewConnection(uint64_t connId, std::shared_ptr& cl } void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() { - std::unique_lock latch(block_mtx_); + std::vector keys_need_remove; + + std::lock_guard map_lock(block_mtx_); auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns(); for (auto& it : key_to_blocked_conns) { auto& conns_list = it.second; for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) { - if (conn_node->is_done_->exchange(true) || conn_node->GetBlockedClient()->State() == ClientState::kClosed) { + if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) { conn_node = conns_list->erase(conn_node); + CleanBlockedNodes(conn_ptr); } else if (conn_node->IsExpired()) { - PClient* conn_ptr = conn_node->GetBlockedClient(); + auto conn_ptr = conn_node->GetBlockedClient(); conn_ptr->AppendString(""); conn_ptr->SendPacket(); conn_node = conns_list->erase(conn_node); + CleanBlockedNodes(conn_ptr); } else { conn_node++; } } + if(conns_list.empty()){ + keys_need_remove.push_back(it.first); + } + } + + for(auto& remove_key : keys_need_remove){ + key_to_blocked_conns.erase(remove_key); + } +} + +void KiwiDB::CleanBlockedNodes(const std::shared_ptr& client){ + std::vector blocked_keys; + for(auto key:client->Keys()){ + blocked_keys.emplace_back(client->GetCurrentDB(), key); + } + auto &key_to_blocked_conns=g_kiwi->GetMapFromKeyToConns(); + for(auto& blocked_key : blocked_keys){ + auto &it=key_to_blocked_conns.find(blocked_key); + if(it!=key_to_blocked_conns.end()){ + auto &conns_list=it->second; + for(auto conn_node=conns_list->begin();conn_node!=conns_list->end();conn_node++){ + if(conn_node->GetBlockedClient()->GetConnId()==client->GetConnId()){ + conns_list->erase(conn_node); + break; + } + } + } } } diff --git a/src/kiwi.h b/src/kiwi.h index 54fae442..8cf4207d 100644 --- a/src/kiwi.h +++ b/src/kiwi.h @@ -70,6 +70,8 @@ class KiwiDB final { std::shared_mutex& GetBlockMtx() { return block_mtx_; }; void ScanEvictedBlockedConnsOfBlrpop(); + // erase all blocked nodes of this client + void CleanBlockedNodes(const std::shared_ptr& client); inline void SendPacket2Client(const std::shared_ptr& client, std::string&& msg) { event_server_->SendPacket(client, std::move(msg)); } From 60c7bf81e5c65969b4b268e34cf99d319e59edfc Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 7 Dec 2024 21:58:09 +0800 Subject: [PATCH 04/18] add brpop command --- src/base_cmd.cc | 4 ++-- src/cmd_keys.cc | 1 - src/cmd_list.cc | 56 +++++++++++++++++++++++++++++++++++++++++++++---- src/kiwi.cc | 8 +++---- 4 files changed, 58 insertions(+), 11 deletions(-) diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 1d8d64a2..6ad95a0e 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -144,7 +144,7 @@ void BaseCmd::ServeAndUnblockConns(PClient* client) { if (BlockedClient->State() == ClientState::kClosed) { conn_blocked = waitting_list->erase(conn_blocked); - CleanBlockedNodes(BlockedClient); + g_kiwi->CleanBlockedNodes(BlockedClient); continue; } @@ -170,7 +170,7 @@ void BaseCmd::ServeAndUnblockConns(PClient* client) { BlockedClient->SendPacket(); // remove this conn from current waiting list conn_blocked = waitting_list->erase(conn_blocked); - CleanBlockedNodes(BlockedClient); + g_kiwi->CleanBlockedNodes(BlockedClient); } } diff --git a/src/cmd_keys.cc b/src/cmd_keys.cc index bcbc763e..09d2064e 100755 --- a/src/cmd_keys.cc +++ b/src/cmd_keys.cc @@ -251,7 +251,6 @@ void RenameCmd::DoCmd(PClient* client) { if (s.ok()) { client->SetRes(CmdRes::kOK); client->SetKey(client->argv_[2]); - ServeAndUnblockConns(client); } else if (s.IsNotFound()) { client->SetRes(CmdRes::kNotFound, s.ToString()); } else { diff --git a/src/cmd_list.cc b/src/cmd_list.cc index 383d514b..0a9b013e 100755 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -61,10 +61,6 @@ RPoplpushCmd::RPoplpushCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {} bool RPoplpushCmd::DoInitial(PClient* client) { - if (((arity_ > 0 && client->argv_.size() != arity_) || (arity_ < 0 && client->argv_.size() < -arity_))) { - client->SetRes(CmdRes::kWrongNum, kCmdNameRPoplpush); - return false; - } source_ = client->argv_[1]; receiver_ = client->argv_[2]; return true; @@ -175,6 +171,58 @@ void RPopCmd::DoCmd(PClient* client) { } } +BLPopCmd::BLPopCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {} + +bool BLPopCmd::DoInitial(PClient* client) { return true; } + +void BLPopCmd::DoCmd(PClient* client) {} + +BRPopCmd::BRPopCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {} + +bool BRPopCmd::DoInitial(PClient* client) { + std::vector keys(client->argv_.begin() + 1, client->argv_.end()-1); + client->SetKey(keys); + + int64_t timeout = 0; + if (!pstd::String2int(client->argv_.back(), &timeout)) { + client->SetRes(CmdRes::kInvalidInt); + return false; + } + if (timeout < 0 ) { + client->SetRes(CmdRes::kErrOther, + "timeout can't be a negative value"); + return false; + } + if (timeout > 0) { + auto now = std::chrono::system_clock::now(); + expire_time_ = + std::chrono::time_point_cast(now).time_since_epoch().count() + timeout * 1000; + } + return true; +} + +void BRPopCmd::DoCmd(PClient* client) { + std::vector elements; + std::vector list_keys(client->Keys().begin(), client->Keys().end()); + for(auto &list_key:list_keys){ + storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(list_key, 1, &elements); + if (s.ok()) { + client->AppendArrayLen(2); + client->AppendString(list_key); + client->AppendString(elements[0]); + return; + } else if (s.IsNotFound()) { + continue; + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + return; + } + } + BlockThisClientToWaitLRPush(list_keys,expire_time_,client,BlockedConnNode::Type::BRPop); +} + LRangeCmd::LRangeCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategoryList) {} diff --git a/src/kiwi.cc b/src/kiwi.cc index 005a27ee..458204e0 100644 --- a/src/kiwi.cc +++ b/src/kiwi.cc @@ -165,11 +165,11 @@ void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() { for (auto& it : key_to_blocked_conns) { auto& conns_list = it.second; for (auto conn_node = conns_list->begin(); conn_node != conns_list->end();) { + auto conn_ptr = conn_node->GetBlockedClient(); if (conn_node->GetBlockedClient()->State() == ClientState::kClosed) { conn_node = conns_list->erase(conn_node); CleanBlockedNodes(conn_ptr); } else if (conn_node->IsExpired()) { - auto conn_ptr = conn_node->GetBlockedClient(); conn_ptr->AppendString(""); conn_ptr->SendPacket(); conn_node = conns_list->erase(conn_node); @@ -178,7 +178,7 @@ void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() { conn_node++; } } - if(conns_list.empty()){ + if(conns_list->empty()){ keys_need_remove.push_back(it.first); } } @@ -189,13 +189,13 @@ void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() { } void KiwiDB::CleanBlockedNodes(const std::shared_ptr& client){ - std::vector blocked_keys; + std::vector blocked_keys; for(auto key:client->Keys()){ blocked_keys.emplace_back(client->GetCurrentDB(), key); } auto &key_to_blocked_conns=g_kiwi->GetMapFromKeyToConns(); for(auto& blocked_key : blocked_keys){ - auto &it=key_to_blocked_conns.find(blocked_key); + const auto &it=key_to_blocked_conns.find(blocked_key); if(it!=key_to_blocked_conns.end()){ auto &conns_list=it->second; for(auto conn_node=conns_list->begin();conn_node!=conns_list->end();conn_node++){ From 561f0340a0f340c651a9d7aa9c04e2dcdfa9348f Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 7 Dec 2024 22:47:30 +0800 Subject: [PATCH 05/18] format --- src/cmd_admin.h | 4 ++-- src/cmd_list.cc | 11 +++++------ src/kiwi.cc | 24 ++++++++++++------------ src/kiwi.h | 2 +- src/net/base_event.h | 2 +- src/net/base_socket.h | 2 +- src/net/callback_function.h | 3 +-- src/net/client_socket.h | 2 +- src/net/epoll_event.h | 2 +- src/net/event_server.h | 6 ++++-- src/net/io_thread.h | 2 +- src/net/kqueue_event.h | 2 +- src/net/thread_manager.h | 13 +++++++++---- src/storage/src/base_data_value_format.h | 2 +- 14 files changed, 41 insertions(+), 36 deletions(-) mode change 100755 => 100644 src/cmd_list.cc diff --git a/src/cmd_admin.h b/src/cmd_admin.h index 37f23c17..b3f48b75 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -37,7 +37,7 @@ class CmdConfig : public BaseCmdGroup { private: // std::vector subCmd_; - void DoCmd(PClient* client) override{}; + void DoCmd(PClient* client) override {}; }; class CmdConfigGet : public BaseCmd { @@ -178,7 +178,7 @@ class CmdDebug : public BaseCmdGroup { bool DoInitial(PClient* client) override { return true; }; private: - void DoCmd(PClient* client) override{}; + void DoCmd(PClient* client) override {}; }; class CmdDebugHelp : public BaseCmd { diff --git a/src/cmd_list.cc b/src/cmd_list.cc old mode 100755 new mode 100644 index 0a9b013e..c9161921 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -182,7 +182,7 @@ BRPopCmd::BRPopCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {} bool BRPopCmd::DoInitial(PClient* client) { - std::vector keys(client->argv_.begin() + 1, client->argv_.end()-1); + std::vector keys(client->argv_.begin() + 1, client->argv_.end() - 1); client->SetKey(keys); int64_t timeout = 0; @@ -190,9 +190,8 @@ bool BRPopCmd::DoInitial(PClient* client) { client->SetRes(CmdRes::kInvalidInt); return false; } - if (timeout < 0 ) { - client->SetRes(CmdRes::kErrOther, - "timeout can't be a negative value"); + if (timeout < 0) { + client->SetRes(CmdRes::kErrOther, "timeout can't be a negative value"); return false; } if (timeout > 0) { @@ -206,7 +205,7 @@ bool BRPopCmd::DoInitial(PClient* client) { void BRPopCmd::DoCmd(PClient* client) { std::vector elements; std::vector list_keys(client->Keys().begin(), client->Keys().end()); - for(auto &list_key:list_keys){ + for (auto& list_key : list_keys) { storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(list_key, 1, &elements); if (s.ok()) { client->AppendArrayLen(2); @@ -220,7 +219,7 @@ void BRPopCmd::DoCmd(PClient* client) { return; } } - BlockThisClientToWaitLRPush(list_keys,expire_time_,client,BlockedConnNode::Type::BRPop); + BlockThisClientToWaitLRPush(list_keys, expire_time_, client, BlockedConnNode::Type::BRPop); } LRangeCmd::LRangeCmd(const std::string& name, int16_t arity) diff --git a/src/kiwi.cc b/src/kiwi.cc index 458204e0..32e68093 100644 --- a/src/kiwi.cc +++ b/src/kiwi.cc @@ -178,28 +178,28 @@ void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() { conn_node++; } } - if(conns_list->empty()){ + if (conns_list->empty()) { keys_need_remove.push_back(it.first); } } - - for(auto& remove_key : keys_need_remove){ + + for (auto& remove_key : keys_need_remove) { key_to_blocked_conns.erase(remove_key); } } -void KiwiDB::CleanBlockedNodes(const std::shared_ptr& client){ +void KiwiDB::CleanBlockedNodes(const std::shared_ptr& client) { std::vector blocked_keys; - for(auto key:client->Keys()){ + for (auto key : client->Keys()) { blocked_keys.emplace_back(client->GetCurrentDB(), key); } - auto &key_to_blocked_conns=g_kiwi->GetMapFromKeyToConns(); - for(auto& blocked_key : blocked_keys){ - const auto &it=key_to_blocked_conns.find(blocked_key); - if(it!=key_to_blocked_conns.end()){ - auto &conns_list=it->second; - for(auto conn_node=conns_list->begin();conn_node!=conns_list->end();conn_node++){ - if(conn_node->GetBlockedClient()->GetConnId()==client->GetConnId()){ + auto& key_to_blocked_conns = g_kiwi->GetMapFromKeyToConns(); + for (auto& blocked_key : blocked_keys) { + const auto& it = key_to_blocked_conns.find(blocked_key); + if (it != key_to_blocked_conns.end()) { + auto& conns_list = it->second; + for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); conn_node++) { + if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) { conns_list->erase(conn_node); break; } diff --git a/src/kiwi.h b/src/kiwi.h index 8cf4207d..0f2020bb 100644 --- a/src/kiwi.h +++ b/src/kiwi.h @@ -71,7 +71,7 @@ class KiwiDB final { void ScanEvictedBlockedConnsOfBlrpop(); // erase all blocked nodes of this client - void CleanBlockedNodes(const std::shared_ptr& client); + void CleanBlockedNodes(const std::shared_ptr& client); inline void SendPacket2Client(const std::shared_ptr& client, std::string&& msg) { event_server_->SendPacket(client, std::move(msg)); } diff --git a/src/net/base_event.h b/src/net/base_event.h index fd0abdec..4d0a97c4 100644 --- a/src/net/base_event.h +++ b/src/net/base_event.h @@ -45,7 +45,7 @@ class BaseEvent : public std::enable_shared_from_this { const static int EVENT_HUB; BaseEvent(const std::shared_ptr &listen, int8_t mode, int8_t type) - : listen_(listen), mode_(mode), type_(type){}; + : listen_(listen), mode_(mode), type_(type) {}; virtual ~BaseEvent() = default; diff --git a/src/net/base_socket.h b/src/net/base_socket.h index a1c8b39e..e57c4b6f 100644 --- a/src/net/base_socket.h +++ b/src/net/base_socket.h @@ -34,7 +34,7 @@ class BaseSocket : public NetEvent { ~BaseSocket() override = default; - void OnError() override{}; + void OnError() override {}; void Close() override; diff --git a/src/net/callback_function.h b/src/net/callback_function.h index 10385c50..1ceb1c59 100644 --- a/src/net/callback_function.h +++ b/src/net/callback_function.h @@ -35,8 +35,7 @@ concept HasSetFdFunction = requires(T t, uint64_t id, int8_t index) { { (*t).GetConnId() } -> std::same_as; // GetFd return type is int { (*t).SetThreadIndex(index) } -> std::same_as; // SetThreadIndex return type is void { (*t).GetThreadIndex() } -> std::same_as; // GetThreadIndex return type is int8_t -} -|| std::is_class_v; // If T is an ordinary class, the member function is called directly +} || std::is_class_v; // If T is an ordinary class, the member function is called directly template requires HasSetFdFunction diff --git a/src/net/client_socket.h b/src/net/client_socket.h index 307a2688..fbde78b2 100644 --- a/src/net/client_socket.h +++ b/src/net/client_socket.h @@ -13,7 +13,7 @@ namespace net { class ClientSocket : public StreamSocket { public: - explicit ClientSocket(const SocketAddr& addr) : StreamSocket(0, SOCKET_TCP), addr_(addr){}; + explicit ClientSocket(const SocketAddr& addr) : StreamSocket(0, SOCKET_TCP), addr_(addr) {}; ~ClientSocket() override = default; diff --git a/src/net/epoll_event.h b/src/net/epoll_event.h index d0efeb14..5f2d3ff2 100644 --- a/src/net/epoll_event.h +++ b/src/net/epoll_event.h @@ -23,7 +23,7 @@ namespace net { class EpollEvent : public BaseEvent { public: explicit EpollEvent(const std::shared_ptr &listen, int8_t mode) - : BaseEvent(listen, mode, BaseEvent::EVENT_TYPE_EPOLL){}; + : BaseEvent(listen, mode, BaseEvent::EVENT_TYPE_EPOLL) {}; ~EpollEvent() override { Close(); } diff --git a/src/net/event_server.h b/src/net/event_server.h index 4c353520..e9f61fc4 100644 --- a/src/net/event_server.h +++ b/src/net/event_server.h @@ -104,7 +104,8 @@ class EventServer final { }; template -requires HasSetFdFunction std::pair EventServer::StartServer(int64_t interval) { +requires HasSetFdFunction +std::pair EventServer::StartServer(int64_t interval) { if (threadNum_ <= 0) { return std::pair(false, "thread num must be greater than 0"); } @@ -143,7 +144,8 @@ requires HasSetFdFunction std::pair EventServer::StartS } template -requires HasSetFdFunction std::pair EventServer::StartClientServer() { +requires HasSetFdFunction +std::pair EventServer::StartClientServer() { if (threadNum_ <= 0) { return std::pair(false, "thread num must be greater than 0"); } diff --git a/src/net/io_thread.h b/src/net/io_thread.h index 4420b611..d9ed681f 100644 --- a/src/net/io_thread.h +++ b/src/net/io_thread.h @@ -16,7 +16,7 @@ namespace net { class IOThread { public: - explicit IOThread(const std::shared_ptr &event) : baseEvent_(event){}; + explicit IOThread(const std::shared_ptr &event) : baseEvent_(event) {}; ~IOThread() = default; diff --git a/src/net/kqueue_event.h b/src/net/kqueue_event.h index e013d19f..d21b4026 100644 --- a/src/net/kqueue_event.h +++ b/src/net/kqueue_event.h @@ -24,7 +24,7 @@ namespace net { class KqueueEvent : public BaseEvent { public: explicit KqueueEvent(std::shared_ptr listen, int8_t mode) - : BaseEvent(std::move(listen), mode, BaseEvent::EVENT_TYPE_KQUEUE){}; + : BaseEvent(std::move(listen), mode, BaseEvent::EVENT_TYPE_KQUEUE) {}; ~KqueueEvent() override { Close(); } diff --git a/src/net/thread_manager.h b/src/net/thread_manager.h index 5204d2dd..d831b326 100644 --- a/src/net/thread_manager.h +++ b/src/net/thread_manager.h @@ -114,7 +114,10 @@ class ThreadManager { }; template -requires HasSetFdFunction ThreadManager::~ThreadManager() { Stop(); } +requires HasSetFdFunction +ThreadManager::~ThreadManager() { + Stop(); +} template requires HasSetFdFunction @@ -201,7 +204,9 @@ void ThreadManager::OnNetEventClose(uint64_t connId, std::string &&err) { template requires HasSetFdFunction -void ThreadManager::CloseConnection(uint64_t connId) { OnNetEventClose(connId, ""); } +void ThreadManager::CloseConnection(uint64_t connId) { + OnNetEventClose(connId, ""); +} template requires HasSetFdFunction @@ -326,8 +331,8 @@ bool ThreadManager::CreateWriteThread() { } template -requires HasSetFdFunction uint64_t ThreadManager::DoTCPConnect(T &t, int fd, - const std::shared_ptr &conn) { +requires HasSetFdFunction +uint64_t ThreadManager::DoTCPConnect(T &t, int fd, const std::shared_ptr &conn) { auto connId = getConnId(); if constexpr (IsPointer_v) { t->SetConnId(connId); diff --git a/src/storage/src/base_data_value_format.h b/src/storage/src/base_data_value_format.h index 2b933afd..e765092d 100644 --- a/src/storage/src/base_data_value_format.h +++ b/src/storage/src/base_data_value_format.h @@ -97,7 +97,7 @@ class ParsedBaseDataValue : public ParsedInternalValue { } protected: - virtual void SetVersionToValue() override{}; + virtual void SetVersionToValue() override {}; private: const size_t kBaseDataValueSuffixLength = kSuffixReserveLength + kTimestampLength; From 0ae4aa2ea8a2fccc9ca21e8bceba4e03f2f7a119 Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Tue, 24 Dec 2024 16:37:16 +0800 Subject: [PATCH 06/18] Fixed the potential multi-threading issue caused by brpop --- src/cmd_list.cc | 5 ++- src/storage/include/storage/storage.h | 7 ++++ src/storage/src/redis.cc | 4 +-- src/storage/src/redis.h | 3 +- src/storage/src/redis_lists.cc | 47 +++++++++++++++++++++++++++ src/storage/src/storage.cc | 9 ++++- 6 files changed, 70 insertions(+), 5 deletions(-) diff --git a/src/cmd_list.cc b/src/cmd_list.cc index c9161921..68458d5d 100644 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -9,6 +9,7 @@ #include "cmd_list.h" #include "pstd_string.h" +#include "src/scope_record_lock.h" #include "store.h" namespace kiwi { @@ -205,8 +206,10 @@ bool BRPopCmd::DoInitial(PClient* client) { void BRPopCmd::DoCmd(PClient* client) { std::vector elements; std::vector list_keys(client->Keys().begin(), client->Keys().end()); + storage::MultiScopeRecordLock(PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys); for (auto& list_key : list_keys) { - storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(list_key, 1, &elements); + storage::Status s = + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPopWithoutLock(list_key, 1, &elements); if (s.ok()) { client->AppendArrayLen(2); client->AppendString(list_key); diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 7cc9e0ec..2fe1820a 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -28,6 +28,7 @@ #include "pstd/env.h" #include "pstd/pstd_mutex.h" #include "src/base_data_value_format.h" +#include "src/lock_mgr.h" #include "storage/slot_indexer.h" namespace kiwi { @@ -191,6 +192,8 @@ class Storage { std::unique_ptr& GetDBInstance(const std::string& key); + std::shared_ptr GetLockMgr() { return lock_mgr_; } + // Strings Commands // Set key to hold the string value. if key @@ -573,6 +576,8 @@ class Storage { // Removes and returns the last elements of the list stored at key. Status RPop(const Slice& key, int64_t count, std::vector* elements); + Status RPopWithoutLock(const Slice& key, int64_t count, std::vector* elements); + // Returns the element at index index in the list stored at key. The index is // zero-based, so 0 means the first element, 1 the second element and so on. // Negative indices can be used to designate elements starting at the tail of @@ -1112,6 +1117,8 @@ class Storage { std::unique_ptr slot_indexer_; std::atomic is_opened_ = false; + std::shared_ptr lock_mgr_; + std::unique_ptr> cursors_store_; // Storage start the background thread for compaction task diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 2bba2252..fbd58081 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -45,10 +45,10 @@ rocksdb::Comparator* ZSetsScoreKeyComparator() { return &zsets_score_key_compare; } -Redis::Redis(Storage* const s, int32_t index) +Redis::Redis(Storage* const s, int32_t index, std::shared_ptr lock_mgr) : storage_(s), index_(index), - lock_mgr_(std::make_shared(1000, 0, std::make_shared())), + lock_mgr_(lock_mgr), small_compaction_threshold_(5000), small_compaction_duration_threshold_(10000) { statistics_store_ = std::make_unique>(); diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 0e16d773..22deeb45 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -35,7 +35,7 @@ using Slice = rocksdb::Slice; class Redis { public: - Redis(Storage* storage, int32_t index); + Redis(Storage* storage, int32_t index, std::shared_ptr lock_mgr); virtual ~Redis(); rocksdb::DB* GetDB() { return db_; } @@ -251,6 +251,7 @@ class Redis { Status LSet(const Slice& key, int64_t index, const Slice& value); Status LTrim(const Slice& key, int64_t start, int64_t stop); Status RPop(const Slice& key, int64_t count, std::vector* elements); + Status RPopWithoutLock(const Slice& key, int64_t count, std::vector* elements); Status RPoplpush(const Slice& source, const Slice& destination, std::string* element); Status RPush(const Slice& key, const std::vector& values, uint64_t* ret); Status RPushx(const Slice& key, const std::vector& values, uint64_t* len); diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index 43191f5e..33073283 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -746,6 +746,53 @@ Status Redis::RPop(const Slice& key, int64_t count, std::vector* el return s; } +Status Redis::RPopWithoutLock(const Slice& key, int64_t count, std::vector* elements) { + uint32_t statistic = 0; + elements->clear(); + + auto batch = Batch::CreateBatch(this); + + std::string meta_value; + + BaseMetaKey base_meta_key(key); + Status s = db_->Get(default_read_options_, handles_[kMetaCF], base_meta_key.Encode(), &meta_value); + if (s.ok()) { + if (IsStale(meta_value)) { + return Status::NotFound(); + } else if (!ExpectedMetaValue(DataType::kLists, meta_value)) { + return Status::InvalidArgument(fmt::format("WRONGTYPE, key: {}, expect type: {}, get type: {}", key.ToString(), + DataTypeStrings[static_cast(DataType::kLists)], + DataTypeStrings[static_cast(GetMetaValueType(meta_value))])); + } else { + ParsedListsMetaValue parsed_lists_meta_value(&meta_value); + auto size = static_cast(parsed_lists_meta_value.Count()); + uint64_t version = parsed_lists_meta_value.Version(); + int32_t start_index = 0; + auto stop_index = static_cast(count <= size ? count - 1 : size - 1); + int32_t cur_index = 0; + ListsDataKey lists_data_key(key, version, parsed_lists_meta_value.RightIndex() - 1); + rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[kListsDataCF]); + for (iter->SeekForPrev(lists_data_key.Encode()); iter->Valid() && cur_index <= stop_index; + iter->Prev(), ++cur_index) { + statistic++; + ParsedBaseDataValue parsed_value(iter->value()); + elements->push_back(parsed_value.UserValue().ToString()); + batch->Delete(kListsDataCF, iter->key()); + + parsed_lists_meta_value.ModifyCount(-1); + parsed_lists_meta_value.ModifyRightIndex(-1); + } + batch->Put(kMetaCF, base_meta_key.Encode(), meta_value); + delete iter; + } + } + if (batch->Count() != 0U) { + s = batch->Commit(); + UpdateSpecificKeyStatistics(DataType::kLists, key.ToString(), statistic); + } + return s; +} + Status Redis::RPoplpush(const Slice& source, const Slice& destination, std::string* element) { element->clear(); uint32_t statistic = 0; diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 4c50d144..c8f5a4d6 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -146,8 +146,9 @@ Status Storage::Open(const StorageOptions& storage_options, const std::string& d LogIndexAndSequenceCollector::max_gap_.store(storage_options.max_gap); storage_options.options.write_buffer_manager = std::make_shared(storage_options.mem_manager_size); + lock_mgr_ = std::make_shared(1000, 0, std::make_shared()); for (size_t index = 0; index < db_instance_num_; index++) { - insts_.emplace_back(std::make_unique(this, index)); + insts_.emplace_back(std::make_unique(this, index, lock_mgr_)); Status s = insts_.back()->Open(storage_options, AppendSubDirectory(db_path, index)); if (!s.ok()) { ERROR("open RocksDB{} failed {}", index, s.ToString()); @@ -905,6 +906,12 @@ Status Storage::RPop(const Slice& key, int64_t count, std::vector* return inst->RPop(key, count, elements); } +Status Storage::RPopWithoutLock(const Slice& key, int64_t count, std::vector* elements) { + elements->clear(); + auto& inst = GetDBInstance(key); + return inst->RPopWithoutLock(key, count, elements); +} + Status Storage::LIndex(const Slice& key, int64_t index, std::string* element) { element->clear(); auto& inst = GetDBInstance(key); From fec46cd2adfe33d0be31fef71502e759a6519009 Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Tue, 24 Dec 2024 17:56:19 +0800 Subject: [PATCH 07/18] Fixed PClient double free --- src/base_cmd.cc | 4 ++-- src/base_cmd.h | 4 ++-- src/cmd_list.cc | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 6ad95a0e..7b095df3 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -106,8 +106,8 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) { return subCmd->second.get(); } -void BaseCmd::BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, PClient* client, - BlockedConnNode::Type type) { +void BaseCmd::BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, + std::shared_ptr client, BlockedConnNode::Type type) { std::lock_guard map_lock(g_kiwi->GetBlockMtx()); auto& key_to_conns = g_kiwi->GetMapFromKeyToConns(); for (auto key : keys) { diff --git a/src/base_cmd.h b/src/base_cmd.h index 10f5b777..3a72ff25 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -216,7 +216,7 @@ class BlockedConnNode { public: enum Type { BLPop = 0, BRPop }; virtual ~BlockedConnNode() {} - BlockedConnNode(int64_t expire_time, PClient* client, Type type) + BlockedConnNode(int64_t expire_time, std::shared_ptr client, Type type) : expire_time_(expire_time), client_(client), type_(type) {} bool IsExpired(); std::shared_ptr GetBlockedClient() { return client_; } @@ -293,7 +293,7 @@ class BaseCmd : public std::enable_shared_from_this { void ServeAndUnblockConns(PClient* client); - void BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, PClient* client, + void BlockThisClientToWaitLRPush(std::vector& keys, int64_t expire_time, std::shared_ptr client, BlockedConnNode::Type type); protected: diff --git a/src/cmd_list.cc b/src/cmd_list.cc index 68458d5d..d384936b 100644 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -222,7 +222,7 @@ void BRPopCmd::DoCmd(PClient* client) { return; } } - BlockThisClientToWaitLRPush(list_keys, expire_time_, client, BlockedConnNode::Type::BRPop); + BlockThisClientToWaitLRPush(list_keys, expire_time_, client->shared_from_this(), BlockedConnNode::Type::BRPop); } LRangeCmd::LRangeCmd(const std::string& name, int16_t arity) From 30830a42600134c5cfd7c0c1a1f9fe0effa264ba Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Thu, 26 Dec 2024 21:35:34 +0800 Subject: [PATCH 08/18] changes for some comments --- src/base_cmd.cc | 3 +-- src/base_cmd.h | 4 ++-- src/kiwi.cc | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 5120a8b8..2fd0f9fa 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -174,11 +174,10 @@ void BaseCmd::ServeAndUnblockConns(PClient* client) { } } -bool BlockedConnNode::IsExpired() { +bool BlockedConnNode::IsExpired(std::chrono::system_clock::time_point now) { if (expire_time_ == 0) { return false; } - auto now = std::chrono::system_clock::now(); int64_t now_in_ms = std::chrono::time_point_cast(now).time_since_epoch().count(); if (expire_time_ <= now_in_ms) { return true; diff --git a/src/base_cmd.h b/src/base_cmd.h index 38bb17f8..fba7d446 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -222,10 +222,10 @@ enum AclCategory { class BlockedConnNode { public: enum Type { BLPop = 0, BRPop }; - virtual ~BlockedConnNode() {} + virtual ~BlockedConnNode() =default; BlockedConnNode(int64_t expire_time, std::shared_ptr client, Type type) : expire_time_(expire_time), client_(client), type_(type) {} - bool IsExpired(); + bool IsExpired(std::chrono::system_clock::time_point now = std::chrono::system_clock::now()); std::shared_ptr GetBlockedClient() { return client_; } Type GetCmdType() { return type_; } diff --git a/src/kiwi.cc b/src/kiwi.cc index ac39241d..ab55fd35 100644 --- a/src/kiwi.cc +++ b/src/kiwi.cc @@ -178,7 +178,7 @@ void KiwiDB::ScanEvictedBlockedConnsOfBlrpop() { conn_node = conns_list->erase(conn_node); CleanBlockedNodes(conn_ptr); } else { - conn_node++; + ++conn_node; } } if (conns_list->empty()) { @@ -201,7 +201,7 @@ void KiwiDB::CleanBlockedNodes(const std::shared_ptr& client) { const auto& it = key_to_blocked_conns.find(blocked_key); if (it != key_to_blocked_conns.end()) { auto& conns_list = it->second; - for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); conn_node++) { + for (auto conn_node = conns_list->begin(); conn_node != conns_list->end(); ++conn_node) { if (conn_node->GetBlockedClient()->GetConnId() == client->GetConnId()) { conns_list->erase(conn_node); break; From 9bb4f8155a5be9e46b40e08391f3d66ebf07bd53 Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 4 Jan 2025 20:22:57 +0800 Subject: [PATCH 09/18] add default parameters to some class members --- src/base_cmd.cc | 3 +- src/base_cmd.h | 10 +-- src/client.h | 3 +- src/net/event_server.h | 6 +- src/proto_parser.cc | 134 ----------------------------------------- 5 files changed, 12 insertions(+), 144 deletions(-) delete mode 100644 src/proto_parser.cc diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 2fd0f9fa..0fbfaa0e 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -118,8 +118,7 @@ void BaseCmd::BlockThisClientToWaitLRPush(std::vector& keys, int64_ key_to_conns.emplace(blpop_key, std::make_unique>()); it = key_to_conns.find(blpop_key); } - auto& wait_list_of_this_key = it->second; - wait_list_of_this_key->emplace_back(expire_time, client, type); + it->second->emplace_back(expire_time, client, type); } } diff --git a/src/base_cmd.h b/src/base_cmd.h index fba7d446..b9f36be8 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -221,8 +221,8 @@ enum AclCategory { class BlockedConnNode { public: - enum Type { BLPop = 0, BRPop }; - virtual ~BlockedConnNode() =default; + enum Type { NotAny = 0, BLPop, BRPop }; + virtual ~BlockedConnNode() = default; BlockedConnNode(int64_t expire_time, std::shared_ptr client, Type type) : expire_time_(expire_time), client_(client), type_(type) {} bool IsExpired(std::chrono::system_clock::time_point now = std::chrono::system_clock::now()); @@ -230,8 +230,8 @@ class BlockedConnNode { Type GetCmdType() { return type_; } private: - Type type_; - int64_t expire_time_; + Type type_ = NotAny; + int64_t expire_time_ = 0; std::shared_ptr client_; }; @@ -344,7 +344,7 @@ class BaseCmdGroup : public BaseCmd { }; struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi. - int db_id; + int db_id = -1; std::string key; bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; } }; diff --git a/src/client.h b/src/client.h index f7fc7861..b4243118 100644 --- a/src/client.h +++ b/src/client.h @@ -136,7 +136,8 @@ class PClient : public std::enable_shared_from_this { // If T is of type string, then the contents of string must be numbers template - requires(std::integral || std::same_as) void AppendArrayLen(T value) { + requires(std::integral || std::same_as) + void AppendArrayLen(T value) { AppendStringRaw(fmt::format("*{}\r\n", value)); } diff --git a/src/net/event_server.h b/src/net/event_server.h index 63906e94..77428424 100644 --- a/src/net/event_server.h +++ b/src/net/event_server.h @@ -101,7 +101,8 @@ class EventServer final { }; template -requires HasSetFdFunction std::pair EventServer::StartServer(int64_t interval) { +requires HasSetFdFunction +std::pair EventServer::StartServer(int64_t interval) { if (opt_.GetThreadNum() <= 0) { return std::pair(false, "thread num must be greater than 0"); } @@ -140,7 +141,8 @@ requires HasSetFdFunction std::pair EventServer::StartS } template -requires HasSetFdFunction std::pair EventServer::StartClientServer() { +requires HasSetFdFunction +std::pair EventServer::StartClientServer() { if (opt_.GetThreadNum() <= 0) { return std::pair(false, "thread num must be greater than 0"); } diff --git a/src/proto_parser.cc b/src/proto_parser.cc deleted file mode 100644 index b13d6e3c..00000000 --- a/src/proto_parser.cc +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) 2023-present, Arana/Kiwi Community. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory - -/* - Responsible for interfacing with the Redis client protocol. - */ - -#include - -#include "common.h" -#include "proto_parser.h" - -// 1 request -> multi strlist -// 2 multi -> * number crlf -// 3 strlist -> str strlist | empty -// 4 str -> strlen strval -// 5 strlen -> $ number crlf -// 6 strval -> string crlf - -namespace kiwi { -void PProtoParser::Reset() { - multi_ = -1; - paramLen_ = -1; - numOfParam_ = 0; - - params_.clear(); -} - -PParseResult PProtoParser::ParseRequest(const char*& ptr, const char* end) { - if (multi_ == -1) { - auto parseRet = parseMulti(ptr, end, multi_); - if (parseRet == PParseResult::kError || multi_ < -1) { - return PParseResult::kError; - } - - if (parseRet != PParseResult::kOK) { - return PParseResult::kWait; - } - } - - return parseStrlist(ptr, end, params_); -} - -PParseResult PProtoParser::parseMulti(const char*& ptr, const char* end, int& result) { - if (end - ptr < 3) { - return PParseResult::kWait; - } - - if (*ptr != '*') { - return PParseResult::kError; - } - - ++ptr; - - return GetIntUntilCRLF(ptr, end - ptr, result); -} - -PParseResult PProtoParser::parseStrlist(const char*& ptr, const char* end, std::vector& results) { - while (static_cast(numOfParam_) < multi_) { - if (results.size() < numOfParam_ + 1) { - results.resize(numOfParam_ + 1); - } - - auto parseRet = parseStr(ptr, end, results[numOfParam_]); - - if (parseRet == PParseResult::kOK) { - ++numOfParam_; - } else { - return parseRet; - } - } - - results.resize(numOfParam_); - return PParseResult::kOK; -} - -PParseResult PProtoParser::parseStr(const char*& ptr, const char* end, PString& result) { - if (paramLen_ == -1) { - auto parseRet = parseStrlen(ptr, end, paramLen_); - if (parseRet == PParseResult::kError || paramLen_ < -1) { - return PParseResult::kError; - } - - if (parseRet != PParseResult::kOK) { - return PParseResult::kWait; - } - } - - return parseStrval(ptr, end, result); -} - -PParseResult PProtoParser::parseStrval(const char*& ptr, const char* end, PString& result) { - assert(paramLen_ >= 0); - - if (static_cast(end - ptr) < paramLen_ + 2) { - paramLen_ -= (end - ptr); - result.append(ptr, end - ptr); - return PParseResult::kWait; - } - - auto tail = ptr + paramLen_; - if (tail[0] != '\r' || tail[1] != '\n') { - return PParseResult::kError; - } - - result.append(ptr, tail - ptr); - ptr = tail + 2; - paramLen_ = -1; - - return PParseResult::kOK; -} - -PParseResult PProtoParser::parseStrlen(const char*& ptr, const char* end, int& result) { - if (end - ptr < 3) { - return PParseResult::kWait; - } - - if (*ptr != '$') { - return PParseResult::kError; - } - - ++ptr; - - const auto ret = GetIntUntilCRLF(ptr, end - ptr, result); - if (ret != PParseResult::kOK) { - --ptr; - } - - return ret; -} - -} // namespace kiwi From 5b0332ffed45113957c9d005381bc12845ca45b4 Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Fri, 17 Jan 2025 15:35:06 +0800 Subject: [PATCH 10/18] format --- src/client.h | 3 +-- src/cmd_admin.h | 4 ++-- src/net/base_event.h | 2 +- src/net/base_socket.h | 2 +- src/net/callback_function.h | 3 ++- src/net/client_socket.h | 2 +- src/net/epoll_event.h | 2 +- src/net/event_server.h | 6 ++---- src/net/io_thread.h | 2 +- src/net/kqueue_event.h | 2 +- src/net/thread_manager.h | 13 ++++--------- src/storage/src/base_data_value_format.h | 2 +- 12 files changed, 18 insertions(+), 25 deletions(-) diff --git a/src/client.h b/src/client.h index b4243118..f7fc7861 100644 --- a/src/client.h +++ b/src/client.h @@ -136,8 +136,7 @@ class PClient : public std::enable_shared_from_this { // If T is of type string, then the contents of string must be numbers template - requires(std::integral || std::same_as) - void AppendArrayLen(T value) { + requires(std::integral || std::same_as) void AppendArrayLen(T value) { AppendStringRaw(fmt::format("*{}\r\n", value)); } diff --git a/src/cmd_admin.h b/src/cmd_admin.h index e61c7251..50971826 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -37,7 +37,7 @@ class CmdConfig : public BaseCmdGroup { private: // std::vector subCmd_; - void DoCmd(PClient* client) override {}; + void DoCmd(PClient* client) override{}; }; class CmdConfigGet : public BaseCmd { @@ -256,7 +256,7 @@ class CmdDebug : public BaseCmdGroup { bool DoInitial(PClient* client) override { return true; }; private: - void DoCmd(PClient* client) override {}; + void DoCmd(PClient* client) override{}; }; class CmdDebugHelp : public BaseCmd { diff --git a/src/net/base_event.h b/src/net/base_event.h index d4189a98..16a336e0 100644 --- a/src/net/base_event.h +++ b/src/net/base_event.h @@ -46,7 +46,7 @@ class BaseEvent : public std::enable_shared_from_this { const static int EVENT_NULL; BaseEvent(const std::shared_ptr &listen, int8_t mode, int8_t type) - : listen_(listen), mode_(mode), type_(type) {}; + : listen_(listen), mode_(mode), type_(type){}; virtual ~BaseEvent() = default; diff --git a/src/net/base_socket.h b/src/net/base_socket.h index 62195798..5161cf2f 100644 --- a/src/net/base_socket.h +++ b/src/net/base_socket.h @@ -34,7 +34,7 @@ class BaseSocket : public NetEvent { ~BaseSocket() override = default; - void OnError() override {}; + void OnError() override{}; void Close() override; diff --git a/src/net/callback_function.h b/src/net/callback_function.h index 1ceb1c59..10385c50 100644 --- a/src/net/callback_function.h +++ b/src/net/callback_function.h @@ -35,7 +35,8 @@ concept HasSetFdFunction = requires(T t, uint64_t id, int8_t index) { { (*t).GetConnId() } -> std::same_as; // GetFd return type is int { (*t).SetThreadIndex(index) } -> std::same_as; // SetThreadIndex return type is void { (*t).GetThreadIndex() } -> std::same_as; // GetThreadIndex return type is int8_t -} || std::is_class_v; // If T is an ordinary class, the member function is called directly +} +|| std::is_class_v; // If T is an ordinary class, the member function is called directly template requires HasSetFdFunction diff --git a/src/net/client_socket.h b/src/net/client_socket.h index 3117c993..6eb51641 100644 --- a/src/net/client_socket.h +++ b/src/net/client_socket.h @@ -13,7 +13,7 @@ namespace net { class ClientSocket : public StreamSocket { public: - explicit ClientSocket(const SocketAddr& addr) : StreamSocket(0, SOCKET_TCP), addr_(addr) {}; + explicit ClientSocket(const SocketAddr& addr) : StreamSocket(0, SOCKET_TCP), addr_(addr){}; ~ClientSocket() override = default; diff --git a/src/net/epoll_event.h b/src/net/epoll_event.h index 5f2d3ff2..d0efeb14 100644 --- a/src/net/epoll_event.h +++ b/src/net/epoll_event.h @@ -23,7 +23,7 @@ namespace net { class EpollEvent : public BaseEvent { public: explicit EpollEvent(const std::shared_ptr &listen, int8_t mode) - : BaseEvent(listen, mode, BaseEvent::EVENT_TYPE_EPOLL) {}; + : BaseEvent(listen, mode, BaseEvent::EVENT_TYPE_EPOLL){}; ~EpollEvent() override { Close(); } diff --git a/src/net/event_server.h b/src/net/event_server.h index 77428424..63906e94 100644 --- a/src/net/event_server.h +++ b/src/net/event_server.h @@ -101,8 +101,7 @@ class EventServer final { }; template -requires HasSetFdFunction -std::pair EventServer::StartServer(int64_t interval) { +requires HasSetFdFunction std::pair EventServer::StartServer(int64_t interval) { if (opt_.GetThreadNum() <= 0) { return std::pair(false, "thread num must be greater than 0"); } @@ -141,8 +140,7 @@ std::pair EventServer::StartServer(int64_t interval) { } template -requires HasSetFdFunction -std::pair EventServer::StartClientServer() { +requires HasSetFdFunction std::pair EventServer::StartClientServer() { if (opt_.GetThreadNum() <= 0) { return std::pair(false, "thread num must be greater than 0"); } diff --git a/src/net/io_thread.h b/src/net/io_thread.h index 00d22770..56169558 100644 --- a/src/net/io_thread.h +++ b/src/net/io_thread.h @@ -16,7 +16,7 @@ namespace net { class IOThread { public: - explicit IOThread(const std::shared_ptr &event) : baseEvent_(event) {}; + explicit IOThread(const std::shared_ptr &event) : baseEvent_(event){}; ~IOThread() = default; diff --git a/src/net/kqueue_event.h b/src/net/kqueue_event.h index d21b4026..e013d19f 100644 --- a/src/net/kqueue_event.h +++ b/src/net/kqueue_event.h @@ -24,7 +24,7 @@ namespace net { class KqueueEvent : public BaseEvent { public: explicit KqueueEvent(std::shared_ptr listen, int8_t mode) - : BaseEvent(std::move(listen), mode, BaseEvent::EVENT_TYPE_KQUEUE) {}; + : BaseEvent(std::move(listen), mode, BaseEvent::EVENT_TYPE_KQUEUE){}; ~KqueueEvent() override { Close(); } diff --git a/src/net/thread_manager.h b/src/net/thread_manager.h index 9fc1d4c7..66591cce 100644 --- a/src/net/thread_manager.h +++ b/src/net/thread_manager.h @@ -116,10 +116,7 @@ class ThreadManager { }; template -requires HasSetFdFunction -ThreadManager::~ThreadManager() { - Stop(); -} +requires HasSetFdFunction ThreadManager::~ThreadManager() { Stop(); } template requires HasSetFdFunction @@ -209,9 +206,7 @@ void ThreadManager::OnNetEventClose(uint64_t connId, std::string &&err) { template requires HasSetFdFunction -void ThreadManager::CloseConnection(uint64_t connId) { - OnNetEventClose(connId, ""); -} +void ThreadManager::CloseConnection(uint64_t connId) { OnNetEventClose(connId, ""); } template requires HasSetFdFunction @@ -335,8 +330,8 @@ bool ThreadManager::CreateWriteThread() { } template -requires HasSetFdFunction -uint64_t ThreadManager::DoTCPConnect(T &t, int fd, const std::shared_ptr &conn) { +requires HasSetFdFunction uint64_t ThreadManager::DoTCPConnect(T &t, int fd, + const std::shared_ptr &conn) { auto connId = getConnId(); if constexpr (IsPointer_v) { t->SetConnId(connId); diff --git a/src/storage/src/base_data_value_format.h b/src/storage/src/base_data_value_format.h index e765092d..2b933afd 100644 --- a/src/storage/src/base_data_value_format.h +++ b/src/storage/src/base_data_value_format.h @@ -97,7 +97,7 @@ class ParsedBaseDataValue : public ParsedInternalValue { } protected: - virtual void SetVersionToValue() override {}; + virtual void SetVersionToValue() override{}; private: const size_t kBaseDataValueSuffixLength = kSuffixReserveLength + kTimestampLength; From 8161ea97f58e59adc5935343de2d0ce0cdf903e5 Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Wed, 22 Jan 2025 19:22:59 +0800 Subject: [PATCH 11/18] some changes for compatibility --- src/base_cmd.cc | 4 ++-- src/cmd_list.cc | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 587d4e24..6bc711c3 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -149,10 +149,10 @@ void BaseCmd::ServeAndUnblockConns(PClient* client) { switch (conn_blocked->GetCmdType()) { case BlockedConnNode::Type::BLPop: - s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements); + s = STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &elements); break; case BlockedConnNode::Type::BRPop: - s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements); + s = STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPop(client->Key(), 1, &elements); break; } diff --git a/src/cmd_list.cc b/src/cmd_list.cc index 51ea5cfb..bad773aa 100644 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -8,7 +8,7 @@ */ #include "cmd_list.h" -#include "pstd_string.h" +#include "std_string.h" #include "src/scope_record_lock.h" #include "store.h" @@ -188,7 +188,7 @@ bool BRPopCmd::DoInitial(PClient* client) { client->SetKey(keys); int64_t timeout = 0; - if (!pstd::String2int(client->argv_.back(), &timeout)) { + if (!kstd::String2int(client->argv_.back(), &timeout)) { client->SetRes(CmdRes::kInvalidInt); return false; } @@ -207,10 +207,10 @@ bool BRPopCmd::DoInitial(PClient* client) { void BRPopCmd::DoCmd(PClient* client) { std::vector elements; std::vector list_keys(client->Keys().begin(), client->Keys().end()); - storage::MultiScopeRecordLock(PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys); + storage::MultiScopeRecordLock(STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->GetLockMgr(), list_keys); for (auto& list_key : list_keys) { storage::Status s = - PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->RPopWithoutLock(list_key, 1, &elements); + STORE_INST.GetBackend(client->GetCurrentDB())->GetStorage()->RPopWithoutLock(list_key, 1, &elements); if (s.ok()) { client->AppendArrayLen(2); client->AppendString(list_key); From 617782f5de14584011c70b1ada322f2175fd7613 Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Wed, 22 Jan 2025 19:33:23 +0800 Subject: [PATCH 12/18] format --- src/cmd_list.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmd_list.cc b/src/cmd_list.cc index bad773aa..9ffcdc07 100644 --- a/src/cmd_list.cc +++ b/src/cmd_list.cc @@ -8,8 +8,8 @@ */ #include "cmd_list.h" -#include "std_string.h" #include "src/scope_record_lock.h" +#include "std_string.h" #include "store.h" namespace kiwi { From 3bf80ee3fab5e5a30a83ec9aaa45db0e822ed11e Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 25 Jan 2025 17:21:50 +0800 Subject: [PATCH 13/18] change something for comment --- src/storage/src/redis_lists.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index a90e9e11..9a124499 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -721,7 +721,7 @@ Status Redis::RPop(const Slice& key, int64_t count, std::vector* el auto size = static_cast(parsed_lists_meta_value.Count()); uint64_t version = parsed_lists_meta_value.Version(); int32_t start_index = 0; - auto stop_index = static_cast(count <= size ? count - 1 : size - 1); + auto stop_index = static_cast(std::min(count, size) - 1); int32_t cur_index = 0; ListsDataKey lists_data_key(key, version, parsed_lists_meta_value.RightIndex() - 1); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[kListsDataCF]); @@ -768,7 +768,7 @@ Status Redis::RPopWithoutLock(const Slice& key, int64_t count, std::vector(parsed_lists_meta_value.Count()); uint64_t version = parsed_lists_meta_value.Version(); int32_t start_index = 0; - auto stop_index = static_cast(count <= size ? count - 1 : size - 1); + auto stop_index = static_cast(std::min(count, size) - 1); int32_t cur_index = 0; ListsDataKey lists_data_key(key, version, parsed_lists_meta_value.RightIndex() - 1); rocksdb::Iterator* iter = db_->NewIterator(default_read_options_, handles_[kListsDataCF]); From 14f302c73ddb889448a35f1ff1ab9b8bacb5608c Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 22 Feb 2025 21:27:08 +0800 Subject: [PATCH 14/18] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dmacos=20build=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/base_cmd.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/base_cmd.h b/src/base_cmd.h index 1cf02a4f..02db75f2 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -346,6 +346,8 @@ class BaseCmdGroup : public BaseCmd { }; struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi. + BlockKey(int client_db_id,const std::string&client_key):db_id(client_db_id),key(client_key){} + int db_id = -1; std::string key; bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; } From ccf2fa09ae1f060cfd3ec5bfcac26bccd5025249 Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 22 Feb 2025 21:36:24 +0800 Subject: [PATCH 15/18] =?UTF-8?q?=E4=B8=80=E4=BA=9B=E6=88=90=E5=91=98?= =?UTF-8?q?=E5=90=8D=E7=A7=B0=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/base_cmd.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/base_cmd.h b/src/base_cmd.h index 02db75f2..37986938 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -346,10 +346,10 @@ class BaseCmdGroup : public BaseCmd { }; struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi. - BlockKey(int client_db_id,const std::string&client_key):db_id(client_db_id),key(client_key){} + BlockKey(int db_id,const std::string&key):db_id_(db_id),key_(key){} - int db_id = -1; - std::string key; + int db_id_ = -1; + std::string key_; bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; } }; struct BlockKeyHash { From f1aae55a8e501a528a14df8faaa616393abe2bb9 Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 22 Feb 2025 21:37:47 +0800 Subject: [PATCH 16/18] format --- src/base_cmd.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/base_cmd.h b/src/base_cmd.h index 37986938..d2dff7fe 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -346,7 +346,7 @@ class BaseCmdGroup : public BaseCmd { }; struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi. - BlockKey(int db_id,const std::string&key):db_id_(db_id),key_(key){} + BlockKey(int db_id, const std::string&key) : db_id_(db_id), key_(key) {} int db_id_ = -1; std::string key_; From a69489cebe5575093da1fd7c31f876f2e46a1ebc Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 22 Feb 2025 21:39:22 +0800 Subject: [PATCH 17/18] format again --- src/base_cmd.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/base_cmd.h b/src/base_cmd.h index d2dff7fe..c20c5ff1 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -346,7 +346,7 @@ class BaseCmdGroup : public BaseCmd { }; struct BlockKey { // this data struct is made for the scenario of multi dbs in kiwi. - BlockKey(int db_id, const std::string&key) : db_id_(db_id), key_(key) {} + BlockKey(int db_id, const std::string& key) : db_id_(db_id), key_(key) {} int db_id_ = -1; std::string key_; From e5e1ba6e148a3ccbf9e210996d4253e73aa2a87a Mon Sep 17 00:00:00 2001 From: yy <3229833855@qq.com> Date: Sat, 22 Feb 2025 22:03:07 +0800 Subject: [PATCH 18/18] fix build error --- src/base_cmd.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/base_cmd.h b/src/base_cmd.h index c20c5ff1..9d923fc4 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -350,11 +350,11 @@ struct BlockKey { // this data struct is made for the scenario of multi dbs in int db_id_ = -1; std::string key_; - bool operator==(const BlockKey& p) const { return p.db_id == db_id && p.key == key; } + bool operator==(const BlockKey& p) const { return p.db_id_ == db_id_ && p.key_ == key_; } }; struct BlockKeyHash { std::size_t operator()(const BlockKey& k) const { - return std::hash{}(k.db_id) ^ std::hash{}(k.key); + return std::hash{}(k.db_id_) ^ std::hash{}(k.key_); } };