From 665b6fc0ef72d1aeb1e416dfbad7416a95cbdd96 Mon Sep 17 00:00:00 2001 From: Vincent Lohse Date: Thu, 24 Oct 2024 23:58:39 +0200 Subject: [PATCH] Multi-threaded support (WorkerThreads support) (#4) * Replaced HolyThread with DispatchThread * Dividing Ndb objects between worker threads * FIX: Enabled sending back message to in WorkerThread * Added ping command * Add redis-server to dependencies * Bumped number of connections --- .devcontainer/devcontainer.json | 2 +- pink/examples/binlog_parser_test.cc | 4 +- pink/examples/mydispatch_srv.cc | 148 +++++++++++-------- pink/examples/myredis_srv.cc | 204 +++++++++++++++----------- pink/rondis/common.cc | 3 - pink/rondis/common.h | 6 +- pink/rondis/rondb.cc | 94 +++++++----- pink/rondis/rondb.h | 8 +- pink/rondis/rondis.cc | 80 ++++++++-- pink/rondis/string/commands.cc | 32 ++-- pink/rondis/string/commands.h | 12 +- pink/rondis/string/db_interactions.cc | 16 +- pink/src/dispatch_thread.h | 15 +- pink/src/worker_thread.cc | 2 +- 14 files changed, 379 insertions(+), 247 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index e40357e5..e379fc9a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -26,7 +26,7 @@ } } }, - "postCreateCommand": "sudo apt-get update && sudo apt-get install -y build-essential libprotobuf-dev protobuf-compiler", + "postCreateCommand": "sudo apt-get update && sudo apt-get install -y build-essential redis-server libprotobuf-dev protobuf-compiler", "containerEnv": { "RONDB_PATH": "/workspaces/pink/rondb-22.10.5-linux-glibc2.28-arm64_v8" // Do this manually: diff --git a/pink/examples/binlog_parser_test.cc b/pink/examples/binlog_parser_test.cc index 60d44558..50bb9059 100644 --- a/pink/examples/binlog_parser_test.cc +++ b/pink/examples/binlog_parser_test.cc @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) { std::string one_command = "*3\r\n$3\r\nSET\r\n$1\r\na\r\n$2\r\nab\r\n"; std::string binlog_body; - slash::PutFixed16(&binlog_body, 1); // type + // slash::PutFixed16(&binlog_body, 1); // type slash::PutFixed32(&binlog_body, 0); //exec_time slash::PutFixed32(&binlog_body, 10); // server_id slash::PutFixed64(&binlog_body, 0); // logic_id @@ -42,7 +42,7 @@ int main(int argc, char* argv[]) { binlog_body.append(one_command); std::string header; - slash::PutFixed16(&header, 2); + // slash::PutFixed16(&header, 2); slash::PutFixed32(&header, binlog_body.size()); std::string command = header + binlog_body; diff --git a/pink/examples/mydispatch_srv.cc b/pink/examples/mydispatch_srv.cc index 27144da4..f46390d5 100644 --- a/pink/examples/mydispatch_srv.cc +++ b/pink/examples/mydispatch_srv.cc @@ -16,84 +16,104 @@ using namespace pink; -class MyConn: public PbConn { - public: - MyConn(int fd, const std::string& ip_port, Thread *thread, - void* worker_specific_data); - virtual ~MyConn(); - protected: - virtual int DealMessage(); - - private: - myproto::Ping ping_; - myproto::PingRes ping_res_; +class MyConn : public PbConn +{ +public: + MyConn( + int fd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data); + virtual ~MyConn(); + +protected: + virtual int DealMessage(); + +private: + myproto::Ping ping_; + myproto::PingRes ping_res_; }; -MyConn::MyConn(int fd, const std::string& ip_port, Thread *thread, - void* worker_specific_data) - : PbConn(fd, ip_port, thread) { - // Handle worker_specific_data ... +MyConn::MyConn( + int fd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data) + : PbConn(fd, ip_port, thread) +{ + // Handle worker_specific_data ... } -MyConn::~MyConn() { +MyConn::~MyConn() +{ } -int MyConn::DealMessage() { - printf("In the myconn DealMessage branch\n"); - ping_.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); - ping_res_.Clear(); - ping_res_.set_res(11234); - ping_res_.set_mess("heiheidfdfdf"); - printf ("DealMessage receive (%s)\n", ping_res_.mess().c_str()); - std::string res; - ping_res_.SerializeToString(&res); - WriteResp(res); - return 0; +int MyConn::DealMessage() +{ + printf("In the myconn DealMessage branch\n"); + ping_.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); + ping_res_.Clear(); + ping_res_.set_res(11234); + ping_res_.set_mess("heiheidfdfdf"); + printf("DealMessage receive (%s)\n", ping_res_.mess().c_str()); + std::string res; + ping_res_.SerializeToString(&res); + WriteResp(res); + return 0; } -class MyConnFactory : public ConnFactory { - public: - virtual std::shared_ptr NewPinkConn(int connfd, const std::string &ip_port, - Thread *thread, - void* worker_specific_data, - PinkEpoll* pink_epoll) const { - return std::make_shared(connfd, ip_port, thread, worker_specific_data); - } +class MyConnFactory : public ConnFactory +{ +public: + virtual std::shared_ptr NewPinkConn( + int connfd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data, + PinkEpoll *pink_epoll) const + { + return std::make_shared(connfd, ip_port, thread, worker_specific_data); + } }; static std::atomic running(false); -static void IntSigHandle(const int sig) { - printf("Catch Signal %d, cleanup...\n", sig); - running.store(false); - printf("server Exit"); +static void IntSigHandle(const int sig) +{ + printf("Catch Signal %d, cleanup...\n", sig); + running.store(false); + printf("server Exit"); } -static void SignalSetup() { - signal(SIGHUP, SIG_IGN); - signal(SIGPIPE, SIG_IGN); - signal(SIGINT, &IntSigHandle); - signal(SIGQUIT, &IntSigHandle); - signal(SIGTERM, &IntSigHandle); +static void SignalSetup() +{ + signal(SIGHUP, SIG_IGN); + signal(SIGPIPE, SIG_IGN); + signal(SIGINT, &IntSigHandle); + signal(SIGQUIT, &IntSigHandle); + signal(SIGTERM, &IntSigHandle); } -int main() { - SignalSetup(); - ConnFactory *my_conn_factory = new MyConnFactory(); - ServerThread *st = NewDispatchThread(9211, 10, my_conn_factory, 1000); - - if (st->StartThread() != 0) { - printf("StartThread error happened!\n"); - exit(-1); - } - running.store(true); - while (running.load()) { - sleep(1); - } - st->StopThread(); - - delete st; - delete my_conn_factory; - - return 0; +int main() +{ + SignalSetup(); + ConnFactory *my_conn_factory = new MyConnFactory(); + ServerThread *st = NewDispatchThread(9211, 10, my_conn_factory, 1000); + + if (st->StartThread() != 0) + { + printf("StartThread error happened!\n"); + exit(-1); + } + running.store(true); + while (running.load()) + { + sleep(1); + } + st->StopThread(); + + delete st; + delete my_conn_factory; + + return 0; } diff --git a/pink/examples/myredis_srv.cc b/pink/examples/myredis_srv.cc index 7a18916b..bb434516 100644 --- a/pink/examples/myredis_srv.cc +++ b/pink/examples/myredis_srv.cc @@ -14,106 +14,136 @@ using namespace pink; std::map db; - -class MyConn: public RedisConn { - public: - MyConn(int fd, const std::string& ip_port, Thread *thread, - void* worker_specific_data); - virtual ~MyConn() = default; - - protected: - int DealMessage(const RedisCmdArgsType& argv, std::string* response) override; - - private: +class MyConn : public RedisConn +{ +public: + MyConn( + int fd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data); + virtual ~MyConn() = default; + +protected: + int DealMessage(const RedisCmdArgsType &argv, std::string *response) override; + +private: }; -MyConn::MyConn(int fd, const std::string& ip_port, - Thread *thread, void* worker_specific_data) - : RedisConn(fd, ip_port, thread) { - // Handle worker_specific_data ... +MyConn::MyConn( + int fd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data) + : RedisConn(fd, ip_port, thread) +{ + // Handle worker_specific_data ... } -int MyConn::DealMessage(const RedisCmdArgsType& argv, std::string* response) { - printf("Get redis message "); - for (int i = 0; i < argv.size(); i++) { - printf("%s ", argv[i].c_str()); - } - printf("\n"); - - std::string val = "result"; - std::string res; - // set command - if (argv.size() == 3) { - response->append("+OK\r\n"); - db[argv[1]] = argv[2]; - } else if (argv.size() == 2) { - std::map::iterator iter = db.find(argv[1]); - if (iter != db.end()) { - const std::string& val = iter->second; - response->append("*1\r\n$"); - response->append(std::to_string(val.length())); - response->append("\r\n"); - response->append(val); - response->append("\r\n"); - } else { - response->append("$-1\r\n"); +int MyConn::DealMessage(const RedisCmdArgsType &argv, std::string *response) +{ + printf("Get redis message "); + for (int i = 0; i < argv.size(); i++) + { + printf("%s ", argv[i].c_str()); + } + printf("\n"); + + std::string val = "result"; + std::string res; + // set command + if (argv.size() == 3) + { + response->append("+OK\r\n"); + db[argv[1]] = argv[2]; } - } else { - response->append("+OK\r\n"); - } - return 0; + else if (argv.size() == 2) + { + std::map::iterator iter = db.find(argv[1]); + if (iter != db.end()) + { + const std::string &val = iter->second; + response->append("*1\r\n$"); + response->append(std::to_string(val.length())); + response->append("\r\n"); + response->append(val); + response->append("\r\n"); + } + else + { + response->append("$-1\r\n"); + } + } + else + { + response->append("+OK\r\n"); + } + return 0; } -class MyConnFactory : public ConnFactory { - public: - virtual std::shared_ptr NewPinkConn(int connfd, const std::string &ip_port, - Thread *thread, - void* worker_specific_data, pink::PinkEpoll* pink_epoll=nullptr) const { - return std::make_shared(connfd, ip_port, thread, worker_specific_data); - } +class MyConnFactory : public ConnFactory +{ +public: + virtual std::shared_ptr NewPinkConn( + int connfd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data, + pink::PinkEpoll *pink_epoll = nullptr) const + { + return std::make_shared(connfd, ip_port, thread, worker_specific_data); + } }; static std::atomic running(false); -static void IntSigHandle(const int sig) { - printf("Catch Signal %d, cleanup...\n", sig); - running.store(false); - printf("server Exit"); +static void IntSigHandle(const int sig) +{ + printf("Catch Signal %d, cleanup...\n", sig); + running.store(false); + printf("server Exit"); } -static void SignalSetup() { - signal(SIGHUP, SIG_IGN); - signal(SIGPIPE, SIG_IGN); - signal(SIGINT, &IntSigHandle); - signal(SIGQUIT, &IntSigHandle); - signal(SIGTERM, &IntSigHandle); +static void SignalSetup() +{ + signal(SIGHUP, SIG_IGN); + signal(SIGPIPE, SIG_IGN); + signal(SIGINT, &IntSigHandle); + signal(SIGQUIT, &IntSigHandle); + signal(SIGTERM, &IntSigHandle); } -int main(int argc, char* argv[]) { - if (argc < 2) { - printf("server will listen to 6379\n"); - } else { - printf("server will listen to %d\n", atoi(argv[1])); - } - int my_port = (argc > 1) ? atoi(argv[1]) : 6379; - - SignalSetup(); - - ConnFactory *conn_factory = new MyConnFactory(); - - ServerThread* my_thread = new HolyThread(my_port, conn_factory, 1000, NULL, false); - if (my_thread->StartThread() != 0) { - printf("StartThread error happened!\n"); - exit(-1); - } - running.store(true); - while (running.load()) { - sleep(1); - } - my_thread->StopThread(); - - delete my_thread; - delete conn_factory; - - return 0; +int main(int argc, char *argv[]) +{ + if (argc < 2) + { + printf("server will listen to 6379\n"); + } + else + { + printf("server will listen to %d\n", atoi(argv[1])); + } + int my_port = (argc > 1) ? atoi(argv[1]) : 6379; + + SignalSetup(); + + ConnFactory *conn_factory = new MyConnFactory(); + + ServerThread *my_thread = new HolyThread(my_port, conn_factory, 1000, NULL, false); + if (my_thread->StartThread() != 0) + { + printf("StartThread error happened!\n"); + exit(-1); + } + running.store(true); + while (running.load()) + { + sleep(1); + } + my_thread->StopThread(); + + delete my_thread; + delete conn_factory; + + return 0; } diff --git a/pink/rondis/common.cc b/pink/rondis/common.cc index 651f6b44..46aae1ac 100644 --- a/pink/rondis/common.cc +++ b/pink/rondis/common.cc @@ -5,9 +5,6 @@ #include "common.h" -#define MAX_CONNECTIONS 1 -#define MAX_NDB_PER_CONNECTION 1 - #define FOREIGN_KEY_RESTRICT_ERROR 256 diff --git a/pink/rondis/common.h b/pink/rondis/common.h index 11b8c702..03dcd487 100644 --- a/pink/rondis/common.h +++ b/pink/rondis/common.h @@ -1,8 +1,7 @@ #include #include -#define MAX_CONNECTIONS 1 -#define MAX_NDB_PER_CONNECTION 1 +#define MAX_CONNECTIONS 4 #define REDIS_DB_NAME "redis" @@ -11,9 +10,6 @@ #define RONDB_INTERNAL_ERROR 2 #define READ_ERROR 626 -extern Ndb_cluster_connection *rondb_conn[MAX_CONNECTIONS]; -extern Ndb *rondb_ndb[MAX_CONNECTIONS][MAX_NDB_PER_CONNECTION]; - int execute_no_commit(NdbTransaction *trans, int &ret_code, bool allow_fail); int execute_commit(Ndb *ndb, NdbTransaction *trans, int &ret_code); int write_formatted(char *buffer, int bufferSize, const char *format, ...); diff --git a/pink/rondis/rondb.cc b/pink/rondis/rondb.cc index 814ad3d8..b9e71837 100644 --- a/pink/rondis/rondb.cc +++ b/pink/rondis/rondb.cc @@ -7,11 +7,18 @@ #include "string/table_definitions.h" #include "string/commands.h" -Ndb_cluster_connection *rondb_conn[MAX_CONNECTIONS]; -Ndb *rondb_ndb[MAX_CONNECTIONS][MAX_NDB_PER_CONNECTION]; - -int initialize_connections(const char *connect_string) +/* + Ndb objects are not thread-safe. Hence, each worker thread / RonDB connection should + have its own Ndb object. If we have more worker threads than cluster connections, we + can create multiple Ndb objects from a single cluster connection. + Essentially we want: + num worker threads == number Ndbs objects + whereby some cluster connections may have created more Ndb objects than others. +*/ +int initialize_ndb_objects(const char *connect_string, int num_ndb_objects) { + Ndb_cluster_connection *rondb_conn[MAX_CONNECTIONS]; + for (unsigned int i = 0; i < MAX_CONNECTIONS; i++) { rondb_conn[i] = new Ndb_cluster_connection(connect_string); @@ -27,38 +34,41 @@ int initialize_connections(const char *connect_string) return -1; } printf("RonDB data node connection nr. %d is ready\n", i); - for (unsigned int j = 0; j < MAX_NDB_PER_CONNECTION; j++) + } + + for (unsigned int j = 0; j < num_ndb_objects; j++) + { + int connection_num = j % MAX_CONNECTIONS; + Ndb *ndb = new Ndb(rondb_conn[connection_num], REDIS_DB_NAME); + if (ndb == nullptr) { - Ndb *ndb = new Ndb(rondb_conn[i], REDIS_DB_NAME); - if (ndb == nullptr) - { - printf("Failed creating Ndb object nr. %d for cluster connection %d\n", j, i); - return -1; - } - if (ndb->init() != 0) - { - printf("Failed initializing Ndb object nr. %d for cluster connection %d\n", j, i); - return -1; - } - printf("Successfully initialized Ndb object nr. %d for cluster connection %d\n", j, i); - rondb_ndb[i][j] = ndb; + printf("Failed creating Ndb object nr. %d for cluster connection %d\n", j, connection_num); + return -1; } + if (ndb->init() != 0) + { + printf("Failed initializing Ndb object nr. %d for cluster connection %d\n", j, connection_num); + return -1; + } + printf("Successfully initialized Ndb object nr. %d for cluster connection %d\n", j, connection_num); + ndb_objects[j] = ndb; } + return 0; } -int setup_rondb(const char *connect_string) +int setup_rondb(const char *connect_string, int num_ndb_objects) { // Creating static thread-safe Ndb objects for all connections ndb_init(); - int res = initialize_connections(connect_string); + int res = initialize_ndb_objects(connect_string, num_ndb_objects); if (res != 0) { return res; } - Ndb *ndb = rondb_ndb[0][0]; + Ndb *ndb = ndb_objects[0]; NdbDictionary::Dictionary *dict = ndb->getDictionary(); if (init_string_records(dict) != 0) @@ -78,30 +88,44 @@ void rondb_end() int rondb_redis_handler(const pink::RedisCmdArgsType &argv, std::string *response, - int fd) + int worker_id) { - if (argv[0] == "GET") + // First check non-ndb commands + if (argv[0] == "ping") { - if (argv.size() != 2) + if (argv.size() != 1) { - printf("Invalid number of arguments for GET command\n"); + printf("Invalid number of arguments for ping command\n"); return -1; } - rondb_get_command(argv, response, fd); + response->append("+OK\r\n"); } - else if (argv[0] == "SET") + else { - if (argv.size() != 3) + Ndb *ndb = ndb_objects[worker_id]; + if (argv[0] == "GET") + { + if (argv.size() != 2) + { + printf("Invalid number of arguments for GET command\n"); + return -1; + } + rondb_get_command(ndb, argv, response); + } + else if (argv[0] == "SET") { - printf("Invalid number of arguments for SET command\n"); + if (argv.size() != 3) + { + printf("Invalid number of arguments for SET command\n"); + return -1; + } + rondb_set_command(ndb, argv, response); + } + else + { + printf("Unsupported command\n"); return -1; } - rondb_set_command(argv, response, fd); - } - else - { - printf("Unsupported command\n"); - return -1; } return 0; } diff --git a/pink/rondis/rondb.h b/pink/rondis/rondb.h index f0376130..3bb78fd7 100644 --- a/pink/rondis/rondb.h +++ b/pink/rondis/rondb.h @@ -1,9 +1,13 @@ #include #include "pink/include/redis_conn.h" +#include +#include -int initialize_connections(const char *connect_string); +extern std::vector ndb_objects; -int setup_rondb(const char *connect_string); +int initialize_ndb_objects(const char *connect_string, int num_ndb_objects); + +int setup_rondb(const char *connect_string, int num_ndb_objects); void rondb_end(); diff --git a/pink/rondis/rondis.cc b/pink/rondis/rondis.cc index 5124ed97..17ea22e2 100644 --- a/pink/rondis/rondis.cc +++ b/pink/rondis/rondis.cc @@ -1,52 +1,92 @@ #include +#include +#include +#include +#include #include "pink/include/server_thread.h" #include "pink/include/pink_conn.h" #include "pink/include/redis_conn.h" #include "pink/include/pink_thread.h" +#include "pink/src/dispatch_thread.h" #include "rondb.h" +#include "common.h" using namespace pink; +std::vector ndb_objects; std::map db; +class RondisHandle : public ServerHandle +{ +public: + RondisHandle() : counter(0) {} + + /* + We define this so each connection knows from which worker thread it is + running from. This enables us to to distribute Ndb objects across + multiple worker threads. + */ + int CreateWorkerSpecificData(void **data) const override + { + std::lock_guard lock(mutex); + *data = new int(counter++); + return 0; + } + +private: + mutable std::mutex mutex; + mutable int counter; +}; + class RondisConn : public RedisConn { public: - RondisConn(int fd, const std::string &ip_port, Thread *thread, - void *worker_specific_data); + RondisConn( + int fd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data); virtual ~RondisConn() = default; protected: int DealMessage(const RedisCmdArgsType &argv, std::string *response) override; private: + int _worker_id; }; -RondisConn::RondisConn(int fd, const std::string &ip_port, - Thread *thread, void *worker_specific_data) +RondisConn::RondisConn( + int fd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data) : RedisConn(fd, ip_port, thread) { - // Handle worker_specific_data ... + int worker_id = *static_cast(worker_specific_data); + _worker_id = worker_id; } int RondisConn::DealMessage(const RedisCmdArgsType &argv, std::string *response) { - printf("Get redis message "); + printf("Received Redis message: "); for (int i = 0; i < argv.size(); i++) { printf("%s ", argv[i].c_str()); } printf("\n"); - return rondb_redis_handler(argv, response, 0); + return rondb_redis_handler(argv, response, _worker_id); } class RondisConnFactory : public ConnFactory { public: - virtual std::shared_ptr NewPinkConn(int connfd, const std::string &ip_port, - Thread *thread, - void *worker_specific_data, pink::PinkEpoll *pink_epoll = nullptr) const + virtual std::shared_ptr NewPinkConn( + int connfd, + const std::string &ip_port, + Thread *thread, + void *worker_specific_data, + pink::PinkEpoll *pink_epoll = nullptr) const { return std::make_shared(connfd, ip_port, thread, worker_specific_data); } @@ -74,19 +114,27 @@ int main(int argc, char *argv[]) { int port = 6379; char *connect_string = "localhost:13000"; - if (argc != 3) + int worker_threads = 1; + if (argc != 4) { - printf("Not receiving 2 arguments, just using defaults\n"); + printf("Not receiving 3 arguments, just using defaults\n"); } else { port = atoi(argv[1]); connect_string = argv[2]; + worker_threads = atoi(argv[3]); } printf("Server will listen to %d and connect to MGMd at %s\n", port, connect_string); - // TODO: Distribute resources across pink threads - if (setup_rondb(connect_string) != 0) + if (worker_threads < MAX_CONNECTIONS) { + printf("Number of worker threads must be at least %d, otherwise we are wasting resources\n", MAX_CONNECTIONS); + return -1; + } + + ndb_objects.resize(worker_threads); + + if (setup_rondb(connect_string, worker_threads) != 0) { printf("Failed to setup RonDB environment\n"); return -1; @@ -95,7 +143,9 @@ int main(int argc, char *argv[]) ConnFactory *conn_factory = new RondisConnFactory(); - ServerThread *my_thread = NewHolyThread(port, conn_factory, 1000); + RondisHandle *handle = new RondisHandle(); + + ServerThread *my_thread = NewDispatchThread(port, worker_threads, conn_factory, 1000, 1000, handle); if (my_thread->StartThread() != 0) { printf("StartThread error happened!\n"); diff --git a/pink/rondis/string/commands.cc b/pink/rondis/string/commands.cc index c76eeee5..e0136296 100644 --- a/pink/rondis/string/commands.cc +++ b/pink/rondis/string/commands.cc @@ -9,9 +9,9 @@ #include "../common.h" #include "table_definitions.h" -void rondb_get_command(const pink::RedisCmdArgsType &argv, - std::string *response, - int fd) +void rondb_get_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) { const char *key_str = argv[1].c_str(); Uint32 key_len = argv[1].size(); @@ -21,7 +21,6 @@ void rondb_get_command(const pink::RedisCmdArgsType &argv, return; } - Ndb *ndb = rondb_ndb[0][0]; const NdbDictionary::Dictionary *dict = ndb->getDictionary(); const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME); if (tab == nullptr) @@ -59,28 +58,37 @@ void rondb_get_command(const pink::RedisCmdArgsType &argv, } } -void rondb_set_command(const pink::RedisCmdArgsType &argv, - std::string *response, - int fd) +void rondb_set_command( + Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) { - Ndb *ndb = rondb_ndb[0][0]; - const char *key_str = argv[1].c_str(); Uint32 key_len = argv[1].size(); - const char *value_str = argv[2].c_str(); - Uint32 value_len = argv[2].size(); if (key_len > MAX_KEY_VALUE_LEN) { failed_large_key(response); return; } + + const char *key_str = argv[1].c_str(); + const char *value_str = argv[2].c_str(); + Uint32 value_len = argv[2].size(); + const NdbDictionary::Dictionary *dict = ndb->getDictionary(); + if (dict == nullptr) + { + append_response(response, + "RonDB Error: Failed to get Ndb dictionary:", + dict->getNdbError().code); + return; + } const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME); if (tab == nullptr) { failed_create_table(response, dict->getNdbError().code); return; } - printf("Kilroy came here III\n"); + NdbTransaction *trans = ndb->startTransaction(tab, key_str, key_len); if (trans == nullptr) { diff --git a/pink/rondis/string/commands.h b/pink/rondis/string/commands.h index 49d2f312..e66e25a9 100644 --- a/pink/rondis/string/commands.h +++ b/pink/rondis/string/commands.h @@ -11,10 +11,10 @@ All STRING commands: https://redis.io/docs/latest/commands/?group=string */ -void rondb_get_command(const pink::RedisCmdArgsType &argv, - std::string *response, - int fd); +void rondb_get_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response); -void rondb_set_command(const pink::RedisCmdArgsType &argv, - std::string *response, - int fd); +void rondb_set_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response); diff --git a/pink/rondis/string/db_interactions.cc b/pink/rondis/string/db_interactions.cc index f25a931e..e50e16af 100644 --- a/pink/rondis/string/db_interactions.cc +++ b/pink/rondis/string/db_interactions.cc @@ -23,7 +23,7 @@ int create_key_row(std::string *response, const char *value_str, Uint32 value_len, Uint32 field_rows, - Uint32 value_rows, + Uint32 num_value_rows, Uint32 row_state, char *buf) { @@ -50,7 +50,7 @@ int create_key_row(std::string *response, write_op->setValue(KEY_TABLE_COL_rondb_key, key_id); } write_op->setValue(KEY_TABLE_COL_tot_value_len, value_len); - write_op->setValue(KEY_TABLE_COL_num_rows, value_rows); + write_op->setValue(KEY_TABLE_COL_num_rows, num_value_rows); write_op->setValue(KEY_TABLE_COL_value_data_type, row_state); write_op->setValue(KEY_TABLE_COL_expiry_date, 0); @@ -73,7 +73,7 @@ int create_key_row(std::string *response, } { int ret_code = 0; - if (((value_rows == 0) && + if (((num_value_rows == 0) && (execute_commit(ndb, trans, ret_code) == 0)) || (execute_no_commit(trans, ret_code, true) == 0)) { @@ -98,7 +98,7 @@ int create_key_row(std::string *response, * After deleting the key row we are now ready to insert the * key row. */ - if (value_rows == 0) + if (num_value_rows == 0) { ndb->closeTransaction(trans); ndb->startTransaction(tab, key_str, key_len); @@ -148,7 +148,7 @@ int create_key_row(std::string *response, insert_op->insertTuple(); insert_op->equal(KEY_TABLE_COL_redis_key, buf); insert_op->setValue(KEY_TABLE_COL_tot_value_len, value_len); - insert_op->setValue("value_rows", value_rows); + insert_op->setValue(KEY_TABLE_COL_num_rows, num_value_rows); insert_op->setValue(KEY_TABLE_COL_value_data_type, row_state); insert_op->setValue(KEY_TABLE_COL_expiry_date, 0); { @@ -295,11 +295,9 @@ int get_simple_key_row(std::string *response, response->append(buf); response->append((const char *)&key_row->value_start[2], key_row->tot_value_len); response->append("\r\n"); - printf("Respond with len: %d, %u tot_value_len, string: %s, string_len: %u\n", - len, + printf("Respond with tot_value_len: %u, string: %s\n", key_row->tot_value_len, - response->c_str(), - Uint32(response->length())); + (const char *)&key_row->value_start[2], key_row->tot_value_len); ndb->closeTransaction(trans); return 0; } diff --git a/pink/src/dispatch_thread.h b/pink/src/dispatch_thread.h index e0dc79c1..b2084972 100644 --- a/pink/src/dispatch_thread.h +++ b/pink/src/dispatch_thread.h @@ -25,17 +25,22 @@ class WorkerThread; class DispatchThread : public ServerThread { public: DispatchThread(int port, - int work_num, ConnFactory* conn_factory, + int work_num, + ConnFactory* conn_factory, int cron_interval, int queue_limit, const ServerHandle* handle); - DispatchThread(const std::string &ip, int port, - int work_num, ConnFactory* conn_factory, + DispatchThread(const std::string &ip, + int port, + int work_num, + ConnFactory* conn_factory, int cron_interval, int queue_limit, const ServerHandle* handle); - DispatchThread(const std::set& ips, int port, - int work_num, ConnFactory* conn_factory, + DispatchThread(const std::set& ips, + int port, + int work_num, + ConnFactory* conn_factory, int cron_interval, int queue_limit, const ServerHandle* handle); diff --git a/pink/src/worker_thread.cc b/pink/src/worker_thread.cc index c393a8f0..bf9555b3 100644 --- a/pink/src/worker_thread.cc +++ b/pink/src/worker_thread.cc @@ -199,7 +199,7 @@ void *WorkerThread::ThreadMain() { ReadStatus read_status = in_conn->GetRequest(); in_conn->set_last_interaction(now); if (read_status == kReadAll) { - pink_epoll_->PinkModEvent(pfe->fd, 0, 0); + pink_epoll_->PinkModEvent(pfe->fd, 0, EPOLLOUT); // Wait for the conn complete asynchronous task and // Mod Event to EPOLLOUT } else if (read_status == kReadHalf) {