From de2ced6c47f77bc858074ae003673300458bc5b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Tue, 5 Nov 2024 17:42:56 +0100 Subject: [PATCH 01/21] RONDB-765: First step to implementing INCR command --- pink/rondis/common.h | 4 + pink/rondis/rondb.cc | 13 ++ pink/rondis/rondb.h | 4 + pink/rondis/string/commands.cc | 189 +++++++++++++++---------- pink/rondis/string/commands.h | 10 +- pink/rondis/string/db_operations.cc | 12 +- pink/rondis/string/db_operations.h | 10 ++ pink/rondis/string/table_definitions.h | 3 + 8 files changed, 164 insertions(+), 81 deletions(-) diff --git a/pink/rondis/common.h b/pink/rondis/common.h index 3794af29..64c505cc 100644 --- a/pink/rondis/common.h +++ b/pink/rondis/common.h @@ -1,6 +1,9 @@ #include #include +#ifndef RONDIS_COMMON_H +#define RONDIS_COMMON_H + #define MAX_CONNECTIONS 2 #define REDIS_DB_NAME "redis" @@ -28,3 +31,4 @@ void assign_generic_err_to_response(std::string *response, const char *app_str); #define REDIS_WRONG_NUMBER_OF_ARGS "wrong number of arguments for '%s' command" #define REDIS_NO_SUCH_KEY "$-1\r\n" #define REDIS_KEY_TOO_LARGE "key is too large (3000 bytes max)" +#endif diff --git a/pink/rondis/rondb.cc b/pink/rondis/rondb.cc index b601e5fb..477a5b16 100644 --- a/pink/rondis/rondb.cc +++ b/pink/rondis/rondb.cc @@ -182,6 +182,19 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv, assign_generic_err_to_response(response, error_message); } } + else if (argv[0] == "INCR") + { + if (argv.size() == 2) + { + rondb_incr_command(ndb, argv, response); + } + else + { + char error_message[256]; + snprintf(error_message, sizeof(error_message), REDIS_WRONG_NUMBER_OF_ARGS, argv[0].c_str()); + assign_generic_err_to_response(response, error_message); + } + } else { unsupported_command(argv, response); diff --git a/pink/rondis/rondb.h b/pink/rondis/rondb.h index 3bb78fd7..63bdc33e 100644 --- a/pink/rondis/rondb.h +++ b/pink/rondis/rondb.h @@ -3,6 +3,9 @@ #include #include +#ifndef RONDIS_RONDB_H +#define RONDIS_RONDB_H + extern std::vector ndb_objects; int initialize_ndb_objects(const char *connect_string, int num_ndb_objects); @@ -14,3 +17,4 @@ void rondb_end(); int rondb_redis_handler(const pink::RedisCmdArgsType &argv, std::string *response, int fd); +#endif diff --git a/pink/rondis/string/commands.cc b/pink/rondis/string/commands.cc index 46120274..bdbcaaeb 100644 --- a/pink/rondis/string/commands.cc +++ b/pink/rondis/string/commands.cc @@ -6,9 +6,56 @@ #include #include "db_operations.h" +#include "commands.h" #include "../common.h" #include "table_definitions.h" +bool setup_transaction( + Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response, + struct key_table *key_row, + const char *key_str, + Uint32 key_len, + const NdbDictionary::Dictionary **ret_dict, + const NdbDictionary::Table **ret_tab, + NdbTransaction **ret_trans) +{ + if (key_len > MAX_KEY_VALUE_LEN) + { + assign_generic_err_to_response(response, REDIS_KEY_TOO_LARGE); + return false; + } + const NdbDictionary::Dictionary *dict = ndb->getDictionary(); + if (dict == nullptr) + { + assign_ndb_err_to_response(response, FAILED_GET_DICT, ndb->getNdbError()); + return false; + } + const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME); + if (tab == nullptr) + { + assign_ndb_err_to_response(response, FAILED_CREATE_TABLE_OBJECT, dict->getNdbError()); + return false; + } + memcpy(&key_row->redis_key[2], key_str, key_len); + set_length((char*)&key_row->redis_key[0], key_len); + NdbTransaction *trans = ndb->startTransaction(tab, + &key_row->redis_key[0], + key_len + 2); + if (trans == nullptr) + { + assign_ndb_err_to_response(response, + FAILED_CREATE_TXN_OBJECT, + ndb->getNdbError()); + return false; + } + *ret_tab = tab; + *ret_trans = trans; + *ret_dict = dict; + return true; +} + /* A successful GET will return in this format: $5 @@ -27,45 +74,22 @@ void rondb_get_command(Ndb *ndb, const pink::RedisCmdArgsType &argv, std::string *response) { - Uint32 key_len = argv[1].size(); - if (key_len > MAX_KEY_VALUE_LEN) - { - assign_generic_err_to_response(response, REDIS_KEY_TOO_LARGE); - return; - } - const char *key_str = argv[1].c_str(); - - const NdbDictionary::Dictionary *dict = ndb->getDictionary(); - const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME); - if (tab == nullptr) - { - assign_ndb_err_to_response(response, - FAILED_CREATE_TABLE_OBJECT, - dict->getNdbError()); - return; - } - + const NdbDictionary::Dictionary *dict; + const NdbDictionary::Table *tab = nullptr; + NdbTransaction *trans = nullptr; struct key_table key_row; - // varbinary -> first 2 bytes are length if bigger than 255 - // start copying from 3rd byte - memcpy(&key_row.redis_key[2], key_str, key_len); - // Length as little endian - Uint8 *ptr = (Uint8 *)&key_row.redis_key[0]; - ptr[0] = Uint8(key_len & 255); - ptr[1] = Uint8(key_len >> 8); - - // This is (usually) a local operation to calculate the correct data node, using the - // hash of the pk value. - NdbTransaction *trans = ndb->startTransaction(tab, - &(key_row.redis_key[0]), - key_len + 2); - if (trans == nullptr) - { - assign_ndb_err_to_response(response, - FAILED_CREATE_TXN_OBJECT, - ndb->getNdbError()); - return; - } + const char *key_str = argv[1].c_str(); + Uint32 key_len = argv[1].size(); + if (!setup_transaction(ndb, + argv, + response, + &key_row, + key_str, + key_len, + &dict, + &tab, + &trans)) + return; int ret_code = get_simple_key_row( response, @@ -81,22 +105,11 @@ void rondb_get_command(Ndb *ndb, } printf("Getting %d value rows\n", key_row.num_rows); { - // For some reason this needs to be used from scratch - struct key_table key_row; - // varbinary -> first 2 bytes are length if bigger than 255 - // start copying from 3rd byte - memcpy(&key_row.redis_key[2], key_str, key_len); - // Length as little endian - Uint8 *ptr = (Uint8 *)&key_row.redis_key[0]; - ptr[0] = Uint8(key_len & 255); - ptr[1] = Uint8(key_len >> 8); - /* Our value uses value rows, so a more complex read is required. We're starting from scratch here since we'll use a shared lock on the key table this time we read from it. */ - NdbTransaction *trans = ndb->startTransaction(tab, &(key_row.redis_key[0]), key_len + 2); @@ -124,37 +137,25 @@ void rondb_set_command( const pink::RedisCmdArgsType &argv, std::string *response) { + const NdbDictionary::Dictionary *dict; + const NdbDictionary::Table *tab = nullptr; + NdbTransaction *trans = nullptr; + struct key_table key_row; + const char *key_str = argv[1].c_str(); Uint32 key_len = argv[1].size(); - if (key_len > MAX_KEY_VALUE_LEN) - { - assign_generic_err_to_response(response, REDIS_KEY_TOO_LARGE); - return; - } + if (!setup_transaction(ndb, + argv, + response, + &key_row, + key_str, + key_len, + &dict, + &tab, + &trans)) + return; - const char *key_str = argv[1].c_str(); const char *value_str = argv[2].c_str(); Uint32 value_len = argv[2].size(); - - const NdbDictionary::Dictionary *dict = ndb->getDictionary(); - if (dict == nullptr) - { - assign_ndb_err_to_response(response, FAILED_GET_DICT, ndb->getNdbError()); - return; - } - const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME); - if (tab == nullptr) - { - assign_ndb_err_to_response(response, FAILED_CREATE_TABLE_OBJECT, dict->getNdbError()); - return; - } - - NdbTransaction *trans = ndb->startTransaction(tab, key_str, key_len); - if (trans == nullptr) - { - assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError()); - return; - } - char varsize_param[EXTENSION_VALUE_LEN + 500]; Uint32 num_value_rows = 0; Uint64 rondb_key = 0; @@ -213,7 +214,9 @@ void rondb_set_command( is best done via a cascade delete. We do a delete & insert in a single transaction (plus writing the value rows). */ - trans = ndb->startTransaction(tab, key_str, key_len); + NdbTransaction *trans = ndb->startTransaction(tab, + &(key_row.redis_key[0]), + key_len + 2); if (trans == nullptr) { assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError()); @@ -246,7 +249,6 @@ void rondb_set_command( return; } printf("Inserting %d value rows\n", num_value_rows); - create_all_value_rows(response, ndb, dict, @@ -259,3 +261,34 @@ void rondb_set_command( ndb->closeTransaction(trans); return; } + +void rondb_incr_command( + Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) +{ + const NdbDictionary::Dictionary *dict; + const NdbDictionary::Table *tab = nullptr; + NdbTransaction *trans = nullptr; + struct key_table key_row; + const char *key_str = argv[1].c_str(); + Uint32 key_len = argv[1].size(); + if (!setup_transaction(ndb, + argv, + response, + &key_row, + key_str, + key_len, + &dict, + &tab, + &trans)) + return; + + incr_key_row(response, + ndb, + tab, + trans, + &key_row); + ndb->closeTransaction(trans); + return; +} diff --git a/pink/rondis/string/commands.h b/pink/rondis/string/commands.h index 4f79e254..4e97fd1e 100644 --- a/pink/rondis/string/commands.h +++ b/pink/rondis/string/commands.h @@ -4,9 +4,10 @@ #include "pink/include/redis_conn.h" #include #include - #include "db_operations.h" +#ifndef STRING_COMMANDS_H +#define STRING_COMMANDS_H /* All STRING commands: https://redis.io/docs/latest/commands/?group=string @@ -21,6 +22,8 @@ Most importantly, it writes Ndb error messages to the response string. This may however change in the future, since this causes redundancy. */ +void set_length(char* buf, Uint32 key_len); +Uint32 get_length(char* buf); void rondb_get_command(Ndb *ndb, const pink::RedisCmdArgsType &argv, @@ -29,3 +32,8 @@ void rondb_get_command(Ndb *ndb, void rondb_set_command(Ndb *ndb, const pink::RedisCmdArgsType &argv, std::string *response); + +void rondb_incr_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response); +#endif diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index 7129a24c..f4615866 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -14,13 +14,13 @@ NdbRecord *entire_key_record = nullptr; NdbRecord *pk_value_record = nullptr; NdbRecord *entire_value_record = nullptr; -static void set_length(char *buf, Uint32 key_len) { +void set_length(char *buf, Uint32 key_len) { Uint8 *ptr = (Uint8*)buf; ptr[0] = (Uint8)(key_len & 255); ptr[1] = (Uint8)(key_len >> 8); } -static Uint32 get_length(char *buf) { +Uint32 get_length(char *buf) { Uint8 *ptr = (Uint8*)buf; Uint8 low = ptr[0]; Uint8 high = ptr[1]; @@ -614,3 +614,11 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab, } return 0; } + +void incr_key_row(std::string *response, + Ndb *ndb, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + struct key_table *key_row) { + return; +} diff --git a/pink/rondis/string/db_operations.h b/pink/rondis/string/db_operations.h index d48cda72..cc80d702 100644 --- a/pink/rondis/string/db_operations.h +++ b/pink/rondis/string/db_operations.h @@ -5,6 +5,9 @@ #include #include +#ifndef STRING_DB_OPERATIONS_H +#define STRING_DB_OPERATIONS_H + const Uint32 ROWS_PER_READ = 2; int create_key_row(std::string *response, @@ -122,3 +125,10 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab, Uint64 &key_id, Ndb *ndb, std::string *response); + +void incr_key_row(std::string *response, + Ndb *ndb, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + struct key_table *key_row); +#endif diff --git a/pink/rondis/string/table_definitions.h b/pink/rondis/string/table_definitions.h index 030ecce5..357d0348 100644 --- a/pink/rondis/string/table_definitions.h +++ b/pink/rondis/string/table_definitions.h @@ -1,6 +1,8 @@ #include #include +#ifndef STRING_TABLE_DEFINITIONS_H +#define STRING_TABLE_DEFINITIONS_H /* NdbRecords are used for serialization. They map columns of a table to fields in a struct. For each table we interact with, we define: @@ -82,3 +84,4 @@ int init_record(NdbDictionary::Dictionary *dict, NdbRecord *&record); int init_string_records(NdbDictionary::Dictionary *dict); +#endif From ef7ff4f9c4122bf46c355ad849919e31a66ed5ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Tue, 5 Nov 2024 17:51:09 +0100 Subject: [PATCH 02/21] RONDB-766: Fix compiler warnings --- pink/Makefile | 6 +----- pink/include/redis_conn.h | 6 +++--- pink/rondis/rondis.cc | 2 +- pink/src/holy_thread.cc | 4 ++-- pink/src/pb_cli.cc | 2 +- pink/src/redis_cli.cc | 2 +- pink/src/worker_thread.cc | 4 ++-- 7 files changed, 11 insertions(+), 15 deletions(-) diff --git a/pink/Makefile b/pink/Makefile index 1b330ca9..bd82f9d3 100644 --- a/pink/Makefile +++ b/pink/Makefile @@ -63,11 +63,7 @@ ifeq ($(NO_PB),1) $(filter-out $(VERSION_CC) $(wildcard $(SRC_DIR)/pb_*), $(wildcard $(SRC_DIR)/*.cc)) endif - -ifndef SLASH_PATH - $(warning Warning: missing slash path, using default) - SLASH_PATH=$(CURDIR)/third/slash -endif +SLASH_PATH=$(CURDIR)/third/slash SLASH_INCLUDE_DIR=$(SLASH_PATH) SLASH_LIBRARY=$(SLASH_PATH)/slash/lib/libslash.a diff --git a/pink/include/redis_conn.h b/pink/include/redis_conn.h index 3f9012c9..23f57e0b 100644 --- a/pink/include/redis_conn.h +++ b/pink/include/redis_conn.h @@ -34,9 +34,9 @@ class RedisConn: public PinkConn { const int rbuf_max_len = REDIS_MAX_MESSAGE); virtual ~RedisConn(); - virtual ReadStatus GetRequest(); - virtual WriteStatus SendReply(); - virtual int WriteResp(const std::string& resp); + virtual ReadStatus GetRequest() override; + virtual WriteStatus SendReply() override; + virtual int WriteResp(const std::string& resp) override; void TryResizeBuffer() override; void SetHandleType(const HandleType& handle_type); diff --git a/pink/rondis/rondis.cc b/pink/rondis/rondis.cc index 8916d391..ae1ac1f7 100644 --- a/pink/rondis/rondis.cc +++ b/pink/rondis/rondis.cc @@ -115,7 +115,7 @@ static void SignalSetup() int main(int argc, char *argv[]) { int port = 6379; - char *connect_string = "localhost:13000"; + const char *connect_string = "localhost:13000"; int worker_threads = 2; if (argc != 4) { diff --git a/pink/src/holy_thread.cc b/pink/src/holy_thread.cc index 99306b99..83a13d07 100644 --- a/pink/src/holy_thread.cc +++ b/pink/src/holy_thread.cc @@ -241,10 +241,10 @@ void HolyThread::DoCronTask() { ++iter; } } - for (const auto conn : to_close) { + for (const auto & conn : to_close) { CloseFd(conn); } - for (const auto conn : to_timeout) { + for (const auto & conn : to_timeout) { CloseFd(conn); handle_->FdTimeoutHandle(conn->fd(), conn->ip_port()); } diff --git a/pink/src/pb_cli.cc b/pink/src/pb_cli.cc index ac1d72fa..fcd92703 100644 --- a/pink/src/pb_cli.cc +++ b/pink/src/pb_cli.cc @@ -51,7 +51,7 @@ Status PbCli::Send(void *msg) { google::protobuf::Message *req = reinterpret_cast(msg); - int wbuf_len = req->ByteSize(); + int wbuf_len = req->ByteSizeLong(); req->SerializeToArray(wbuf_ + kCommandHeaderLength, wbuf_len); uint32_t len = htonl(wbuf_len); memcpy(wbuf_, &len, sizeof(uint32_t)); diff --git a/pink/src/redis_cli.cc b/pink/src/redis_cli.cc index df33c4cd..b344c7ff 100644 --- a/pink/src/redis_cli.cc +++ b/pink/src/redis_cli.cc @@ -534,7 +534,7 @@ int redisvFormatCommand(std::string *cmd, const char *format, va_list ap) { memcpy(_format, c, _l); _format[_l] = '\0'; - int n = vsnprintf(buf, REDIS_MAX_MESSAGE, _format, _cpy); + int n = vsnprintf(buf, sizeof(buf), _format, _cpy); curarg.append(buf, n); /* Update current position (note: outer blocks diff --git a/pink/src/worker_thread.cc b/pink/src/worker_thread.cc index 66cb4f1e..442849a9 100644 --- a/pink/src/worker_thread.cc +++ b/pink/src/worker_thread.cc @@ -275,10 +275,10 @@ void WorkerThread::DoCronTask() { ++iter; } } - for (const auto conn : to_close) { + for (const auto & conn : to_close) { CloseFd(conn); } - for (const auto conn : to_timeout) { + for (const auto & conn : to_timeout) { CloseFd(conn); server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port()); } From b78513e7f1e9dfc5ca4a38cb889b3fda7b3f3216 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Wed, 6 Nov 2024 18:24:31 +0100 Subject: [PATCH 03/21] RONDB-761: First functional INCR command --- pink/rondis/Makefile | 5 +- pink/rondis/common.h | 1 + pink/rondis/string/db_operations.cc | 155 +++++++++++++++++++++++- pink/rondis/string/table_definitions.cc | 3 +- pink/rondis/string/table_definitions.h | 1 + 5 files changed, 158 insertions(+), 7 deletions(-) diff --git a/pink/rondis/Makefile b/pink/rondis/Makefile index 64fbccca..43fb315d 100644 --- a/pink/rondis/Makefile +++ b/pink/rondis/Makefile @@ -14,10 +14,7 @@ endif all: rondis -ifndef PINK_PATH - $(warning Warning: missing pink path, using default) - PINK_PATH=$(CURDIR)/../.. -endif +PINK_PATH=$(CURDIR)/../.. PINK_INCLUDE_DIR=$(PINK_PATH) PINK_LIBRARY=$(PINK_PATH)/pink/lib/libpink.a diff --git a/pink/rondis/common.h b/pink/rondis/common.h index 64c505cc..d03759d3 100644 --- a/pink/rondis/common.h +++ b/pink/rondis/common.h @@ -23,6 +23,7 @@ void assign_generic_err_to_response(std::string *response, const char *app_str); #define FAILED_CREATE_TXN_OBJECT "Failed to create transaction object" #define FAILED_EXEC_TXN "Failed to execute transaction" #define FAILED_READ_KEY "Failed to read key" +#define FAILED_INCR_KEY "Failed to increment key" #define FAILED_GET_OP "Failed to get NdbOperation object" #define FAILED_DEFINE_OP "Failed to define RonDB operation" diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index f4615866..a7b00e7c 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -615,10 +615,163 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab, return 0; } +#define RONDB_INSERT 2 +#define RONDB_UPDATE 1 +#define REG0 0 +#define REG1 1 +#define REG2 2 +#define REG3 3 +#define REG4 4 +#define REG5 5 +#define REG6 6 +#define REG7 7 +#define LABEL0 0 void incr_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, struct key_table *key_row) { - return; + + const NdbDictionary::Column *value_start_col = tab->getColumn("value_start"); + const NdbDictionary::Column *tot_value_len_col = tab->getColumn("tot_value_len"); + + NdbOperation::OperationOptions opts; + std::memset(&opts, 0, sizeof(opts)); + /** + * The mask specifies which columns is to be updated after the interpreter + * has finished. The values are set in the key_row. + * We have 7 columns, we will update tot_value_len in interpreter, same with + * value_start. + * + * The rest, redis_key, rondb_key, value_data_type, num_rows and expiry_date + * are updated through final update. + */ + + const Uint32 mask = 0x57; + const unsigned char *mask_ptr = (const unsigned char *)&mask; + + // redis_key already set as this is the Primary key + key_row->null_bits = 1; // Set rondb_key to NULL, first NULL column + key_row->num_rows = 0; + key_row->value_data_type = 0; + key_row->expiry_date = 0; + + /* Define the interpreted program */ + Uint32 code_buffer[128]; + NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer)); + code.load_const_u16(REG0, 4); //Memory offset 0 + code.load_const_u16(REG6, 0); //Memory offset 0 + int ret_code = code.load_op_type(REG1); // Read operation type into register 1 + code.branch_eq_const(REG1, RONDB_INSERT, LABEL0); //Inserts go to label 1 + + /** + * The first 4 bytes of the memory must be kept for the Attribute header + * REG0 Memory offset == 4 + * REG1 Memory offset == 6 + * REG2 Size of value_start + * REG3 Size of value_start without length bytes + * REG4 Old integer value after conversion + * REG5 New integer value after increment + * REG6 Memory offset == 0 + * REG7 not used + */ + /* UPDATE code */ + code.read_full(value_start_col, REG6, REG2); // Read value_start column + code.load_const_u16(REG1, 6);//Memory offset 2 + code.sub_const_reg(REG3, REG2, 2);//Subtract 2 from length + code.str_to_int64(REG4, REG1, REG3);//Convert string to number into register 6 + code.add_const_reg(REG5, REG4, 1); //New integer value in register 6 + code.int64_to_str(REG3, REG1, REG5);//Convert to string + code.add_const_reg(REG2, REG3, 2); //New value_start length + code.convert_size(REG3, REG0); //Write back length bytes in memory + + code.write_interpreter_output(REG5, 0); //Write into output index 0 + code.write_from_mem(value_start_col, REG6, REG2); // Write to column + code.write_attr(tot_value_len_col, REG3); + code.interpret_exit_ok(); + + /* INSERT code */ + code.def_label(LABEL0); + code.load_const_u16(REG5, 1); + code.load_const_u16(REG3, 1); + code.write_interpreter_output(REG5, 0); //Write into output index 0 + + Uint32 insert_value; + Uint8 *insert_value_ptr = (Uint8*)&insert_value; + insert_value_ptr[0] = 1; // Length is 1 + insert_value_ptr[1] = 0; // Second length byte is 0 + insert_value_ptr[2] = '1'; //Inserts a string '1' + insert_value_ptr[3] = 0; + + code.load_const_mem(REG0, REG2, 3, &insert_value);// Load to memory + code.write_from_mem(value_start_col, REG6, REG2); // Write to column + code.write_attr(tot_value_len_col, REG3); + code.interpret_exit_ok(); + + /* Program end, now compile code */ + ret_code = code.finalise(); + if (ret_code != 0) { + assign_ndb_err_to_response(response, + "Failed to create Interpreted code", + code.getNdbError()); + return; + } + + /* Prepare the interpreted program to be part of the write */ + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED_INSERT; + opts.interpretedCode = &code; + + /** + * Prepare to get the final value of the Redis row after INCR is finished + * This is performed by the reading the pseudo column that is reading the + * output index written in interpreter program. + */ + NdbOperation::GetValueSpec getvals[1]; + getvals[0].appStorage = nullptr; + getvals[0].recAttr = nullptr; + getvals[0].column = NdbDictionary::Column::READ_INTERPRETER_OUTPUT_0; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_GET_FINAL_VALUE; + opts.numExtraGetFinalValues = 1; + opts.extraGetFinalValues = getvals; + + /* Define the actual operation to be sent to RonDB data node. */ + const NdbOperation *op = trans->writeTuple( + pk_key_record, + (const char*)key_row, + entire_key_record, + (char*)key_row, + mask_ptr, + &opts, + sizeof(opts)); + if (op == nullptr) { + assign_ndb_err_to_response(response, + "Failed to create NdbOperation", + trans->getNdbError()); + return; + } + + /* Send to RonDB and execute the INCR operation */ + if (trans->execute(NdbTransaction::Commit, + NdbOperation::AbortOnError) != 0 || + trans->getNdbError().code != 0) + { + assign_ndb_err_to_response(response, + FAILED_INCR_KEY, + trans->getNdbError()); + return; + } + + /* Retrieve the returned new value as an Int64 value */ + NdbRecAttr *recAttr = getvals[0].recAttr; + Int64 new_incremented_value = recAttr->int64_value(); + + /* Send the return message to Redis client */ + char header_buf[20]; + int header_len = write_formatted(header_buf, + sizeof(header_buf), + ":%lld\r\n", + new_incremented_value); + response->assign(header_buf); + return; } diff --git a/pink/rondis/string/table_definitions.cc b/pink/rondis/string/table_definitions.cc index 6759abfb..20345363 100644 --- a/pink/rondis/string/table_definitions.cc +++ b/pink/rondis/string/table_definitions.cc @@ -55,8 +55,7 @@ int init_key_records(NdbDictionary::Dictionary *dict) } std::map> read_all_column_map = { - // TODO: Fix this one - // {redis_key_col, {offsetof(struct key_table, redis_key), 0}}, + {redis_key_col, {offsetof(struct key_table, redis_key), 0}}, {rondb_key_col, {offsetof(struct key_table, rondb_key), 0}}, {expiry_date_col, {offsetof(struct key_table, expiry_date), 1}}, {value_start_col, {offsetof(struct key_table, value_start), 0}}, diff --git a/pink/rondis/string/table_definitions.h b/pink/rondis/string/table_definitions.h index 357d0348..60280c92 100644 --- a/pink/rondis/string/table_definitions.h +++ b/pink/rondis/string/table_definitions.h @@ -37,6 +37,7 @@ extern NdbRecord *entire_key_record; struct key_table { + Uint32 null_bits; char redis_key[MAX_KEY_VALUE_LEN + 2]; Uint64 rondb_key; Uint32 expiry_date; From 51c54982cd7a349fb82bb6e5bd1bf9eed53b7bd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 7 Nov 2024 18:44:04 +0100 Subject: [PATCH 04/21] RONDB-761: Improvements to INCR key 1. Make it possible to use dirty writes for higher concurrency with lower consistency 2. Ensure that we don't have any value rows before proceeding 3. Avoid setting NULL value on rondb_key, it will be set anyways (updated mask) --- pink/rondis/string/db_operations.cc | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index a7b00e7c..83caa1c7 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -626,14 +626,16 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab, #define REG6 6 #define REG7 7 #define LABEL0 0 +#define LABEL1 1 void incr_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, struct key_table *key_row) { - const NdbDictionary::Column *value_start_col = tab->getColumn("value_start"); - const NdbDictionary::Column *tot_value_len_col = tab->getColumn("tot_value_len"); + const NdbDictionary::Column *value_start_col = tab->getColumn(KEY_TABLE_COL_value_start); + const NdbDictionary::Column *tot_value_len_col = tab->getColumn(KEY_TABLE_COL_tot_value_len); + const NdbDictionary::Column *rondb_key_col = tab->getColumn(KEY_TABLE_COL_rondb_key); NdbOperation::OperationOptions opts; std::memset(&opts, 0, sizeof(opts)); @@ -647,7 +649,7 @@ void incr_key_row(std::string *response, * are updated through final update. */ - const Uint32 mask = 0x57; + const Uint32 mask = 0x55; const unsigned char *mask_ptr = (const unsigned char *)&mask; // redis_key already set as this is the Primary key @@ -662,7 +664,7 @@ void incr_key_row(std::string *response, code.load_const_u16(REG0, 4); //Memory offset 0 code.load_const_u16(REG6, 0); //Memory offset 0 int ret_code = code.load_op_type(REG1); // Read operation type into register 1 - code.branch_eq_const(REG1, RONDB_INSERT, LABEL0); //Inserts go to label 1 + code.branch_eq_const(REG1, RONDB_INSERT, LABEL1); //Inserts go to label 1 /** * The first 4 bytes of the memory must be kept for the Attribute header @@ -673,9 +675,13 @@ void incr_key_row(std::string *response, * REG4 Old integer value after conversion * REG5 New integer value after increment * REG6 Memory offset == 0 - * REG7 not used + * REG7 Value of rondb_key (should be NULL) */ /* UPDATE code */ + code.read_attr(REG7, rondb_key_col); + code.branch_eq_null(REG7, LABEL0); + code.interpret_exit_nok(); + code.def_label(LABEL0); code.read_full(value_start_col, REG6, REG2); // Read value_start column code.load_const_u16(REG1, 6);//Memory offset 2 code.sub_const_reg(REG3, REG2, 2);//Subtract 2 from length @@ -691,7 +697,7 @@ void incr_key_row(std::string *response, code.interpret_exit_ok(); /* INSERT code */ - code.def_label(LABEL0); + code.def_label(LABEL1); code.load_const_u16(REG5, 1); code.load_const_u16(REG3, 1); code.write_interpreter_output(REG5, 0); //Write into output index 0 @@ -735,6 +741,9 @@ void incr_key_row(std::string *response, opts.numExtraGetFinalValues = 1; opts.extraGetFinalValues = getvals; + if (1) + opts.optionsPresent |= NdbOperation::OperationOptions::OO_DIRTY_FLAG; + /* Define the actual operation to be sent to RonDB data node. */ const NdbOperation *op = trans->writeTuple( pk_key_record, From f32ad1b844ce7637f4944f7090392831ea82dd6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Fri, 8 Nov 2024 12:49:31 +0100 Subject: [PATCH 05/21] RONDB-761: Review comments --- pink/rondis/common.cc | 30 +---------- pink/rondis/common.h | 3 +- pink/rondis/string/db_operations.cc | 83 ++++++++++++++++++----------- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/pink/rondis/common.cc b/pink/rondis/common.cc index bc93430d..00b000b3 100644 --- a/pink/rondis/common.cc +++ b/pink/rondis/common.cc @@ -5,39 +5,13 @@ #include "common.h" -/** - * @brief Writes formatted data to a buffer. - * - * This function writes formatted data to the provided buffer using a format string - * and a variable number of arguments, similar to printf. It ensures that the - * formatted string does not exceed the specified buffer size. - * - * @param buffer A pointer to the buffer where the formatted string will be written. - * @param bufferSize The size of the buffer. - * @param format A format string that specifies how subsequent arguments are converted for output. - * @param ... Additional arguments specifying the data to be formatted. - * @return The number of characters written, excluding the null terminator. If the output - * is truncated due to the buffer size limit, the return value is the number of - * characters (excluding the null terminator) which would have been written if - * enough space had been available. - */ -int write_formatted(char *buffer, int bufferSize, const char *format, ...) -{ - int len = 0; - va_list arguments; - va_start(arguments, format); - len = vsnprintf(buffer, bufferSize, format, arguments); - va_end(arguments); - return len; -} - void assign_ndb_err_to_response( std::string *response, const char *app_str, NdbError error) { char buf[512]; - write_formatted(buf, sizeof(buf), "-ERR %s; NDB(%u) %s\r\n", app_str, error.code, error.message); + snprintf(buf, sizeof(buf), "-ERR %s; NDB(%u) %s\r\n", app_str, error.code, error.message); std::cout << buf; response->assign(buf); } @@ -47,7 +21,7 @@ void assign_generic_err_to_response( const char *app_str) { char buf[512]; - write_formatted(buf, sizeof(buf), "-ERR %s\r\n", app_str); + snprintf(buf, sizeof(buf), "-ERR %s\r\n", app_str); std::cout << buf; response->assign(buf); } diff --git a/pink/rondis/common.h b/pink/rondis/common.h index d03759d3..e48cd0dd 100644 --- a/pink/rondis/common.h +++ b/pink/rondis/common.h @@ -23,7 +23,8 @@ void assign_generic_err_to_response(std::string *response, const char *app_str); #define FAILED_CREATE_TXN_OBJECT "Failed to create transaction object" #define FAILED_EXEC_TXN "Failed to execute transaction" #define FAILED_READ_KEY "Failed to read key" -#define FAILED_INCR_KEY "Failed to increment key" +#define FAILED_INCR_KEY "Failed to increment key, multi-row value" +#define FAILED_INCR_KEY_MULTI_ROW "Failed to increment key, multi-row value" #define FAILED_GET_OP "Failed to get NdbOperation object" #define FAILED_DEFINE_OP "Failed to define RonDB operation" diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index 83caa1c7..d25c32f8 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -418,10 +418,10 @@ int get_simple_key_row(std::string *response, return 0; } char header_buf[20]; - int header_len = write_formatted(header_buf, - sizeof(header_buf), - "$%u\r\n", - key_row->tot_value_len); + int header_len = snprintf(header_buf, + sizeof(header_buf), + "$%u\r\n", + key_row->tot_value_len); // The total length of the expected response response->reserve(header_len + key_row->tot_value_len + 2); @@ -574,10 +574,10 @@ int get_complex_key_row(std::string *response, // Writing the Redis header to the response (indicating value length) char header_buf[20]; - int header_len = write_formatted(header_buf, - sizeof(header_buf), - "$%u\r\n", - key_row->tot_value_len); + int header_len = snprintf(header_buf, + sizeof(header_buf), + "$%u\r\n", + key_row->tot_value_len); response->reserve(header_len + key_row->tot_value_len + 2); response->append(header_buf); @@ -627,6 +627,20 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab, #define REG7 7 #define LABEL0 0 #define LABEL1 1 + +#define INITIAL_INT_VALUE 1 +#define INITIAL_INT_STRING '1' +#define INITIAL_INT_STRING_LEN 1 +#define INITIAL_INT_STRING_LEN_WITH_LEN_BYTES 3 + +#define MEMORY_OFFSET_START 0 +#define MEMORY_OFFSET_LEN_BYTES 4 +#define MEMORY_OFFSET_STRING 6 +#define NUM_LEN_BYTES 2 +#define INCREMENT_VALUE 1 +#define OUTPUT_INDEX 0 +#define RONDB_KEY_NOT_NULL_ERROR 6000 + void incr_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, @@ -661,9 +675,9 @@ void incr_key_row(std::string *response, /* Define the interpreted program */ Uint32 code_buffer[128]; NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer)); - code.load_const_u16(REG0, 4); //Memory offset 0 - code.load_const_u16(REG6, 0); //Memory offset 0 - int ret_code = code.load_op_type(REG1); // Read operation type into register 1 + code.load_const_u16(REG0, MEMORY_OFFSET_LEN_BYTES); + code.load_const_u16(REG6, MEMORY_OFFSET_START); + code.load_op_type(REG1); // Read operation type into register 1 code.branch_eq_const(REG1, RONDB_INSERT, LABEL1); //Inserts go to label 1 /** @@ -680,42 +694,45 @@ void incr_key_row(std::string *response, /* UPDATE code */ code.read_attr(REG7, rondb_key_col); code.branch_eq_null(REG7, LABEL0); - code.interpret_exit_nok(); + code.interpret_exit_nok(RONDB_KEY_NOT_NULL_ERROR); code.def_label(LABEL0); - code.read_full(value_start_col, REG6, REG2); // Read value_start column - code.load_const_u16(REG1, 6);//Memory offset 2 - code.sub_const_reg(REG3, REG2, 2);//Subtract 2 from length - code.str_to_int64(REG4, REG1, REG3);//Convert string to number into register 6 - code.add_const_reg(REG5, REG4, 1); //New integer value in register 6 - code.int64_to_str(REG3, REG1, REG5);//Convert to string - code.add_const_reg(REG2, REG3, 2); //New value_start length + code.read_full(value_start_col, REG6, REG2);// Read value_start column + code.load_const_u16(REG1, MEMORY_OFFSET_STRING); + code.sub_const_reg(REG3, REG2, NUM_LEN_BYTES); + code.str_to_int64(REG4, REG1, REG3);//Convert string to number + code.add_const_reg(REG5, REG4, INCREMENT_VALUE); + code.int64_to_str(REG3, REG1, REG5);//Convert number to string + code.add_const_reg(REG2, REG3, NUM_LEN_BYTES); //New value_start length code.convert_size(REG3, REG0); //Write back length bytes in memory - code.write_interpreter_output(REG5, 0); //Write into output index 0 + code.write_interpreter_output(REG5, OUTPUT_INDEX); //Write into output index 0 code.write_from_mem(value_start_col, REG6, REG2); // Write to column code.write_attr(tot_value_len_col, REG3); code.interpret_exit_ok(); /* INSERT code */ code.def_label(LABEL1); - code.load_const_u16(REG5, 1); - code.load_const_u16(REG3, 1); - code.write_interpreter_output(REG5, 0); //Write into output index 0 + code.load_const_u16(REG5, INITIAL_INT_VALUE); + code.load_const_u16(REG3, INITIAL_INT_STRING_LEN); + code.write_interpreter_output(REG5, OUTPUT_INDEX); //Write into output index 0 Uint32 insert_value; Uint8 *insert_value_ptr = (Uint8*)&insert_value; insert_value_ptr[0] = 1; // Length is 1 insert_value_ptr[1] = 0; // Second length byte is 0 - insert_value_ptr[2] = '1'; //Inserts a string '1' + insert_value_ptr[2] = INITIAL_INT_STRING; //Inserts a string '1' insert_value_ptr[3] = 0; - code.load_const_mem(REG0, REG2, 3, &insert_value);// Load to memory + code.load_const_mem(REG0, + REG2, + INITIAL_INT_STRING_LEN_WITH_LEN_BYTES, + &insert_value); code.write_from_mem(value_start_col, REG6, REG2); // Write to column code.write_attr(tot_value_len_col, REG3); code.interpret_exit_ok(); /* Program end, now compile code */ - ret_code = code.finalise(); + int ret_code = code.finalise(); if (ret_code != 0) { assign_ndb_err_to_response(response, "Failed to create Interpreted code", @@ -765,6 +782,12 @@ void incr_key_row(std::string *response, NdbOperation::AbortOnError) != 0 || trans->getNdbError().code != 0) { + if (trans->getNdbError().code == RONDB_KEY_NOT_NULL_ERROR) { + assign_ndb_err_to_response(response, + FAILED_INCR_KEY_MULTI_ROW, + trans->getNdbError()); + return; + } assign_ndb_err_to_response(response, FAILED_INCR_KEY, trans->getNdbError()); @@ -777,10 +800,10 @@ void incr_key_row(std::string *response, /* Send the return message to Redis client */ char header_buf[20]; - int header_len = write_formatted(header_buf, - sizeof(header_buf), - ":%lld\r\n", - new_incremented_value); + int header_len = snprintf(header_buf, + sizeof(header_buf), + ":%lld\r\n", + new_incremented_value); response->assign(header_buf); return; } From 97b69b7aa7d11e4124c60214412f05444d0a3ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Fri, 8 Nov 2024 13:24:40 +0100 Subject: [PATCH 06/21] RONDB-761: Fixed mistake --- pink/rondis/common.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pink/rondis/common.h b/pink/rondis/common.h index e48cd0dd..8840296d 100644 --- a/pink/rondis/common.h +++ b/pink/rondis/common.h @@ -23,7 +23,7 @@ void assign_generic_err_to_response(std::string *response, const char *app_str); #define FAILED_CREATE_TXN_OBJECT "Failed to create transaction object" #define FAILED_EXEC_TXN "Failed to execute transaction" #define FAILED_READ_KEY "Failed to read key" -#define FAILED_INCR_KEY "Failed to increment key, multi-row value" +#define FAILED_INCR_KEY "Failed to increment key" #define FAILED_INCR_KEY_MULTI_ROW "Failed to increment key, multi-row value" #define FAILED_GET_OP "Failed to get NdbOperation object" #define FAILED_DEFINE_OP "Failed to define RonDB operation" From cedb196501caf43ef91ce83bfbf51c3795a9d6d0 Mon Sep 17 00:00:00 2001 From: Vincent Lohse Date: Fri, 8 Nov 2024 12:04:31 +0000 Subject: [PATCH 07/21] Autoformatting --- pink/rondis/string/db_operations.cc | 88 +++++++++++++++-------------- 1 file changed, 47 insertions(+), 41 deletions(-) diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index d25c32f8..a42f38b5 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -14,18 +14,20 @@ NdbRecord *entire_key_record = nullptr; NdbRecord *pk_value_record = nullptr; NdbRecord *entire_value_record = nullptr; -void set_length(char *buf, Uint32 key_len) { - Uint8 *ptr = (Uint8*)buf; - ptr[0] = (Uint8)(key_len & 255); - ptr[1] = (Uint8)(key_len >> 8); +void set_length(char *buf, Uint32 key_len) +{ + Uint8 *ptr = (Uint8 *)buf; + ptr[0] = (Uint8)(key_len & 255); + ptr[1] = (Uint8)(key_len >> 8); } -Uint32 get_length(char *buf) { - Uint8 *ptr = (Uint8*)buf; - Uint8 low = ptr[0]; - Uint8 high = ptr[1]; - Uint32 len32 = Uint32(low) + Uint32(256) * Uint32(high); - return len32; +Uint32 get_length(char *buf) +{ + Uint8 *ptr = (Uint8 *)buf; + Uint8 low = ptr[0]; + Uint8 high = ptr[1]; + Uint32 len32 = Uint32(low) + Uint32(256) * Uint32(high); + return len32; } int create_key_row(std::string *response, @@ -520,7 +522,7 @@ int read_batched_value_rows(std::string *response, for (Uint32 i = 0; i < num_rows_to_read; i++) { // Transfer char pointer to response's string - Uint32 row_value_len = get_length((char*)&value_rows->value[0]); + Uint32 row_value_len = get_length((char *)&value_rows->value[0]); response->append((const char *)&value_rows[i].value[2], row_value_len); } return 0; @@ -582,7 +584,7 @@ int get_complex_key_row(std::string *response, response->append(header_buf); // Append inline value to response - Uint32 inline_value_len = get_length((char*)&key_row->value_start[0]); + Uint32 inline_value_len = get_length((char *)&key_row->value_start[0]); response->append((const char *)&key_row->value_start[2], inline_value_len); int ret_code = get_value_rows(response, @@ -645,7 +647,8 @@ void incr_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, - struct key_table *key_row) { + struct key_table *key_row) +{ const NdbDictionary::Column *value_start_col = tab->getColumn(KEY_TABLE_COL_value_start); const NdbDictionary::Column *tot_value_len_col = tab->getColumn(KEY_TABLE_COL_tot_value_len); @@ -677,8 +680,8 @@ void incr_key_row(std::string *response, NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer)); code.load_const_u16(REG0, MEMORY_OFFSET_LEN_BYTES); code.load_const_u16(REG6, MEMORY_OFFSET_START); - code.load_op_type(REG1); // Read operation type into register 1 - code.branch_eq_const(REG1, RONDB_INSERT, LABEL1); //Inserts go to label 1 + code.load_op_type(REG1); // Read operation type into register 1 + code.branch_eq_const(REG1, RONDB_INSERT, LABEL1); // Inserts go to label 1 /** * The first 4 bytes of the memory must be kept for the Attribute header @@ -696,17 +699,17 @@ void incr_key_row(std::string *response, code.branch_eq_null(REG7, LABEL0); code.interpret_exit_nok(RONDB_KEY_NOT_NULL_ERROR); code.def_label(LABEL0); - code.read_full(value_start_col, REG6, REG2);// Read value_start column + code.read_full(value_start_col, REG6, REG2); // Read value_start column code.load_const_u16(REG1, MEMORY_OFFSET_STRING); code.sub_const_reg(REG3, REG2, NUM_LEN_BYTES); - code.str_to_int64(REG4, REG1, REG3);//Convert string to number + code.str_to_int64(REG4, REG1, REG3); // Convert string to number code.add_const_reg(REG5, REG4, INCREMENT_VALUE); - code.int64_to_str(REG3, REG1, REG5);//Convert number to string - code.add_const_reg(REG2, REG3, NUM_LEN_BYTES); //New value_start length - code.convert_size(REG3, REG0); //Write back length bytes in memory + code.int64_to_str(REG3, REG1, REG5); // Convert number to string + code.add_const_reg(REG2, REG3, NUM_LEN_BYTES); // New value_start length + code.convert_size(REG3, REG0); // Write back length bytes in memory - code.write_interpreter_output(REG5, OUTPUT_INDEX); //Write into output index 0 - code.write_from_mem(value_start_col, REG6, REG2); // Write to column + code.write_interpreter_output(REG5, OUTPUT_INDEX); // Write into output index 0 + code.write_from_mem(value_start_col, REG6, REG2); // Write to column code.write_attr(tot_value_len_col, REG3); code.interpret_exit_ok(); @@ -714,13 +717,13 @@ void incr_key_row(std::string *response, code.def_label(LABEL1); code.load_const_u16(REG5, INITIAL_INT_VALUE); code.load_const_u16(REG3, INITIAL_INT_STRING_LEN); - code.write_interpreter_output(REG5, OUTPUT_INDEX); //Write into output index 0 + code.write_interpreter_output(REG5, OUTPUT_INDEX); // Write into output index 0 Uint32 insert_value; - Uint8 *insert_value_ptr = (Uint8*)&insert_value; - insert_value_ptr[0] = 1; // Length is 1 - insert_value_ptr[1] = 0; // Second length byte is 0 - insert_value_ptr[2] = INITIAL_INT_STRING; //Inserts a string '1' + Uint8 *insert_value_ptr = (Uint8 *)&insert_value; + insert_value_ptr[0] = 1; // Length is 1 + insert_value_ptr[1] = 0; // Second length byte is 0 + insert_value_ptr[2] = INITIAL_INT_STRING; // Inserts a string '1' insert_value_ptr[3] = 0; code.load_const_mem(REG0, @@ -733,7 +736,8 @@ void incr_key_row(std::string *response, /* Program end, now compile code */ int ret_code = code.finalise(); - if (ret_code != 0) { + if (ret_code != 0) + { assign_ndb_err_to_response(response, "Failed to create Interpreted code", code.getNdbError()); @@ -759,18 +763,19 @@ void incr_key_row(std::string *response, opts.extraGetFinalValues = getvals; if (1) - opts.optionsPresent |= NdbOperation::OperationOptions::OO_DIRTY_FLAG; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_DIRTY_FLAG; /* Define the actual operation to be sent to RonDB data node. */ const NdbOperation *op = trans->writeTuple( - pk_key_record, - (const char*)key_row, - entire_key_record, - (char*)key_row, - mask_ptr, - &opts, - sizeof(opts)); - if (op == nullptr) { + pk_key_record, + (const char *)key_row, + entire_key_record, + (char *)key_row, + mask_ptr, + &opts, + sizeof(opts)); + if (op == nullptr) + { assign_ndb_err_to_response(response, "Failed to create NdbOperation", trans->getNdbError()); @@ -782,10 +787,11 @@ void incr_key_row(std::string *response, NdbOperation::AbortOnError) != 0 || trans->getNdbError().code != 0) { - if (trans->getNdbError().code == RONDB_KEY_NOT_NULL_ERROR) { - assign_ndb_err_to_response(response, - FAILED_INCR_KEY_MULTI_ROW, - trans->getNdbError()); + if (trans->getNdbError().code == RONDB_KEY_NOT_NULL_ERROR) + { + assign_ndb_err_to_response(response, + FAILED_INCR_KEY_MULTI_ROW, + trans->getNdbError()); return; } assign_ndb_err_to_response(response, From e4469e588679787ec16046f09c12c514611cc654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 14 Nov 2024 16:41:45 +0100 Subject: [PATCH 08/21] Merge in fix of CI --- .devcontainer/devcontainer.json | 4 ++-- .github/workflows/build_test_push.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 7a7d2e42..d6274651 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -32,8 +32,8 @@ } }, "containerEnv": { - "RONDB_PATH": "/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8", + "RONDB_PATH": "/usr/src/app/rondb-24.10.0-linux-glibc2.28-arm64_v8", // This assumes that the RonDB tarball is placed the root of the workspace folder - "LD_LIBRARY_PATH": "/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8/lib" + "LD_LIBRARY_PATH": "/usr/src/app/rondb-24.10.0-linux-glibc2.28-arm64_v8/lib" } } \ No newline at end of file diff --git a/.github/workflows/build_test_push.yaml b/.github/workflows/build_test_push.yaml index 401f7dfa..902709d9 100644 --- a/.github/workflows/build_test_push.yaml +++ b/.github/workflows/build_test_push.yaml @@ -23,8 +23,8 @@ jobs: needs: [build-pink-macos] runs-on: ubuntu-latest env: - RONDB_VERSION: 22.10.5 - RONDB_TARBALL_URI: https://repo.hops.works/master/rondb-22.10.5-linux-glibc2.28-x86_64.tar.gz + RONDB_VERSION: 24.10.0 + RONDB_TARBALL_URI: https://repo.hops.works/master/rondb-24.10.0-linux-glibc2.28-x86_64.tar.gz RONDB_DOCKER_DIR: /tmp/rondb-docker RONDB_TARBALL_FILE: rondb.tar.gz RONDB_PATH: /tmp/rondb From b15826abd6965912080714323eb6762b1ddac11b Mon Sep 17 00:00:00 2001 From: Vincent Lohse Date: Fri, 8 Nov 2024 13:47:54 +0000 Subject: [PATCH 09/21] Added INCR to Bash tests --- pink/rondis/tests/get_set.sh | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pink/rondis/tests/get_set.sh b/pink/rondis/tests/get_set.sh index cf12b20f..3832b7bd 100755 --- a/pink/rondis/tests/get_set.sh +++ b/pink/rondis/tests/get_set.sh @@ -102,4 +102,32 @@ for i in {1..10000}; do echo "SET $KEY:piped_$i value_$i" done | redis-cli --pipe --verbose +incr_key="$key:incr$RANDOM" +incr_output=$(redis-cli INCR "$incr_key") +incr_result=$(redis-cli GET "$incr_key") +if [[ "$incr_result" == 1 ]]; then + echo "PASS: Incrementing non-existing key $incr_key " +else + echo "FAIL: Incrementing non-existing key $incr_key" + echo "Expected: 1" + echo "Received: $incr_result" + exit 1 +fi + +incr_start_value=$RANDOM +set_and_get "$incr_key" $incr_start_value +for i in {1..10}; do + incr_output=$(redis-cli INCR "$incr_key") + incr_result=$(redis-cli GET "$incr_key") + incr_expected_value=$((incr_start_value + i)) + if [[ "$incr_result" == $incr_expected_value ]]; then + echo "PASS: Incrementing key $incr_key to value $incr_result" + else + echo "FAIL: Incrementing key $incr_key from value $incr_start_value" + echo "Expected: $incr_expected_value" + echo "Received: $incr_result" + exit 1 + fi +done + echo "All tests completed." From f0cb24e711ca5a3172b9aa2e34c923c0f5d03718 Mon Sep 17 00:00:00 2001 From: Vincent Lohse Date: Fri, 8 Nov 2024 13:57:38 +0000 Subject: [PATCH 10/21] Moved interpreted code to separate file --- pink/rondis/string/db_operations.cc | 104 ++----------------------- pink/rondis/string/interpreted_code.cc | 87 +++++++++++++++++++++ pink/rondis/string/interpreted_code.h | 36 +++++++++ 3 files changed, 128 insertions(+), 99 deletions(-) create mode 100644 pink/rondis/string/interpreted_code.cc create mode 100644 pink/rondis/string/interpreted_code.h diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index a42f38b5..d45a29e5 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -8,6 +8,7 @@ #include "../common.h" #include "db_operations.h" #include "table_definitions.h" +#include "interpreted_code.h" NdbRecord *pk_key_record = nullptr; NdbRecord *entire_key_record = nullptr; @@ -617,45 +618,12 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab, return 0; } -#define RONDB_INSERT 2 -#define RONDB_UPDATE 1 -#define REG0 0 -#define REG1 1 -#define REG2 2 -#define REG3 3 -#define REG4 4 -#define REG5 5 -#define REG6 6 -#define REG7 7 -#define LABEL0 0 -#define LABEL1 1 - -#define INITIAL_INT_VALUE 1 -#define INITIAL_INT_STRING '1' -#define INITIAL_INT_STRING_LEN 1 -#define INITIAL_INT_STRING_LEN_WITH_LEN_BYTES 3 - -#define MEMORY_OFFSET_START 0 -#define MEMORY_OFFSET_LEN_BYTES 4 -#define MEMORY_OFFSET_STRING 6 -#define NUM_LEN_BYTES 2 -#define INCREMENT_VALUE 1 -#define OUTPUT_INDEX 0 -#define RONDB_KEY_NOT_NULL_ERROR 6000 - void incr_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, struct key_table *key_row) { - - const NdbDictionary::Column *value_start_col = tab->getColumn(KEY_TABLE_COL_value_start); - const NdbDictionary::Column *tot_value_len_col = tab->getColumn(KEY_TABLE_COL_tot_value_len); - const NdbDictionary::Column *rondb_key_col = tab->getColumn(KEY_TABLE_COL_rondb_key); - - NdbOperation::OperationOptions opts; - std::memset(&opts, 0, sizeof(opts)); /** * The mask specifies which columns is to be updated after the interpreter * has finished. The values are set in the key_row. @@ -675,76 +643,14 @@ void incr_key_row(std::string *response, key_row->value_data_type = 0; key_row->expiry_date = 0; - /* Define the interpreted program */ Uint32 code_buffer[128]; NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer)); - code.load_const_u16(REG0, MEMORY_OFFSET_LEN_BYTES); - code.load_const_u16(REG6, MEMORY_OFFSET_START); - code.load_op_type(REG1); // Read operation type into register 1 - code.branch_eq_const(REG1, RONDB_INSERT, LABEL1); // Inserts go to label 1 - - /** - * The first 4 bytes of the memory must be kept for the Attribute header - * REG0 Memory offset == 4 - * REG1 Memory offset == 6 - * REG2 Size of value_start - * REG3 Size of value_start without length bytes - * REG4 Old integer value after conversion - * REG5 New integer value after increment - * REG6 Memory offset == 0 - * REG7 Value of rondb_key (should be NULL) - */ - /* UPDATE code */ - code.read_attr(REG7, rondb_key_col); - code.branch_eq_null(REG7, LABEL0); - code.interpret_exit_nok(RONDB_KEY_NOT_NULL_ERROR); - code.def_label(LABEL0); - code.read_full(value_start_col, REG6, REG2); // Read value_start column - code.load_const_u16(REG1, MEMORY_OFFSET_STRING); - code.sub_const_reg(REG3, REG2, NUM_LEN_BYTES); - code.str_to_int64(REG4, REG1, REG3); // Convert string to number - code.add_const_reg(REG5, REG4, INCREMENT_VALUE); - code.int64_to_str(REG3, REG1, REG5); // Convert number to string - code.add_const_reg(REG2, REG3, NUM_LEN_BYTES); // New value_start length - code.convert_size(REG3, REG0); // Write back length bytes in memory - - code.write_interpreter_output(REG5, OUTPUT_INDEX); // Write into output index 0 - code.write_from_mem(value_start_col, REG6, REG2); // Write to column - code.write_attr(tot_value_len_col, REG3); - code.interpret_exit_ok(); - - /* INSERT code */ - code.def_label(LABEL1); - code.load_const_u16(REG5, INITIAL_INT_VALUE); - code.load_const_u16(REG3, INITIAL_INT_STRING_LEN); - code.write_interpreter_output(REG5, OUTPUT_INDEX); // Write into output index 0 - - Uint32 insert_value; - Uint8 *insert_value_ptr = (Uint8 *)&insert_value; - insert_value_ptr[0] = 1; // Length is 1 - insert_value_ptr[1] = 0; // Second length byte is 0 - insert_value_ptr[2] = INITIAL_INT_STRING; // Inserts a string '1' - insert_value_ptr[3] = 0; - - code.load_const_mem(REG0, - REG2, - INITIAL_INT_STRING_LEN_WITH_LEN_BYTES, - &insert_value); - code.write_from_mem(value_start_col, REG6, REG2); // Write to column - code.write_attr(tot_value_len_col, REG3); - code.interpret_exit_ok(); - - /* Program end, now compile code */ - int ret_code = code.finalise(); - if (ret_code != 0) - { - assign_ndb_err_to_response(response, - "Failed to create Interpreted code", - code.getNdbError()); + if (initNdbCodeIncr(response, &code, tab) != 0) return; - } - /* Prepare the interpreted program to be part of the write */ + // Prepare the interpreted program to be part of the write + NdbOperation::OperationOptions opts; + std::memset(&opts, 0, sizeof(opts)); opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED; opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED_INSERT; opts.interpretedCode = &code; diff --git a/pink/rondis/string/interpreted_code.cc b/pink/rondis/string/interpreted_code.cc new file mode 100644 index 00000000..29c0e1d5 --- /dev/null +++ b/pink/rondis/string/interpreted_code.cc @@ -0,0 +1,87 @@ +#include +#include +#include +#include "pink/include/redis_conn.h" +#include +#include + +#include "../common.h" +#include "interpreted_code.h" +#include "db_operations.h" +#include "table_definitions.h" + +// Define the interpreted program for the INCR operation +int initNdbCodeIncr(std::string *response, + NdbInterpretedCode *code, + const NdbDictionary::Table *tab) +{ + const NdbDictionary::Column *value_start_col = tab->getColumn(KEY_TABLE_COL_value_start); + const NdbDictionary::Column *tot_value_len_col = tab->getColumn(KEY_TABLE_COL_tot_value_len); + const NdbDictionary::Column *rondb_key_col = tab->getColumn(KEY_TABLE_COL_rondb_key); + + code->load_const_u16(REG0, MEMORY_OFFSET_LEN_BYTES); + code->load_const_u16(REG6, MEMORY_OFFSET_START); + code->load_op_type(REG1); // Read operation type into register 1 + code->branch_eq_const(REG1, RONDB_INSERT, LABEL1); // Inserts go to label 1 + + /** + * The first 4 bytes of the memory must be kept for the Attribute header + * REG0 Memory offset == 4 + * REG1 Memory offset == 6 + * REG2 Size of value_start + * REG3 Size of value_start without length bytes + * REG4 Old integer value after conversion + * REG5 New integer value after increment + * REG6 Memory offset == 0 + * REG7 Value of rondb_key (should be NULL) + */ + /* UPDATE code */ + code->read_attr(REG7, rondb_key_col); + code->branch_eq_null(REG7, LABEL0); + code->interpret_exit_nok(RONDB_KEY_NOT_NULL_ERROR); + code->def_label(LABEL0); + code->read_full(value_start_col, REG6, REG2); // Read value_start column + code->load_const_u16(REG1, MEMORY_OFFSET_STRING); + code->sub_const_reg(REG3, REG2, NUM_LEN_BYTES); + code->str_to_int64(REG4, REG1, REG3); // Convert string to number + code->add_const_reg(REG5, REG4, INCREMENT_VALUE); + code->int64_to_str(REG3, REG1, REG5); // Convert number to string + code->add_const_reg(REG2, REG3, NUM_LEN_BYTES); // New value_start length + code->convert_size(REG3, REG0); // Write back length bytes in memory + + code->write_interpreter_output(REG5, OUTPUT_INDEX); // Write into output index 0 + code->write_from_mem(value_start_col, REG6, REG2); // Write to column + code->write_attr(tot_value_len_col, REG3); + code->interpret_exit_ok(); + + /* INSERT code */ + code->def_label(LABEL1); + code->load_const_u16(REG5, INITIAL_INT_VALUE); + code->load_const_u16(REG3, INITIAL_INT_STRING_LEN); + code->write_interpreter_output(REG5, OUTPUT_INDEX); // Write into output index 0 + + Uint32 insert_value; + Uint8 *insert_value_ptr = (Uint8 *)&insert_value; + insert_value_ptr[0] = 1; // Length is 1 + insert_value_ptr[1] = 0; // Second length byte is 0 + insert_value_ptr[2] = INITIAL_INT_STRING; // Inserts a string '1' + insert_value_ptr[3] = 0; + + code->load_const_mem(REG0, + REG2, + INITIAL_INT_STRING_LEN_WITH_LEN_BYTES, + &insert_value); + code->write_from_mem(value_start_col, REG6, REG2); // Write to column + code->write_attr(tot_value_len_col, REG3); + code->interpret_exit_ok(); + + // Program end, now compile code + int ret_code = code->finalise(); + if (ret_code != 0) + { + assign_ndb_err_to_response(response, + "Failed to create Interpreted code", + code->getNdbError()); + return; + } +} diff --git a/pink/rondis/string/interpreted_code.h b/pink/rondis/string/interpreted_code.h new file mode 100644 index 00000000..e8bb311e --- /dev/null +++ b/pink/rondis/string/interpreted_code.h @@ -0,0 +1,36 @@ +#include + +#ifndef STRING_INTERPRETED_CODE_H +#define STRING_INTERPRETED_CODE_H + +#define RONDB_INSERT 2 +#define RONDB_UPDATE 1 +#define REG0 0 +#define REG1 1 +#define REG2 2 +#define REG3 3 +#define REG4 4 +#define REG5 5 +#define REG6 6 +#define REG7 7 +#define LABEL0 0 +#define LABEL1 1 + +#define MEMORY_OFFSET_START 0 +#define MEMORY_OFFSET_LEN_BYTES 4 +#define MEMORY_OFFSET_STRING 6 +#define NUM_LEN_BYTES 2 +#define INCREMENT_VALUE 1 +#define OUTPUT_INDEX 0 +#define RONDB_KEY_NOT_NULL_ERROR 6000 + +#define INITIAL_INT_VALUE 1 +#define INITIAL_INT_STRING '1' +#define INITIAL_INT_STRING_LEN 1 +#define INITIAL_INT_STRING_LEN_WITH_LEN_BYTES 3 + +int initNdbCodeIncr(std::string *response, + NdbInterpretedCode *code, + const NdbDictionary::Table *tab); + +#endif From 58de4c87253a1f6a4917b896eb2e69f2e57bcf05 Mon Sep 17 00:00:00 2001 From: Vincent Lohse Date: Fri, 8 Nov 2024 14:10:49 +0000 Subject: [PATCH 11/21] Removed redundant headers --- pink/rondis/string/interpreted_code.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pink/rondis/string/interpreted_code.cc b/pink/rondis/string/interpreted_code.cc index 29c0e1d5..0339e551 100644 --- a/pink/rondis/string/interpreted_code.cc +++ b/pink/rondis/string/interpreted_code.cc @@ -1,13 +1,8 @@ -#include -#include -#include -#include "pink/include/redis_conn.h" #include #include #include "../common.h" #include "interpreted_code.h" -#include "db_operations.h" #include "table_definitions.h" // Define the interpreted program for the INCR operation From 3e5b1be350f526ad72b04ecf18d8811746ba63f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Fri, 8 Nov 2024 21:14:02 +0100 Subject: [PATCH 12/21] RONDB-761: Fixed breakout of interpreter code, case insensitive command name --- pink/rondis/Makefile | 2 +- pink/rondis/rondb.cc | 10 ++++++---- pink/rondis/string/interpreted_code.cc | 3 ++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pink/rondis/Makefile b/pink/rondis/Makefile index 43fb315d..4bcf8a9b 100644 --- a/pink/rondis/Makefile +++ b/pink/rondis/Makefile @@ -45,7 +45,7 @@ endif LDFLAGS := $(DEP_LIBS) $(LDFLAGS) # Use find to locate all .cc files in subdirectories -SOURCES = $(CURDIR)/rondis.cc $(CURDIR)/rondb.cc $(CURDIR)/common.cc $(CURDIR)/string/table_definitions.cc $(CURDIR)/string/commands.cc $(CURDIR)/string/db_operations.cc +SOURCES = $(CURDIR)/rondis.cc $(CURDIR)/rondb.cc $(CURDIR)/common.cc $(CURDIR)/string/table_definitions.cc $(CURDIR)/string/commands.cc $(CURDIR)/string/db_operations.cc $(CURDIR)/string/interpreted_code.cc OBJECTS = $(SOURCES:.cc=.o) # Target to build the executable "rondis" diff --git a/pink/rondis/rondb.cc b/pink/rondis/rondb.cc index 477a5b16..cd91defa 100644 --- a/pink/rondis/rondb.cc +++ b/pink/rondis/rondb.cc @@ -6,6 +6,7 @@ #include "common.h" #include "string/table_definitions.h" #include "string/commands.h" +#include /* Ndb objects are not thread-safe. Hence, each worker thread / RonDB connection should @@ -109,7 +110,8 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv, int worker_id) { // First check non-ndb commands - if (argv[0] == "ping") + const char *command = argv[0].c_str(); + if (strcasecmp(command, "ping") == 0) { if (argv.size() != 1) { @@ -156,7 +158,7 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv, else { Ndb *ndb = ndb_objects[worker_id]; - if (argv[0] == "GET") + if (strcasecmp(command, "GET") == 0) { if (argv.size() == 2) { @@ -169,7 +171,7 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv, assign_generic_err_to_response(response, error_message); } } - else if (argv[0] == "SET") + else if (strcasecmp(command, "SET") == 0) { if (argv.size() == 3) { @@ -182,7 +184,7 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv, assign_generic_err_to_response(response, error_message); } } - else if (argv[0] == "INCR") + else if (strcasecmp(command, "INCR") == 0) { if (argv.size() == 2) { diff --git a/pink/rondis/string/interpreted_code.cc b/pink/rondis/string/interpreted_code.cc index 0339e551..f880847d 100644 --- a/pink/rondis/string/interpreted_code.cc +++ b/pink/rondis/string/interpreted_code.cc @@ -77,6 +77,7 @@ int initNdbCodeIncr(std::string *response, assign_ndb_err_to_response(response, "Failed to create Interpreted code", code->getNdbError()); - return; + return -1; } + return 0; } From b27887e1e24d9aa17adf2245039d4aac9616e8ab Mon Sep 17 00:00:00 2001 From: Vincent Lohse Date: Tue, 12 Nov 2024 14:45:42 +0000 Subject: [PATCH 13/21] Removed commented-out code --- pink/rondis/tests/get_set.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pink/rondis/tests/get_set.sh b/pink/rondis/tests/get_set.sh index 3832b7bd..cf47a299 100755 --- a/pink/rondis/tests/get_set.sh +++ b/pink/rondis/tests/get_set.sh @@ -102,6 +102,10 @@ for i in {1..10000}; do echo "SET $KEY:piped_$i value_$i" done | redis-cli --pipe --verbose +echo "Testing edge case large key length (Redis allows up to 512MB for the value)..." +edge_value=$(head -c 100000 < /dev/zero | tr '\0' 'b') +set_and_get "$KEY:edge_large" "$edge_value" + incr_key="$key:incr$RANDOM" incr_output=$(redis-cli INCR "$incr_key") incr_result=$(redis-cli GET "$incr_key") From cb3d4a3f6ac80f3cd60d1f25c8a6bd6d035d3da8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Wed, 13 Nov 2024 20:45:12 +0100 Subject: [PATCH 14/21] RONDB-768: Added support for HSET, HGET, HINCR and rewrite of SET handling, removed FOREIGN KEY --- pink/rondis/common.cc | 16 + pink/rondis/common.h | 5 +- pink/rondis/rondb.cc | 42 +- pink/rondis/sql/HSET_key.sql | 7 + pink/rondis/sql/STRING_key.sql | 7 +- pink/rondis/sql/STRING_value.sql | 7 +- pink/rondis/string/commands.cc | 257 ++++++++---- pink/rondis/string/commands.h | 15 +- pink/rondis/string/db_operations.cc | 498 ++++++++++++------------ pink/rondis/string/db_operations.h | 74 ++-- pink/rondis/string/interpreted_code.cc | 150 +++++++ pink/rondis/string/interpreted_code.h | 11 + pink/rondis/string/table_definitions.cc | 53 ++- pink/rondis/string/table_definitions.h | 28 +- pink/rondis/tests/get_set.sh | 3 +- pink/rondis/tests/hget_hset.sh | 139 +++++++ 16 files changed, 935 insertions(+), 377 deletions(-) create mode 100644 pink/rondis/sql/HSET_key.sql create mode 100755 pink/rondis/tests/hget_hset.sh diff --git a/pink/rondis/common.cc b/pink/rondis/common.cc index 00b000b3..e5fe4b4a 100644 --- a/pink/rondis/common.cc +++ b/pink/rondis/common.cc @@ -25,3 +25,19 @@ void assign_generic_err_to_response( std::cout << buf; response->assign(buf); } + +void set_length(char *buf, Uint32 key_len) +{ + Uint8 *ptr = (Uint8 *)buf; + ptr[0] = (Uint8)(key_len & 255); + ptr[1] = (Uint8)(key_len >> 8); +} + +Uint32 get_length(char *buf) +{ + Uint8 *ptr = (Uint8 *)buf; + Uint8 low = ptr[0]; + Uint8 high = ptr[1]; + Uint32 len32 = Uint32(low) + Uint32(256) * Uint32(high); + return len32; +} diff --git a/pink/rondis/common.h b/pink/rondis/common.h index 8840296d..03d79284 100644 --- a/pink/rondis/common.h +++ b/pink/rondis/common.h @@ -8,7 +8,7 @@ #define REDIS_DB_NAME "redis" -#define FOREIGN_KEY_RESTRICT_ERROR 256 +#define RESTRICT_VALUE_ROWS_ERROR 6000 #define RONDB_INTERNAL_ERROR 2 #define READ_ERROR 626 @@ -16,6 +16,8 @@ int write_formatted(char *buffer, int bufferSize, const char *format, ...); void assign_ndb_err_to_response(std::string *response, const char *app_str, NdbError error); void assign_generic_err_to_response(std::string *response, const char *app_str); +void set_length(char* buf, Uint32 key_len); +Uint32 get_length(char* buf); // NDB API error messages #define FAILED_GET_DICT "Failed to get NdbDict" @@ -24,6 +26,7 @@ void assign_generic_err_to_response(std::string *response, const char *app_str); #define FAILED_EXEC_TXN "Failed to execute transaction" #define FAILED_READ_KEY "Failed to read key" #define FAILED_INCR_KEY "Failed to increment key" +#define FAILED_HSET_KEY "Failed to find key" #define FAILED_INCR_KEY_MULTI_ROW "Failed to increment key, multi-row value" #define FAILED_GET_OP "Failed to get NdbOperation object" #define FAILED_DEFINE_OP "Failed to define RonDB operation" diff --git a/pink/rondis/rondb.cc b/pink/rondis/rondb.cc index cd91defa..39b7f459 100644 --- a/pink/rondis/rondb.cc +++ b/pink/rondis/rondb.cc @@ -197,6 +197,45 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv, assign_generic_err_to_response(response, error_message); } } + else if (strcasecmp(command, "HGET") == 0) + { + if (argv.size() == 3) + { + rondb_hget_command(ndb, argv, response); + } + else + { + char error_message[256]; + snprintf(error_message, sizeof(error_message), REDIS_WRONG_NUMBER_OF_ARGS, argv[0].c_str()); + assign_generic_err_to_response(response, error_message); + } + } + else if (strcasecmp(command, "HSET") == 0) + { + if (argv.size() == 4) + { + rondb_hset_command(ndb, argv, response); + } + else + { + char error_message[256]; + snprintf(error_message, sizeof(error_message), REDIS_WRONG_NUMBER_OF_ARGS, argv[0].c_str()); + assign_generic_err_to_response(response, error_message); + } + } + else if (strcasecmp(command, "HINCR") == 0) + { + if (argv.size() == 3) + { + rondb_hincr_command(ndb, argv, response); + } + else + { + char error_message[256]; + snprintf(error_message, sizeof(error_message), REDIS_WRONG_NUMBER_OF_ARGS, argv[0].c_str()); + assign_generic_err_to_response(response, error_message); + } + } else { unsupported_command(argv, response); @@ -209,7 +248,8 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv, If this limit is reached, the Ndb object will not create any new ones. Hence, better to catch these cases early. */ - print_args(argv); + printf("Failed to stop transaction\n"); + //print_args(argv); printf("Number of transactions started: %lld\n", ndb->getClientStat(ndb->TransStartCount)); printf("Number of transactions closed: %lld\n", ndb->getClientStat(ndb->TransCloseCount)); exit(1); diff --git a/pink/rondis/sql/HSET_key.sql b/pink/rondis/sql/HSET_key.sql new file mode 100644 index 00000000..c04a70b5 --- /dev/null +++ b/pink/rondis/sql/HSET_key.sql @@ -0,0 +1,7 @@ +CREATE TABLE hset_keys( + redis_key VARBINARY(3000) NOT NULL, + redis_key_id INT UNSIGNED NOT NULL AUTO_INCREMENT, + PRIMARY KEY (redis_key) USING HASH, + UNIQUE KEY (redis_key_id) USING HASH +) ENGINE NDB, +COMMENT = "NDB_TABLE=PARTITION_BALANCE=RP_BY_LDM_X_8"; diff --git a/pink/rondis/sql/STRING_key.sql b/pink/rondis/sql/STRING_key.sql index 171f8dbd..7b8e9148 100644 --- a/pink/rondis/sql/STRING_key.sql +++ b/pink/rondis/sql/STRING_key.sql @@ -1,6 +1,7 @@ -CREATE TABLE redis.string_keys( +CREATE TABLE string_keys( -- Redis actually supports a max key size of 512MiB, -- but we choose not to support that here + redis_key_id BIGINT UNSIGNED NOT NULL, redis_key VARBINARY(3000) NOT NULL, -- This is to save space when referencing the key in the value table rondb_key BIGINT UNSIGNED AUTO_INCREMENT NULL, @@ -14,10 +15,10 @@ CREATE TABLE redis.string_keys( num_rows INT UNSIGNED NOT NULL, value_start VARBINARY(26500) NOT NULL, -- Redis supports get/set of seconds/milliseconds - expiry_date INT UNSIGNED, + expiry_date INT UNSIGNED NOT NULL, -- Easier to sort and delete keys this way KEY expiry_index(expiry_date), - PRIMARY KEY (redis_key) USING HASH, + PRIMARY KEY (redis_key_id, redis_key) USING HASH, UNIQUE KEY (rondb_key) USING HASH ) ENGINE NDB -- Each CHAR will use 1 byte CHARSET = latin1 COMMENT = "NDB_TABLE=PARTITION_BALANCE=FOR_RP_BY_LDM_X_8"; diff --git a/pink/rondis/sql/STRING_value.sql b/pink/rondis/sql/STRING_value.sql index e656b47c..a38c1fbe 100644 --- a/pink/rondis/sql/STRING_value.sql +++ b/pink/rondis/sql/STRING_value.sql @@ -1,8 +1,7 @@ -CREATE TABLE redis.string_values( +CREATE TABLE string_values( rondb_key BIGINT UNSIGNED NOT NULL, ordinal INT UNSIGNED NOT NULL, value VARBINARY(29500) NOT NULL, - PRIMARY KEY (rondb_key, ordinal), - FOREIGN KEY (rondb_key) REFERENCES redis.string_keys(rondb_key) ON UPDATE RESTRICT ON DELETE CASCADE + PRIMARY KEY (rondb_key, ordinal) ) ENGINE NDB, -COMMENT = "NDB_TABLE=PARTITION_BALANCE=RP_BY_LDM_X_8" PARTITION BY KEY (rondb_key); \ No newline at end of file +COMMENT = "NDB_TABLE=PARTITION_BALANCE=RP_BY_LDM_X_8"; diff --git a/pink/rondis/string/commands.cc b/pink/rondis/string/commands.cc index bdbcaaeb..46898320 100644 --- a/pink/rondis/string/commands.cc +++ b/pink/rondis/string/commands.cc @@ -12,8 +12,8 @@ bool setup_transaction( Ndb *ndb, - const pink::RedisCmdArgsType &argv, std::string *response, + Uint64 redis_key_id, struct key_table *key_row, const char *key_str, Uint32 key_len, @@ -38,11 +38,12 @@ bool setup_transaction( assign_ndb_err_to_response(response, FAILED_CREATE_TABLE_OBJECT, dict->getNdbError()); return false; } + key_row->redis_key_id = redis_key_id; memcpy(&key_row->redis_key[2], key_str, key_len); set_length((char*)&key_row->redis_key[0], key_len); NdbTransaction *trans = ndb->startTransaction(tab, - &key_row->redis_key[0], - key_len + 2); + (const char*)&key_row->redis_key_id, + key_len + 10); if (trans == nullptr) { assign_ndb_err_to_response(response, @@ -70,19 +71,22 @@ bool setup_transaction( $0 The key exists but has no value (empty string). */ -void rondb_get_command(Ndb *ndb, - const pink::RedisCmdArgsType &argv, - std::string *response) +static +void rondb_get(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response, + Uint64 redis_key_id) { + Uint32 arg_index_start = (redis_key_id == STRING_REDIS_KEY_ID) ? 1 : 2; const NdbDictionary::Dictionary *dict; const NdbDictionary::Table *tab = nullptr; NdbTransaction *trans = nullptr; struct key_table key_row; - const char *key_str = argv[1].c_str(); - Uint32 key_len = argv[1].size(); + const char *key_str = argv[arg_index_start].c_str(); + Uint32 key_len = argv[arg_index_start].size(); if (!setup_transaction(ndb, - argv, response, + redis_key_id, &key_row, key_str, key_len, @@ -96,23 +100,21 @@ void rondb_get_command(Ndb *ndb, tab, ndb, trans, - &key_row, - key_len); + &key_row); ndb->closeTransaction(trans); if ((ret_code != 0) || key_row.num_rows == 0) { return; } - printf("Getting %d value rows\n", key_row.num_rows); { /* Our value uses value rows, so a more complex read is required. We're starting from scratch here since we'll use a shared lock on the key table this time we read from it. */ - NdbTransaction *trans = ndb->startTransaction(tab, - &(key_row.redis_key[0]), - key_len + 2); + trans = ndb->startTransaction(tab, + (const char*)&key_row.redis_key_id, + key_len + 10); if (trans == nullptr) { assign_ndb_err_to_response(response, @@ -125,27 +127,29 @@ void rondb_get_command(Ndb *ndb, tab, ndb, trans, - &key_row, - key_len); + &key_row); ndb->closeTransaction(trans); return; } } -void rondb_set_command( +static +void rondb_set( Ndb *ndb, const pink::RedisCmdArgsType &argv, - std::string *response) + std::string *response, + Uint64 redis_key_id) { + Uint32 arg_index_start = (redis_key_id == STRING_REDIS_KEY_ID) ? 1 : 2; const NdbDictionary::Dictionary *dict; const NdbDictionary::Table *tab = nullptr; NdbTransaction *trans = nullptr; struct key_table key_row; - const char *key_str = argv[1].c_str(); - Uint32 key_len = argv[1].size(); + const char *key_str = argv[arg_index_start].c_str(); + Uint32 key_len = argv[arg_index_start].size(); if (!setup_transaction(ndb, - argv, response, + redis_key_id, &key_row, key_str, key_len, @@ -154,10 +158,11 @@ void rondb_set_command( &trans)) return; - const char *value_str = argv[2].c_str(); - Uint32 value_len = argv[2].size(); + const char *value_str = argv[arg_index_start + 1].c_str(); + Uint32 value_len = argv[arg_index_start + 1].size(); char varsize_param[EXTENSION_VALUE_LEN + 500]; Uint32 num_value_rows = 0; + Uint32 prev_num_rows = 0; Uint64 rondb_key = 0; if (value_len > INLINE_VALUE_LEN) @@ -180,102 +185,136 @@ void rondb_set_command( if (rondb_get_rondb_key(tab, rondb_key, ndb, response) != 0) { + ndb->closeTransaction(trans); return; } } int ret_code = 0; ret_code = create_key_row(response, - ndb, tab, trans, + redis_key_id, rondb_key, key_str, key_len, value_str, value_len, num_value_rows, - Uint32(0), - &varsize_param[0]); + prev_num_rows, + Uint32(0)); if (ret_code != 0) { // Often unnecessary since it already failed to commit ndb->closeTransaction(trans); - if (ret_code != FOREIGN_KEY_RESTRICT_ERROR) + if (ret_code != RESTRICT_VALUE_ROWS_ERROR) { return; } - else + /* + If we are here, we have tried writing a key that already exists. + This would not be a problem if this key did not have references + to value rows. Hence we first need to delete all of those - this + is best done via a cascade delete. We do a delete & insert in + a single transaction (plus writing the value rows). + */ + trans = ndb->startTransaction(tab, + (const char*)&key_row.redis_key_id, + key_len + 10); + if (trans == nullptr) { - /* - If we are here, we have tried writing a key that already exists. - This would not be a problem if this key did not have references - to value rows. Hence we first need to delete all of those - this - is best done via a cascade delete. We do a delete & insert in - a single transaction (plus writing the value rows). - */ - NdbTransaction *trans = ndb->startTransaction(tab, - &(key_row.redis_key[0]), - key_len + 2); - if (trans == nullptr) - { - assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError()); - return; - } - - if (delete_and_insert_key_row(response, - ndb, - tab, - trans, - rondb_key, - key_str, - key_len, - value_str, - value_len, - num_value_rows, - Uint32(0), - &varsize_param[0]) != 0) - { - ndb->closeTransaction(trans); - return; - } + assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError()); + return; + } + /** + * We don't know the exact number of value rows, but we know that it is + * at least one. + */ + prev_num_rows = 1; + ret_code = create_key_row(response, + tab, + trans, + redis_key_id, + rondb_key, + key_str, + key_len, + value_str, + value_len, + num_value_rows, + prev_num_rows, + Uint32(0)); + if (ret_code != 0) { + ndb->closeTransaction(trans); + return; } + } else if (num_value_rows == 0) { + ndb->closeTransaction(trans); + response->append("+OK\r\n"); + return; } - - if (num_value_rows == 0) + /** + * Coming here means that we either have to add new value rows or we have + * to delete previous value rows or both. Thus the transaction is still + * open. We start by creating the new value rows. Next we delete the + * remaining value rows from the previous instantiation of the row. + */ + if (num_value_rows > 0) { + ret_code = create_all_value_rows(response, + ndb, + dict, + trans, + rondb_key, + value_str, + value_len, + num_value_rows, + &varsize_param[0]); + } + if (ret_code != 0) { + ndb->closeTransaction(trans); + return; + } + ret_code = delete_value_rows(response, + tab, + trans, + rondb_key, + num_value_rows, + prev_num_rows); + if (ret_code != 0) { + ndb->closeTransaction(trans); + return; + } + if (trans->execute(NdbTransaction::Commit, + NdbOperation::AbortOnError) == 0 && + trans->getNdbError().code != 0) { ndb->closeTransaction(trans); - response->append("+OK\r\n"); + assign_ndb_err_to_response(response, + FAILED_EXEC_TXN, + trans->getNdbError()); return; } - printf("Inserting %d value rows\n", num_value_rows); - create_all_value_rows(response, - ndb, - dict, - trans, - rondb_key, - value_str, - value_len, - num_value_rows, - &varsize_param[0]); ndb->closeTransaction(trans); + response->append("+OK\r\n"); return; } -void rondb_incr_command( +static +void rondb_incr( Ndb *ndb, const pink::RedisCmdArgsType &argv, - std::string *response) + std::string *response, + Uint64 redis_key_id) { + Uint32 arg_index_start = (redis_key_id == STRING_REDIS_KEY_ID) ? 1 : 2; const NdbDictionary::Dictionary *dict; const NdbDictionary::Table *tab = nullptr; NdbTransaction *trans = nullptr; struct key_table key_row; - const char *key_str = argv[1].c_str(); - Uint32 key_len = argv[1].size(); + const char *key_str = argv[arg_index_start].c_str(); + Uint32 key_len = argv[arg_index_start].size(); if (!setup_transaction(ndb, - argv, response, + redis_key_id, &key_row, key_str, key_len, @@ -292,3 +331,63 @@ void rondb_incr_command( ndb->closeTransaction(trans); return; } + +void rondb_get_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) +{ + return rondb_get(ndb, argv, response, STRING_REDIS_KEY_ID); +} + +void rondb_set_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) +{ + return rondb_set(ndb, argv, response, STRING_REDIS_KEY_ID); +} + +void rondb_incr_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) +{ + return rondb_incr(ndb, argv, response, STRING_REDIS_KEY_ID); +} + +void rondb_hget_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) +{ + Uint64 redis_key_id; + int ret_code = rondb_get_redis_key_id(ndb, + redis_key_id, + argv[1].c_str(), + argv[1].size(), + response); + return rondb_get(ndb, argv, response, redis_key_id); +} + +void rondb_hset_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) +{ + Uint64 redis_key_id; + int ret_code = rondb_get_redis_key_id(ndb, + redis_key_id, + argv[1].c_str(), + argv[1].size(), + response); + return rondb_set(ndb, argv, response, redis_key_id); +} + +void rondb_hincr_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response) +{ + Uint64 redis_key_id; + int ret_code = rondb_get_redis_key_id(ndb, + redis_key_id, + argv[1].c_str(), + argv[1].size(), + response); + return rondb_incr(ndb, argv, response, redis_key_id); +} diff --git a/pink/rondis/string/commands.h b/pink/rondis/string/commands.h index 4e97fd1e..29796fcd 100644 --- a/pink/rondis/string/commands.h +++ b/pink/rondis/string/commands.h @@ -22,9 +22,6 @@ Most importantly, it writes Ndb error messages to the response string. This may however change in the future, since this causes redundancy. */ -void set_length(char* buf, Uint32 key_len); -Uint32 get_length(char* buf); - void rondb_get_command(Ndb *ndb, const pink::RedisCmdArgsType &argv, std::string *response); @@ -36,4 +33,16 @@ void rondb_set_command(Ndb *ndb, void rondb_incr_command(Ndb *ndb, const pink::RedisCmdArgsType &argv, std::string *response); + +void rondb_hget_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response); + +void rondb_hset_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response); + +void rondb_hincr_command(Ndb *ndb, + const pink::RedisCmdArgsType &argv, + std::string *response); #endif diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index d45a29e5..a87b5233 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -1,3 +1,4 @@ +#include #include #include #include @@ -10,278 +11,245 @@ #include "table_definitions.h" #include "interpreted_code.h" +NdbRecord *pk_hset_key_record = nullptr; +NdbRecord *entire_hset_key_record = nullptr; NdbRecord *pk_key_record = nullptr; NdbRecord *entire_key_record = nullptr; NdbRecord *pk_value_record = nullptr; NdbRecord *entire_value_record = nullptr; -void set_length(char *buf, Uint32 key_len) -{ - Uint8 *ptr = (Uint8 *)buf; - ptr[0] = (Uint8)(key_len & 255); - ptr[1] = (Uint8)(key_len >> 8); -} - -Uint32 get_length(char *buf) -{ - Uint8 *ptr = (Uint8 *)buf; - Uint8 low = ptr[0]; - Uint8 high = ptr[1]; - Uint32 len32 = Uint32(low) + Uint32(256) * Uint32(high); - return len32; -} - int create_key_row(std::string *response, - Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, + Uint64 redis_key_id, Uint64 rondb_key, const char *key_str, Uint32 key_len, const char *value_str, Uint32 tot_value_len, Uint32 num_value_rows, - Uint32 row_state, - char *buf) -{ - NdbOperation *write_op = trans->getNdbOperation(tab); - if (write_op == nullptr) - { - assign_ndb_err_to_response(response, - FAILED_GET_OP, - trans->getNdbError()); - return -1; + Uint32 &prev_num_rows, + Uint32 row_state) { + const NdbOperation *write_op = nullptr; + NdbRecAttr *recAttr = nullptr; + int ret_code = write_data_to_key_op(response, + &write_op, + tab, + trans, + redis_key_id, + rondb_key, + key_str, + key_len, + value_str, + tot_value_len, + num_value_rows, + prev_num_rows, + row_state, + &recAttr); + if (ret_code != 0) { + return ret_code; } - write_op->writeTuple(); - write_data_to_key_op(write_op, - rondb_key, - key_str, - key_len, - value_str, - tot_value_len, - num_value_rows, - row_state, - buf); + if (num_value_rows == 0 && prev_num_rows == 0) { - if (write_op->getNdbError().code != 0) + if (trans->execute(NdbTransaction::Commit, + NdbOperation::AbortOnError) == 0 && + trans->getNdbError().code == 0) { - assign_ndb_err_to_response(response, - FAILED_DEFINE_OP, - write_op->getNdbError()); - return -1; + return 0; } } + else { - int ret_code = 0; - if (num_value_rows == 0) + if (trans->execute(NdbTransaction::NoCommit, + NdbOperation::AbortOnError) == 0 && + trans->getNdbError().code == 0) { - if (trans->execute(NdbTransaction::Commit, - NdbOperation::AbortOnError) == 0 && - trans->getNdbError().code == 0) - { - return 0; - } - } - else - { - if (trans->execute(NdbTransaction::NoCommit, - NdbOperation::AbortOnError) == 0 && - trans->getNdbError().code == 0) - { - return 0; - } + prev_num_rows = recAttr->u_32_value(); + return 0; } + } - if (trans->getNdbError().code != FOREIGN_KEY_RESTRICT_ERROR) - { - assign_ndb_err_to_response(response, - FAILED_EXEC_TXN, - trans->getNdbError()); - } - return trans->getNdbError().code; + if (trans->getNdbError().code != RESTRICT_VALUE_ROWS_ERROR) + { + assign_ndb_err_to_response(response, + FAILED_EXEC_TXN, + trans->getNdbError()); } + return trans->getNdbError().code; } -int delete_and_insert_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf) -{ - if (delete_key_row(response, - ndb, - tab, - trans, - key_str, - key_len, - buf) != 0) +int write_data_to_key_op(std::string *response, + const NdbOperation **ndb_op, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 redis_key_id, + Uint64 rondb_key, + const char *key_str, + Uint32 key_len, + const char *value_str, + Uint32 tot_value_len, + Uint32 num_value_rows, + Uint32 & prev_num_rows, + Uint32 row_state, + NdbRecAttr **recAttr) { + struct key_table key_row; + Uint32 mask = 0xFF; + key_row.null_bits = 0; + memcpy(&key_row.redis_key[2], key_str, key_len); + set_length(&key_row.redis_key[0], key_len); + key_row.redis_key_id = redis_key_id; + if (rondb_key == 0) { - return -1; + mask = 0xFB; } + else + { + key_row.rondb_key = rondb_key; + } + const unsigned char *mask_ptr = (const unsigned char *)&mask; + key_row.tot_value_len = tot_value_len; + key_row.num_rows = num_value_rows; + key_row.value_data_type = row_state; + key_row.expiry_date = 0; - return insert_key_row(response, - ndb, - tab, - trans, - rondb_key, - key_str, - key_len, - value_str, - tot_value_len, - num_value_rows, - row_state, - buf); -} - -int delete_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - const char *key_str, - Uint32 key_len, - char *buf) -{ - NdbOperation *del_op = trans->getNdbOperation(tab); - if (del_op == nullptr) + Uint32 this_value_len = tot_value_len; + if (this_value_len > INLINE_VALUE_LEN) { - assign_ndb_err_to_response(response, - FAILED_GET_OP, - trans->getNdbError()); - return -1; + this_value_len = INLINE_VALUE_LEN; } - del_op->deleteTuple(); - memcpy(&buf[2], key_str, key_len); - set_length(buf, key_len); - del_op->equal(KEY_TABLE_COL_redis_key, buf); + memcpy(&key_row.value_start[2], value_str, this_value_len); + set_length(&key_row.value_start[0], this_value_len); - if (del_op->getNdbError().code != 0) + Uint32 code_buffer[64]; + NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer)); + int ret_code = 0; + if (num_value_rows > 0 || prev_num_rows > 0) { + ret_code = write_key_row_no_commit(response, code, tab); + } + else { - assign_ndb_err_to_response(response, - FAILED_DEFINE_OP, - del_op->getNdbError()); - return -1; + ret_code = write_key_row_commit(response, code, tab); + } + if (ret_code != 0) { + return ret_code; } - if (trans->execute(NdbTransaction::NoCommit, - NdbOperation::AbortOnError) != 0 || - trans->getNdbError().code != 0) + // Prepare the interpreted program to be part of the write + NdbOperation::OperationOptions opts; + std::memset(&opts, 0, sizeof(opts)); + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED_INSERT; + opts.interpretedCode = &code; + + NdbOperation::GetValueSpec getvals[1]; + getvals[0].appStorage = nullptr; + getvals[0].recAttr = nullptr; + getvals[0].column = NdbDictionary::Column::READ_INTERPRETER_OUTPUT_0; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_GET_FINAL_VALUE; + opts.numExtraGetFinalValues = 1; + opts.extraGetFinalValues = getvals; + + /* Define the actual operation to be sent to RonDB data node. */ + const NdbOperation *op = trans->writeTuple( + pk_key_record, + (const char *)&key_row, + entire_key_record, + (char *)&key_row, + mask_ptr, + &opts, + sizeof(opts)); + if (op == nullptr) { assign_ndb_err_to_response(response, - FAILED_EXEC_TXN, + "Failed to create NdbOperation", trans->getNdbError()); return -1; } + *ndb_op = op; + *recAttr = getvals[0].recAttr; return 0; } -int insert_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf) -{ - { - NdbOperation *insert_op = trans->getNdbOperation(tab); - if (insert_op == nullptr) +int delete_value_rows(std::string *response, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 rondb_key, + Uint32 start_ordinal, + Uint32 end_ordinal) { + for (Uint32 i = start_ordinal; i < end_ordinal; i++) { + NdbOperation *del_op = trans->getNdbOperation(tab); + if (del_op == nullptr) { assign_ndb_err_to_response(response, FAILED_GET_OP, trans->getNdbError()); return -1; } - insert_op->insertTuple(); - write_data_to_key_op(insert_op, - rondb_key, - key_str, - key_len, - value_str, - tot_value_len, - num_value_rows, - row_state, - buf); - if (insert_op->getNdbError().code != 0) + del_op->deleteTuple(); + del_op->equal(VALUE_TABLE_COL_rondb_key, rondb_key); + if (del_op->getNdbError().code != 0) { assign_ndb_err_to_response(response, FAILED_DEFINE_OP, - insert_op->getNdbError()); + del_op->getNdbError()); return -1; } } + if (start_ordinal >= end_ordinal) { + return 0; + } + if (trans->execute(NdbTransaction::NoCommit, + NdbOperation::AbortOnError) != 0 || + trans->getNdbError().code != 0) { - if (num_value_rows == 0) - { - if (trans->execute(NdbTransaction::Commit, - NdbOperation::AbortOnError) == 0 && - trans->getNdbError().code == 0) - { - return 0; - } - } - else - { - if (trans->execute(NdbTransaction::NoCommit, - NdbOperation::AbortOnError) == 0 && - trans->getNdbError().code == 0) - { - return 0; - } - } assign_ndb_err_to_response(response, FAILED_EXEC_TXN, trans->getNdbError()); return -1; } + return 0; } -void write_data_to_key_op(NdbOperation *ndb_op, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf) -{ +int delete_key_row(std::string *response, + Ndb *ndb, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 redis_key_id, + const char *key_str, + Uint32 key_len, + char *buf) { + NdbOperation *del_op = trans->getNdbOperation(tab); + if (del_op == nullptr) + { + assign_ndb_err_to_response(response, + FAILED_GET_OP, + trans->getNdbError()); + return -1; + } + del_op->deleteTuple(); memcpy(&buf[2], key_str, key_len); set_length(buf, key_len); - ndb_op->equal(KEY_TABLE_COL_redis_key, buf); + del_op->equal(KEY_TABLE_COL_redis_key, buf); + del_op->equal(KEY_TABLE_COL_redis_key_id, redis_key_id); - if (rondb_key == 0) - { - ndb_op->setValue(KEY_TABLE_COL_rondb_key, (char *)NULL); - } - else + if (del_op->getNdbError().code != 0) { - ndb_op->setValue(KEY_TABLE_COL_rondb_key, rondb_key); + assign_ndb_err_to_response(response, + FAILED_DEFINE_OP, + del_op->getNdbError()); + return -1; } - ndb_op->setValue(KEY_TABLE_COL_tot_value_len, tot_value_len); - ndb_op->setValue(KEY_TABLE_COL_num_rows, num_value_rows); - ndb_op->setValue(KEY_TABLE_COL_value_data_type, row_state); - ndb_op->setValue(KEY_TABLE_COL_expiry_date, 0); - Uint32 this_value_len = tot_value_len; - if (this_value_len > INLINE_VALUE_LEN) + if (trans->execute(NdbTransaction::NoCommit, + NdbOperation::AbortOnError) != 0 || + trans->getNdbError().code != 0) { - this_value_len = INLINE_VALUE_LEN; + assign_ndb_err_to_response(response, + FAILED_EXEC_TXN, + trans->getNdbError()); + return -1; } - memcpy(&buf[2], value_str, this_value_len); - set_length(buf, this_value_len); - ndb_op->setValue(KEY_TABLE_COL_value_start, buf); + return 0; } int create_value_row(std::string *response, @@ -292,8 +260,7 @@ int create_value_row(std::string *response, Uint64 rondb_key, Uint32 this_value_len, Uint32 ordinal, - char *buf) -{ + char *buf) { const NdbDictionary::Table *tab = dict->getTable(VALUE_TABLE_NAME); if (tab == nullptr) { @@ -310,18 +277,16 @@ int create_value_row(std::string *response, trans->getNdbError()); return -1; } - op->insertTuple(); + op->writeTuple(); op->equal(VALUE_TABLE_COL_rondb_key, rondb_key); op->equal(VALUE_TABLE_COL_ordinal, ordinal); memcpy(&buf[2], start_value_ptr, this_value_len); set_length(buf, this_value_len); op->setValue(VALUE_TABLE_COL_value, buf); + if (op->getNdbError().code != 0) { - if (op->getNdbError().code != 0) - { - assign_ndb_err_to_response(response, FAILED_DEFINE_OP, op->getNdbError()); - return -1; - } + assign_ndb_err_to_response(response, FAILED_DEFINE_OP, op->getNdbError()); + return -1; } return 0; } @@ -334,8 +299,7 @@ int create_all_value_rows(std::string *response, const char *value_str, Uint32 value_len, Uint32 num_value_rows, - char *buf) -{ + char *buf) { Uint32 remaining_len = value_len - INLINE_VALUE_LEN; const char *start_value_ptr = &value_str[INLINE_VALUE_LEN]; for (Uint32 ordinal = 0; ordinal < num_value_rows; ordinal++) @@ -359,17 +323,17 @@ int create_all_value_rows(std::string *response, } remaining_len -= this_value_len; start_value_ptr += this_value_len; + if (ordinal == (num_value_rows - 1) || + ordinal % MAX_VALUES_TO_WRITE == (MAX_VALUES_TO_WRITE - 1)) { + if (trans->execute(NdbTransaction::NoCommit, + NdbOperation::AbortOnError) != 0 || + trans->getNdbError().code != 0) + { + assign_ndb_err_to_response(response, FAILED_EXEC_TXN, trans->getNdbError()); + return -1; + } + } } - - if (trans->execute(NdbTransaction::Commit, - NdbOperation::AbortOnError) != 0 || - trans->getNdbError().code != 0) - { - assign_ndb_err_to_response(response, FAILED_EXEC_TXN, trans->getNdbError()); - return -1; - } - - response->append("+OK\r\n"); return 0; } @@ -377,15 +341,13 @@ int get_simple_key_row(std::string *response, const NdbDictionary::Table *tab, Ndb *ndb, NdbTransaction *trans, - struct key_table *key_row, - Uint32 key_len) -{ + struct key_table *key_row) { /** * Mask and options means simply reading all columns - * except primary key column. + * except primary key columns. */ - const Uint32 mask = 0xFE; + const Uint32 mask = 0x1FC; const unsigned char *mask_ptr = (const unsigned char *)&mask; const NdbOperation *read_op = trans->readTuple( pk_key_record, @@ -445,8 +407,7 @@ int get_value_rows(std::string *response, NdbTransaction *trans, const Uint32 num_rows, const Uint64 rondb_key, - const Uint32 tot_value_len) -{ + const Uint32 tot_value_len) { const NdbDictionary::Table *tab = dict->getTable(VALUE_TABLE_NAME); if (tab == nullptr) { @@ -464,7 +425,8 @@ int get_value_rows(std::string *response, Uint32 num_rows_to_read = std::min(ROWS_PER_READ, num_rows - start_ordinal); bool is_last_batch = (batch == (num_read_batches - 1)); - NdbTransaction::ExecType commit_type = is_last_batch ? NdbTransaction::Commit : NdbTransaction::NoCommit; + NdbTransaction::ExecType commit_type = is_last_batch ? + NdbTransaction::Commit : NdbTransaction::NoCommit; if (read_batched_value_rows(response, trans, @@ -485,8 +447,7 @@ int read_batched_value_rows(std::string *response, const Uint64 rondb_key, const Uint32 num_rows_to_read, const Uint32 start_ordinal, - const NdbTransaction::ExecType commit_type) -{ + const NdbTransaction::ExecType commit_type) { struct value_table value_rows[ROWS_PER_READ]; Uint32 ordinal = start_ordinal; @@ -534,9 +495,7 @@ int get_complex_key_row(std::string *response, const NdbDictionary::Table *tab, Ndb *ndb, NdbTransaction *trans, - struct key_table *key_row, - Uint32 key_len) -{ + struct key_table *key_row) { /** * Since a simple read using CommittedRead we will go back to * the safe method where we first read with lock the key row @@ -547,7 +506,7 @@ int get_complex_key_row(std::string *response, * except primary key column. */ - const Uint32 mask = 0xFE; + const Uint32 mask = 0x1FC; const unsigned char *mask_ptr = (const unsigned char *)&mask; const NdbOperation *read_op = trans->readTuple( pk_key_record, @@ -606,8 +565,7 @@ int get_complex_key_row(std::string *response, int rondb_get_rondb_key(const NdbDictionary::Table *tab, Uint64 &rondb_key, Ndb *ndb, - std::string *response) -{ + std::string *response) { if (ndb->getAutoIncrementValue(tab, rondb_key, unsigned(1024)) != 0) { assign_ndb_err_to_response(response, @@ -622,8 +580,7 @@ void incr_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, - struct key_table *key_row) -{ + struct key_table *key_row) { /** * The mask specifies which columns is to be updated after the interpreter * has finished. The values are set in the key_row. @@ -634,7 +591,7 @@ void incr_key_row(std::string *response, * are updated through final update. */ - const Uint32 mask = 0x55; + const Uint32 mask = 0xAB; const unsigned char *mask_ptr = (const unsigned char *)&mask; // redis_key already set as this is the Primary key @@ -719,3 +676,64 @@ void incr_key_row(std::string *response, response->assign(header_buf); return; } + +static +int get_unique_redis_key_id(const NdbDictionary::Table *tab, + Ndb *ndb, + Uint64 &redis_key_id, + std::string *response) { + + if (ndb->getAutoIncrementValue(tab, redis_key_id, unsigned(1024)) != 0) + { + assign_ndb_err_to_response(response, + "Failed to get autoincrement value", + ndb->getNdbError()); + return -1; + } + return 0; +} + +std::unordered_map redis_key_id_hash; +int rondb_get_redis_key_id(Ndb *ndb, + Uint64 &redis_key_id, + const char *key_str, + Uint32 key_len, + std::string *response) { + std::string std_key_str = std::string(key_str, key_len); + auto it = redis_key_id_hash.find(std_key_str); + if (it == redis_key_id_hash.end()) { + /* Found no redis_key_id in local hash */ + const NdbDictionary::Dictionary *dict = ndb->getDictionary(); + if (dict == nullptr) + { + assign_ndb_err_to_response(response, FAILED_GET_DICT, ndb->getNdbError()); + return -1; + } + const NdbDictionary::Table *tab = dict->getTable(HSET_KEY_TABLE_NAME); + if (tab == nullptr) + { + assign_ndb_err_to_response(response, FAILED_CREATE_TABLE_OBJECT, dict->getNdbError()); + return -1; + } + int ret_code = get_unique_redis_key_id(tab, + ndb, + redis_key_id, + response); + if (ret_code < 0) { + return -1; + } + ret_code = write_hset_key_table(ndb, + tab, + std_key_str, + redis_key_id, + response); + if (ret_code < 0) { + return -1; + } + redis_key_id_hash[std_key_str] = redis_key_id; + } else { + /* Found local redis_key_id */ + redis_key_id = it->second; + } + return 0; +} diff --git a/pink/rondis/string/db_operations.h b/pink/rondis/string/db_operations.h index cc80d702..e34cff3a 100644 --- a/pink/rondis/string/db_operations.h +++ b/pink/rondis/string/db_operations.h @@ -11,62 +11,42 @@ const Uint32 ROWS_PER_READ = 2; int create_key_row(std::string *response, - Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, + Uint64 redis_key_id, Uint64 rondb_key, const char *key_str, Uint32 key_len, const char *value_str, Uint32 tot_value_len, Uint32 num_value_rows, - Uint32 row_state, - char *buf); - -void write_data_to_key_op(NdbOperation *ndb_op, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf); + Uint32 &prev_num_rows, + Uint32 row_state); -int delete_and_insert_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf); +int write_data_to_key_op(std::string *response, + const NdbOperation **ndb_op, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 redis_key_id, + Uint64 rondb_key, + const char *key_str, + Uint32 key_len, + const char *value_str, + Uint32 tot_value_len, + Uint32 num_value_rows, + Uint32 &prev_num_rows, + Uint32 row_state, + NdbRecAttr **recAttr); int delete_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, + Uint64 redis_key_id, const char *key_str, Uint32 key_len, char *buf); -int insert_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf); - int create_value_row(std::string *response, Ndb *ndb, const NdbDictionary::Dictionary *dict, @@ -87,6 +67,12 @@ int create_all_value_rows(std::string *response, Uint32 num_value_rows, char *buf); +int delete_value_rows(std::string *response, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 rondb_key, + Uint32 start_ordinal, + Uint32 end_ordinal); /* Since the beginning of the value is saved within the key table, it can suffice to read the key table to get the value. If the value is @@ -95,16 +81,14 @@ int get_simple_key_row(std::string *response, const NdbDictionary::Table *tab, Ndb *ndb, NdbTransaction *trans, - struct key_table *key_row, - Uint32 key_len); + struct key_table *key_row); int get_complex_key_row(std::string *response, const NdbDictionary::Dictionary *dict, const NdbDictionary::Table *tab, Ndb *ndb, NdbTransaction *trans, - struct key_table *row, - Uint32 key_len); + struct key_table *row); int get_value_rows(std::string *response, Ndb *ndb, @@ -131,4 +115,10 @@ void incr_key_row(std::string *response, const NdbDictionary::Table *tab, NdbTransaction *trans, struct key_table *key_row); + +int rondb_get_redis_key_id(Ndb *ndb, + Uint64 &redis_key_id, + const char *key_str, + Uint32 key_len, + std::string *response); #endif diff --git a/pink/rondis/string/interpreted_code.cc b/pink/rondis/string/interpreted_code.cc index f880847d..41bed036 100644 --- a/pink/rondis/string/interpreted_code.cc +++ b/pink/rondis/string/interpreted_code.cc @@ -2,6 +2,7 @@ #include #include "../common.h" +#include "commands.h" #include "interpreted_code.h" #include "table_definitions.h" @@ -81,3 +82,152 @@ int initNdbCodeIncr(std::string *response, } return 0; } + +int write_hset_key_table(Ndb *ndb, + const NdbDictionary::Table *tab, + std::string std_key_str, + Uint64 & redis_key_id, + std::string *response) { + /* Prepare primary key */ + struct hset_key_table key_row; + const char *key_str = std_key_str.c_str(); + Uint32 key_len = std_key_str.size(); + set_length(&key_row.redis_key[0], key_len); + memcpy(&key_row.redis_key[2], key_str, key_len); + + const Uint32 mask = 0x1; // Write primary key + const unsigned char *mask_ptr = (const unsigned char *)&mask; + const NdbDictionary::Column *redis_key_id_col = tab->getColumn(HSET_KEY_TABLE_COL_redis_key_id); + Uint32 code_buffer[64]; + NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer)); + code.load_op_type(REG1); // Read operation type into register 1 + code.branch_eq_const(REG1, RONDB_INSERT, LABEL0); // Inserts go to label 0 + /* UPDATE */ + code.read_attr(REG7, redis_key_id_col); + code.write_interpreter_output(REG7, OUTPUT_INDEX); // Write into output index 0 + code.interpret_exit_ok(); + + /* INSERT */ + code.def_label(LABEL0); + code.load_const_u64(REG7, redis_key_id); + code.write_attr(redis_key_id_col, REG7); + code.write_interpreter_output(REG7, OUTPUT_INDEX); // Write into output index 0 + code.interpret_exit_ok(); + + // Program end, now compile code + int ret_code = code.finalise(); + if (ret_code != 0) + { + assign_ndb_err_to_response(response, + "Failed to create Interpreted code", + code.getNdbError()); + return -1; + } + // Prepare the interpreted program to be part of the write + NdbOperation::OperationOptions opts; + std::memset(&opts, 0, sizeof(opts)); + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED_INSERT; + opts.interpretedCode = &code; + + NdbOperation::GetValueSpec getvals[1]; + getvals[0].appStorage = nullptr; + getvals[0].recAttr = nullptr; + getvals[0].column = NdbDictionary::Column::READ_INTERPRETER_OUTPUT_0; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_GET_FINAL_VALUE; + opts.numExtraGetFinalValues = 1; + opts.extraGetFinalValues = getvals; + + /* Start a transaction */ + NdbTransaction *trans = ndb->startTransaction(tab, + (const char*)&key_row.redis_key_id, + key_len + 2); + /* Define the actual operation to be sent to RonDB data node. */ + const NdbOperation *op = trans->writeTuple( + pk_hset_key_record, + (const char *)&key_row, + entire_hset_key_record, + (char *)&key_row, + mask_ptr, + &opts, + sizeof(opts)); + if (op == nullptr) + { + ndb->closeTransaction(trans); + assign_ndb_err_to_response(response, + "Failed to create NdbOperation", + trans->getNdbError()); + return -1; + } + if (trans->execute(NdbTransaction::Commit, + NdbOperation::AbortOnError) != 0 || + trans->getNdbError().code != 0) + { + ndb->closeTransaction(trans); + assign_ndb_err_to_response(response, + FAILED_HSET_KEY, + trans->getNdbError()); + return -1; + } + /* Retrieve the returned new value as an Uint64 value */ + NdbRecAttr *recAttr = getvals[0].recAttr; + redis_key_id = recAttr->u_64_value(); + ndb->closeTransaction(trans); + return 0; +} + +int write_key_row_no_commit(std::string *response, + NdbInterpretedCode &code, + const NdbDictionary::Table *tab) { + const NdbDictionary::Column *num_rows_col = tab->getColumn(KEY_TABLE_COL_num_rows); + code.load_op_type(REG1); // Read operation type into register 1 + code.branch_eq_const(REG1, RONDB_INSERT, LABEL0); // Inserts go to label 0 + /* UPDATE */ + code.read_attr(REG7, num_rows_col); + code.write_interpreter_output(REG7, OUTPUT_INDEX); // Write into output index 0 + code.interpret_exit_ok(); + + /* INSERT */ + code.def_label(LABEL0); + code.load_const_u16(REG7, 0); + code.write_interpreter_output(REG7, OUTPUT_INDEX); // Write into output index 0 + code.interpret_exit_ok(); + + // Program end, now compile code + int ret_code = code.finalise(); + if (ret_code != 0) + { + assign_ndb_err_to_response(response, + "Failed to create Interpreted code", + code.getNdbError()); + return -1; + } + return 0; +} + +int write_key_row_commit(std::string *response, + NdbInterpretedCode &code, + const NdbDictionary::Table *tab) { + const NdbDictionary::Column *num_rows_col = tab->getColumn(KEY_TABLE_COL_num_rows); + code.load_op_type(REG1); // Read operation type into register 1 + code.branch_eq_const(REG1, RONDB_INSERT, LABEL0); // Inserts go to label 0 + /* UPDATE */ + code.read_attr(REG7, num_rows_col); + code.branch_eq_const(REG7, 0, LABEL0); + code.interpret_exit_nok(6000); + + /* INSERT */ + code.def_label(LABEL0); + code.interpret_exit_ok(); + + // Program end, now compile code + int ret_code = code.finalise(); + if (ret_code != 0) + { + assign_ndb_err_to_response(response, + "Failed to create Interpreted code", + code.getNdbError()); + return -1; + } + return 0; +} diff --git a/pink/rondis/string/interpreted_code.h b/pink/rondis/string/interpreted_code.h index e8bb311e..f4f2b20d 100644 --- a/pink/rondis/string/interpreted_code.h +++ b/pink/rondis/string/interpreted_code.h @@ -33,4 +33,15 @@ int initNdbCodeIncr(std::string *response, NdbInterpretedCode *code, const NdbDictionary::Table *tab); +int write_hset_key_table(Ndb *ndb, + const NdbDictionary::Table *tab, + std::string std_key_str, + Uint64 & redis_key_id, + std::string *response); +int write_key_row_commit(std::string *response, + NdbInterpretedCode &code, + const NdbDictionary::Table *tab); +int write_key_row_no_commit(std::string *response, + NdbInterpretedCode &code, + const NdbDictionary::Table *tab); #endif diff --git a/pink/rondis/string/table_definitions.cc b/pink/rondis/string/table_definitions.cc index 20345363..2b64a4bf 100644 --- a/pink/rondis/string/table_definitions.cc +++ b/pink/rondis/string/table_definitions.cc @@ -16,6 +16,47 @@ * Create NdbRecord's for all table accesses, they can be reused * for all Ndb objects. */ +int init_hset_key_records(NdbDictionary::Dictionary *dict) +{ + const NdbDictionary::Table *tab = dict->getTable(HSET_KEY_TABLE_NAME); + if (tab == nullptr) + { + printf("Failed getting Ndb table %s\n", HSET_KEY_TABLE_NAME); + return -1; + } + + const NdbDictionary::Column *redis_key_col = tab->getColumn(HSET_KEY_TABLE_COL_redis_key); + const NdbDictionary::Column *redis_key_id_col = tab->getColumn(HSET_KEY_TABLE_COL_redis_key_id); + + if (redis_key_col == nullptr || + redis_key_id_col == nullptr) + { + printf("Failed getting Ndb columns for table %s\n", HSET_KEY_TABLE_NAME); + return -1; + } + + std::map> pk_lookup_column_map = { + {redis_key_col, {offsetof(struct hset_key_table, redis_key), 0}}, + }; + if (init_record(dict, tab, pk_lookup_column_map, pk_hset_key_record) != 0) + { + printf("Failed creating pk-lookup record for table %s\n", HSET_KEY_TABLE_NAME); + return -1; + } + + std::map> read_all_column_map = { + {redis_key_col, {offsetof(struct key_table, redis_key), 0}}, + {redis_key_id_col, {offsetof(struct key_table, redis_key_id), 0}}, + }; + + if (init_record(dict, tab, read_all_column_map, entire_hset_key_record) != 0) + { + printf("Failed creating read-all cols record for table %s\n", HSET_KEY_TABLE_NAME); + return -1; + } + return 0; +} + int init_key_records(NdbDictionary::Dictionary *dict) { const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME); @@ -25,6 +66,7 @@ int init_key_records(NdbDictionary::Dictionary *dict) return -1; } + const NdbDictionary::Column *redis_key_id_col = tab->getColumn(KEY_TABLE_COL_redis_key_id); const NdbDictionary::Column *redis_key_col = tab->getColumn(KEY_TABLE_COL_redis_key); const NdbDictionary::Column *rondb_key_col = tab->getColumn(KEY_TABLE_COL_rondb_key); const NdbDictionary::Column *expiry_date_col = tab->getColumn(KEY_TABLE_COL_expiry_date); @@ -34,6 +76,7 @@ int init_key_records(NdbDictionary::Dictionary *dict) const NdbDictionary::Column *value_data_type_col = tab->getColumn(KEY_TABLE_COL_value_data_type); if (redis_key_col == nullptr || + redis_key_id_col == nullptr || rondb_key_col == nullptr || expiry_date_col == nullptr || value_start_col == nullptr || @@ -46,6 +89,7 @@ int init_key_records(NdbDictionary::Dictionary *dict) } std::map> pk_lookup_column_map = { + {redis_key_id_col, {offsetof(struct key_table, redis_key_id), 0}}, {redis_key_col, {offsetof(struct key_table, redis_key), 0}}, }; if (init_record(dict, tab, pk_lookup_column_map, pk_key_record) != 0) @@ -55,6 +99,7 @@ int init_key_records(NdbDictionary::Dictionary *dict) } std::map> read_all_column_map = { + {redis_key_id_col, {offsetof(struct key_table, redis_key_id), 0}}, {redis_key_col, {offsetof(struct key_table, redis_key), 0}}, {rondb_key_col, {offsetof(struct key_table, rondb_key), 0}}, {expiry_date_col, {offsetof(struct key_table, expiry_date), 1}}, @@ -141,7 +186,13 @@ int init_record(NdbDictionary::Dictionary *dict, int init_string_records(NdbDictionary::Dictionary *dict) { - int res = init_key_records(dict); + int res = init_hset_key_records(dict); + if (res != 0) + { + return res; + } + + res = init_key_records(dict); if (res != 0) { return res; diff --git a/pink/rondis/string/table_definitions.h b/pink/rondis/string/table_definitions.h index 60280c92..d9fa542a 100644 --- a/pink/rondis/string/table_definitions.h +++ b/pink/rondis/string/table_definitions.h @@ -10,12 +10,34 @@ - one NdbRecord defining the columns we want to fetch */ +#define MAX_VALUES_TO_WRITE 4 +#define MAX_KEY_VALUE_LEN 3000 +#define STRING_REDIS_KEY_ID 0 + /* - KEY TABLE + HSET KEY TABLE +*/ +#define HSET_KEY_TABLE_NAME "hset_keys" + +int init_hset_key_records(NdbDictionary::Dictionary *dict); + +extern NdbRecord *pk_hset_key_record; +extern NdbRecord *entire_hset_key_record; + +#define HSET_KEY_TABLE_COL_redis_key "redis_key" +#define HSET_KEY_TABLE_COL_redis_key_id "redis_key_id" + +struct hset_key_table +{ + Uint64 redis_key_id; + char redis_key[MAX_KEY_VALUE_LEN + 2]; +}; + +/* + KEY AND FIELD TABLE */ #define KEY_TABLE_NAME "string_keys" -#define MAX_KEY_VALUE_LEN 3000 #define INLINE_VALUE_LEN 26500 int init_key_records(NdbDictionary::Dictionary *dict); @@ -27,6 +49,7 @@ extern NdbRecord *entire_key_record; Doing this instead of reflection; Keep these the same as the field names in the key_table struct. */ +#define KEY_TABLE_COL_redis_key_id "redis_key_id" #define KEY_TABLE_COL_redis_key "redis_key" #define KEY_TABLE_COL_rondb_key "rondb_key" #define KEY_TABLE_COL_expiry_date "expiry_date" @@ -38,6 +61,7 @@ extern NdbRecord *entire_key_record; struct key_table { Uint32 null_bits; + Uint64 redis_key_id; char redis_key[MAX_KEY_VALUE_LEN + 2]; Uint64 rondb_key; Uint32 expiry_date; diff --git a/pink/rondis/tests/get_set.sh b/pink/rondis/tests/get_set.sh index cf47a299..7d05740f 100755 --- a/pink/rondis/tests/get_set.sh +++ b/pink/rondis/tests/get_set.sh @@ -21,7 +21,7 @@ EOF set_output=$(redis-cli SET "$key" "$value") fi - echo $set_output + #echo $set_output if [[ $set_output == ERR* ]]; then echo "FAIL: Could not SET $key with given value" exit 1 @@ -106,6 +106,7 @@ echo "Testing edge case large key length (Redis allows up to 512MB for the value edge_value=$(head -c 100000 < /dev/zero | tr '\0' 'b') set_and_get "$KEY:edge_large" "$edge_value" +key="key" incr_key="$key:incr$RANDOM" incr_output=$(redis-cli INCR "$incr_key") incr_result=$(redis-cli GET "$incr_key") diff --git a/pink/rondis/tests/hget_hset.sh b/pink/rondis/tests/hget_hset.sh new file mode 100755 index 00000000..efcb517a --- /dev/null +++ b/pink/rondis/tests/hget_hset.sh @@ -0,0 +1,139 @@ +#!/bin/bash + +set -e + +# Change key suffix using script arguments +HASH_KEY="key" +KEY_SUFFIX=${1:-0} +KEY="test_key_$KEY_SUFFIX" + +# Function to set a value and retrieve it, then verify if it matches +function hset_and_hget() { + local field="$1" + local value="$2" + + # SET the value in Redis + if [[ -f "$value" ]]; then + set_output=$(redis-cli --pipe < "$xxl_file" +# set_and_get "$KEY:xxl" "$xxl_file" +# rm "$xxl_file" + +echo "Testing non-ASCII string..." +hset_and_hget "$KEY:nonascii" "こんにちは世界" # Japanese for "Hello, World" + +echo "Testing binary data..." +binary_value=$(echo -e "\x01\x02\x03\x04\x05\x06\x07") +hset_and_hget "$KEY:binary" "$binary_value" + +echo "Testing unicode characters..." +unicode_value="🔥💧🌳" +hset_and_hget "$KEY:unicode" "$unicode_value" + +echo "Testing multiple keys..." +for i in {1..10}; do + test_value="Value_$i"_$(head -c $((RANDOM % 100 + 1)) < /dev/zero | tr '\0' 'a') + hset_and_hget "$KEY:multiple_$i" "$test_value" +done + +echo "Testing piped keys..." +for i in {1..10000}; do + echo "HSET $HASH_KEY $KEY:piped_$i value_$i" +done | redis-cli --pipe --verbose + +echo "Testing edge case large key length (Redis allows up to 512MB for the value)..." +edge_value=$(head -c 100000 < /dev/zero | tr '\0' 'b') +hset_and_hget "$KEY:edge_large" "$edge_value" + +field="key" +incr_field="$field:incr$RANDOM" +incr_output=$(redis-cli HINCR "$HASH_KEY" "$incr_field") +incr_result=$(redis-cli HGET "$HASH_KEY" "$incr_field") +if [[ "$incr_result" == 1 ]]; then + echo "PASS: Incrementing non-existing key $incr_field " +else + echo "FAIL: Incrementing non-existing key $incr_field" + echo "Expected: 1" + echo "Received: $incr_result" + exit 1 +fi + +incr_start_value=$RANDOM +hset_and_hget "$incr_field" $incr_start_value +for i in {1..10}; do + incr_output=$(redis-cli HINCR "$HASH_KEY" "$incr_field") + incr_result=$(redis-cli HGET "$HASH_KEY" "$incr_field") + incr_expected_value=$((incr_start_value + i)) + if [[ "$incr_result" == $incr_expected_value ]]; then + echo "PASS: Incrementing field $incr_field to value $incr_result" + else + echo "FAIL: Incrementing field $incr_field from value $incr_start_value" + echo "Expected: $incr_expected_value" + echo "Received: $incr_result" + exit 1 + fi +done + +echo "All tests completed." From 5bac5be388d9069ff2f3a61e380008164747d229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 14 Nov 2024 10:59:46 +0100 Subject: [PATCH 15/21] Fix CI --- .github/workflows/build_test_push.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_test_push.yaml b/.github/workflows/build_test_push.yaml index 902709d9..e55d758a 100644 --- a/.github/workflows/build_test_push.yaml +++ b/.github/workflows/build_test_push.yaml @@ -84,7 +84,7 @@ jobs: run: | docker exec -i mysqld_1 bash -c "mysql -uroot -e 'CREATE DATABASE redis;'" for sql_file in pink/rondis/sql/*.sql; do - cat "$sql_file" | docker exec -i mysqld_1 mysql -uroot + cat "$sql_file" | docker exec -i mysqld_1 mysql -uroot --database=redis done - name: Connect Docker network From d99920dbe410d55e2a702d1cfa4d366681b5cd50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 14 Nov 2024 12:23:02 +0100 Subject: [PATCH 16/21] Fixed compiler warnings --- pink/src/pink_epoll.cc | 3 ++- pink/src/pink_pubsub.cc | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pink/src/pink_epoll.cc b/pink/src/pink_epoll.cc index ce1f8cac..b16a3d24 100644 --- a/pink/src/pink_epoll.cc +++ b/pink/src/pink_epoll.cc @@ -147,7 +147,8 @@ bool PinkEpoll::Register(const PinkItem& it, bool force) { } notify_queue_protector_.Unlock(); if (success) { - write(notify_send_fd_, "", 1); + int ret_code = write(notify_send_fd_, "", 1); + if (ret_code < 0) success = false; } return success; } diff --git a/pink/src/pink_pubsub.cc b/pink/src/pink_pubsub.cc index 7bef979f..16326cd7 100644 --- a/pink/src/pink_pubsub.cc +++ b/pink/src/pink_pubsub.cc @@ -141,7 +141,7 @@ int PubSubThread::Publish(const std::string& channel, const std::string &msg) { channel_ = channel; message_ = msg; // Send signal to ThreadMain() - write(msg_pfd_[1], "", 1); + (void)write(msg_pfd_[1], "", 1); receiver_mutex_.Lock(); while (receivers_ == -1) { receiver_rsignal_.Wait(); @@ -384,7 +384,7 @@ void *PubSubThread::ThreadMain() { pfe = (pink_epoll_->firedevent()) + i; if (pfe->fd == pink_epoll_->notify_receive_fd()) { // New connection comming if (pfe->mask & PinkEpoll::kRead) { - read(pink_epoll_->notify_receive_fd(), triger, 1); + (void)read(pink_epoll_->notify_receive_fd(), triger, 1); { PinkItem ti = pink_epoll_->notify_queue_pop(); if (ti.notify_type() == kNotiClose) { @@ -404,7 +404,7 @@ void *PubSubThread::ThreadMain() { } if (pfe->fd == msg_pfd_[0]) { // Publish message if (pfe->mask & PinkEpoll::kRead) { - read(msg_pfd_[0], triger, 1); + (void)read(msg_pfd_[0], triger, 1); std::string channel, msg; int32_t receivers = 0; channel = channel_; From 2996041482f6a86ee040444cfd4a67213c26a992 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 14 Nov 2024 14:42:13 +0100 Subject: [PATCH 17/21] Add wait for Rondis server to start properly --- .github/workflows/build_test_push.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/build_test_push.yaml b/.github/workflows/build_test_push.yaml index e55d758a..feb5dae7 100644 --- a/.github/workflows/build_test_push.yaml +++ b/.github/workflows/build_test_push.yaml @@ -98,6 +98,10 @@ jobs: docker exec -w $DOCKER_WORK_DIR -e LD_LIBRARY_PATH=/tmp/rondb/lib \ -t $CONTAINER_NAME pink/rondis/rondis 6379 mgmd_1:1186 2 > $LOCAL_RONDIS_LOG & + # Takes a few seconds before all connections are setup and started properly + - name: Wait for Rondis server to start properly + run: sleep 5 + # Running this multiple times to check for memory leaks and that overwrites/updates/deletes work - name: Run tests multiple times run: | From 59849db801c490d6a9860dea7ad0220e1056c4b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 14 Nov 2024 14:50:58 +0100 Subject: [PATCH 18/21] Fix CI --- .github/workflows/build_test_push.yaml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_test_push.yaml b/.github/workflows/build_test_push.yaml index feb5dae7..1f26dacb 100644 --- a/.github/workflows/build_test_push.yaml +++ b/.github/workflows/build_test_push.yaml @@ -77,6 +77,10 @@ jobs: $IMAGE_NAME \ tail -f /dev/null + # Takes a while for RonDB service to start properly + - name: Wait for RonDB service to start properly + run: sleep30 + - name: Build Pink run: docker exec -i $CONTAINER_NAME bash -c "cd pink && ./build.sh" @@ -100,7 +104,7 @@ jobs: # Takes a few seconds before all connections are setup and started properly - name: Wait for Rondis server to start properly - run: sleep 5 + run: sleep 31 # Running this multiple times to check for memory leaks and that overwrites/updates/deletes work - name: Run tests multiple times From 6df271bfa840f42f40daf78652b9013401ee2ab7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 14 Nov 2024 14:57:39 +0100 Subject: [PATCH 19/21] Fix compiler warnings --- pink/src/pink_pubsub.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pink/src/pink_pubsub.cc b/pink/src/pink_pubsub.cc index 16326cd7..f6ae51ba 100644 --- a/pink/src/pink_pubsub.cc +++ b/pink/src/pink_pubsub.cc @@ -141,7 +141,8 @@ int PubSubThread::Publish(const std::string& channel, const std::string &msg) { channel_ = channel; message_ = msg; // Send signal to ThreadMain() - (void)write(msg_pfd_[1], "", 1); + int ret_code = write(msg_pfd_[1], "", 1); + (void)ret_code; receiver_mutex_.Lock(); while (receivers_ == -1) { receiver_rsignal_.Wait(); @@ -384,7 +385,8 @@ void *PubSubThread::ThreadMain() { pfe = (pink_epoll_->firedevent()) + i; if (pfe->fd == pink_epoll_->notify_receive_fd()) { // New connection comming if (pfe->mask & PinkEpoll::kRead) { - (void)read(pink_epoll_->notify_receive_fd(), triger, 1); + int ret_code = read(pink_epoll_->notify_receive_fd(), triger, 1); + (void)ret_code; { PinkItem ti = pink_epoll_->notify_queue_pop(); if (ti.notify_type() == kNotiClose) { @@ -404,7 +406,8 @@ void *PubSubThread::ThreadMain() { } if (pfe->fd == msg_pfd_[0]) { // Publish message if (pfe->mask & PinkEpoll::kRead) { - (void)read(msg_pfd_[0], triger, 1); + int ret_code = read(msg_pfd_[0], triger, 1); + (void)ret_code; std::string channel, msg; int32_t receivers = 0; channel = channel_; From 61ad88adfd44d794b53e08d66a15e565cafdb1c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 14 Nov 2024 16:46:52 +0100 Subject: [PATCH 20/21] One more fix of CI --- .github/workflows/build_test_push.yaml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.github/workflows/build_test_push.yaml b/.github/workflows/build_test_push.yaml index 1f26dacb..4b942731 100644 --- a/.github/workflows/build_test_push.yaml +++ b/.github/workflows/build_test_push.yaml @@ -77,10 +77,6 @@ jobs: $IMAGE_NAME \ tail -f /dev/null - # Takes a while for RonDB service to start properly - - name: Wait for RonDB service to start properly - run: sleep30 - - name: Build Pink run: docker exec -i $CONTAINER_NAME bash -c "cd pink && ./build.sh" @@ -104,7 +100,7 @@ jobs: # Takes a few seconds before all connections are setup and started properly - name: Wait for Rondis server to start properly - run: sleep 31 + run: sleep5 # Running this multiple times to check for memory leaks and that overwrites/updates/deletes work - name: Run tests multiple times From ff4bb3437bdf6050e5e69ae5e02f643b9d4d2599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20Ronstr=C3=B6m?= Date: Thu, 14 Nov 2024 17:03:15 +0100 Subject: [PATCH 21/21] Fix CI --- .github/workflows/build_test_push.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_test_push.yaml b/.github/workflows/build_test_push.yaml index 4b942731..feb5dae7 100644 --- a/.github/workflows/build_test_push.yaml +++ b/.github/workflows/build_test_push.yaml @@ -100,7 +100,7 @@ jobs: # Takes a few seconds before all connections are setup and started properly - name: Wait for Rondis server to start properly - run: sleep5 + run: sleep 5 # Running this multiple times to check for memory leaks and that overwrites/updates/deletes work - name: Run tests multiple times