diff --git a/pink/rondis/common.cc b/pink/rondis/common.cc index 00b000b3..e5fe4b4a 100644 --- a/pink/rondis/common.cc +++ b/pink/rondis/common.cc @@ -25,3 +25,19 @@ void assign_generic_err_to_response( std::cout << buf; response->assign(buf); } + +void set_length(char *buf, Uint32 key_len) +{ + Uint8 *ptr = (Uint8 *)buf; + ptr[0] = (Uint8)(key_len & 255); + ptr[1] = (Uint8)(key_len >> 8); +} + +Uint32 get_length(char *buf) +{ + Uint8 *ptr = (Uint8 *)buf; + Uint8 low = ptr[0]; + Uint8 high = ptr[1]; + Uint32 len32 = Uint32(low) + Uint32(256) * Uint32(high); + return len32; +} diff --git a/pink/rondis/common.h b/pink/rondis/common.h index 8959f629..0b52508d 100644 --- a/pink/rondis/common.h +++ b/pink/rondis/common.h @@ -16,6 +16,8 @@ int write_formatted(char *buffer, int bufferSize, const char *format, ...); void assign_ndb_err_to_response(std::string *response, const char *app_str, NdbError error); void assign_generic_err_to_response(std::string *response, const char *app_str); +void set_length(char* buf, Uint32 key_len); +Uint32 get_length(char* buf); // NDB API error messages #define FAILED_GET_DICT "Failed to get NdbDict" diff --git a/pink/rondis/string/commands.cc b/pink/rondis/string/commands.cc index ccb0ac93..75584650 100644 --- a/pink/rondis/string/commands.cc +++ b/pink/rondis/string/commands.cc @@ -162,6 +162,7 @@ void rondb_set( Uint32 value_len = argv[arg_index_start + 1].size(); char varsize_param[EXTENSION_VALUE_LEN + 500]; Uint32 num_value_rows = 0; + Uint32 prev_num_rows = 0; Uint64 rondb_key = 0; if (value_len > INLINE_VALUE_LEN) @@ -191,7 +192,6 @@ void rondb_set( int ret_code = 0; ret_code = create_key_row(response, - ndb, tab, trans, redis_key_id, @@ -201,8 +201,8 @@ void rondb_set( value_str, value_len, num_value_rows, - Uint32(0), - &varsize_param[0]); + prev_num_rows, + Uint32(0)); if (ret_code != 0) { // Often unnecessary since it already failed to commit @@ -226,41 +226,68 @@ void rondb_set( assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError()); return; } - if (delete_and_insert_key_row(response, - ndb, - tab, - trans, - redis_key_id, - rondb_key, - key_str, - key_len, - value_str, - value_len, - num_value_rows, - Uint32(0), - &varsize_param[0]) != 0) - { - ndb->closeTransaction(trans); - return; - } - } - - if (num_value_rows == 0) - { + /** + * We don't know the exact number of value rows, but we know that it is + * at least one. + */ + prev_num_rows = 1; + ret_code = create_key_row(response, + tab, + trans, + redis_key_id, + rondb_key, + key_str, + key_len, + value_str, + value_len, + num_value_rows, + prev_num_rows, + Uint32(0)); + } else if (num_value_rows == 0) { ndb->closeTransaction(trans); response->append("+OK\r\n"); return; } - create_all_value_rows(response, - ndb, - dict, - trans, - rondb_key, - value_str, - value_len, - num_value_rows, - &varsize_param[0]); + /** + * Coming here means that we either have to add new value rows or we have + * to delete previous value rows or both. Thus the transaction is still + * open. We start by creating the new value rows. Next we delete the + * remaining value rows from the previous instantiation of the row. + */ + if (num_value_rows > 0) { + ret_code = create_all_value_rows(response, + ndb, + dict, + trans, + rondb_key, + value_str, + value_len, + num_value_rows, + &varsize_param[0]); + } + if (ret_code != 0) { + return; + } + ret_code = delete_value_rows(response, + tab, + trans, + rondb_key, + num_value_rows, + prev_num_rows); + if (ret_code != 0) { + return; + } + if (trans->execute(NdbTransaction::Commit, + NdbOperation::AbortOnError) == 0 && + trans->getNdbError().code != 0) + { + assign_ndb_err_to_response(response, + FAILED_EXEC_TXN, + trans->getNdbError()); + return; + } ndb->closeTransaction(trans); + response->append("+OK\r\n"); return; } diff --git a/pink/rondis/string/commands.h b/pink/rondis/string/commands.h index 6b5ef535..29796fcd 100644 --- a/pink/rondis/string/commands.h +++ b/pink/rondis/string/commands.h @@ -22,9 +22,6 @@ Most importantly, it writes Ndb error messages to the response string. This may however change in the future, since this causes redundancy. */ -void set_length(char* buf, Uint32 key_len); -Uint32 get_length(char* buf); - void rondb_get_command(Ndb *ndb, const pink::RedisCmdArgsType &argv, std::string *response); diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index a5e2917d..c68f4e13 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -18,24 +18,7 @@ NdbRecord *entire_key_record = nullptr; NdbRecord *pk_value_record = nullptr; NdbRecord *entire_value_record = nullptr; -void set_length(char *buf, Uint32 key_len) -{ - Uint8 *ptr = (Uint8 *)buf; - ptr[0] = (Uint8)(key_len & 255); - ptr[1] = (Uint8)(key_len >> 8); -} - -Uint32 get_length(char *buf) -{ - Uint8 *ptr = (Uint8 *)buf; - Uint8 low = ptr[0]; - Uint8 high = ptr[1]; - Uint32 len32 = Uint32(low) + Uint32(256) * Uint32(high); - return len32; -} - int create_key_row(std::string *response, - Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, Uint64 redis_key_id, @@ -45,37 +28,27 @@ int create_key_row(std::string *response, const char *value_str, Uint32 tot_value_len, Uint32 num_value_rows, - Uint32 row_state, - char *buf) -{ - NdbOperation *write_op = trans->getNdbOperation(tab); - if (write_op == nullptr) - { - assign_ndb_err_to_response(response, - FAILED_GET_OP, - trans->getNdbError()); - return -1; - } - write_op->writeTuple(); - write_data_to_key_op(write_op, - redis_key_id, - rondb_key, - key_str, - key_len, - value_str, - tot_value_len, - num_value_rows, - row_state, - buf); - if (write_op->getNdbError().code != 0) - { - assign_ndb_err_to_response(response, - FAILED_DEFINE_OP, - write_op->getNdbError()); - return -1; - } - int ret_code = 0; - if (num_value_rows == 0) + Uint32 &prev_num_rows, + Uint32 row_state) { + const NdbOperation *write_op = nullptr; + NdbRecAttr *recAttr = nullptr; + int ret_code = write_data_to_key_op(response, + &write_op, + tab, + trans, + redis_key_id, + rondb_key, + key_str, + key_len, + value_str, + tot_value_len, + num_value_rows, + row_state, + &recAttr); + if (ret_code != 0) { + return ret_code; + } + if (num_value_rows == 0 && prev_num_rows) { if (trans->execute(NdbTransaction::Commit, NdbOperation::AbortOnError) == 0 && @@ -90,6 +63,7 @@ int create_key_row(std::string *response, NdbOperation::AbortOnError) == 0 && trans->getNdbError().code == 0) { + prev_num_rows = recAttr->u_32_value(); return 0; } } @@ -103,45 +77,128 @@ int create_key_row(std::string *response, return trans->getNdbError().code; } -int delete_and_insert_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - Uint64 redis_key_id, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf) -{ - if (delete_key_row(response, - ndb, - tab, - trans, - redis_key_id, - key_str, - key_len, - buf) != 0) +int write_data_to_key_op(std::string *response, + const NdbOperation **ndb_op, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 redis_key_id, + Uint64 rondb_key, + const char *key_str, + Uint32 key_len, + const char *value_str, + Uint32 tot_value_len, + Uint32 num_value_rows, + Uint32 row_state, + NdbRecAttr **recAttr) { + struct key_table key_row; + const Uint32 mask = 0xEF; + const unsigned char *mask_ptr = (const unsigned char *)&mask; + key_row.null_bits = 0; + memcpy(&key_row.redis_key[2], key_str, key_len); + set_length(&key_row.redis_key[0], key_len); + key_row.redis_key_id = redis_key_id; + if (rondb_key == 0) + { + key_row.null_bits = 1; + } + else + { + key_row.rondb_key = rondb_key; + } + key_row.tot_value_len = tot_value_len; + key_row.num_rows = num_value_rows; + key_row.value_data_type = row_state; + key_row.expiry_date = 0; + + Uint32 this_value_len = tot_value_len; + if (this_value_len > INLINE_VALUE_LEN) + { + this_value_len = INLINE_VALUE_LEN; + } + memcpy(&key_row.value_start[2], value_str, this_value_len); + set_length(&key_row.value_start[0], this_value_len); + + Uint32 code_buffer[64]; + NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer)); + int ret_code = write_key_row(response, code, tab); + if (ret_code != 0) { + return ret_code; + } + + // Prepare the interpreted program to be part of the write + NdbOperation::OperationOptions opts; + std::memset(&opts, 0, sizeof(opts)); + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED_INSERT; + opts.interpretedCode = &code; + + NdbOperation::GetValueSpec getvals[1]; + getvals[0].appStorage = nullptr; + getvals[0].recAttr = nullptr; + getvals[0].column = NdbDictionary::Column::READ_INTERPRETER_OUTPUT_0; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_GET_FINAL_VALUE; + opts.numExtraGetFinalValues = 1; + opts.extraGetFinalValues = getvals; + + /* Define the actual operation to be sent to RonDB data node. */ + const NdbOperation *op = trans->writeTuple( + pk_key_record, + (const char *)&key_row, + entire_key_record, + (char *)&key_row, + mask_ptr, + &opts, + sizeof(opts)); + if (op == nullptr) { + assign_ndb_err_to_response(response, + "Failed to create NdbOperation", + trans->getNdbError()); return -1; } + *ndb_op = op; + *recAttr = getvals[0].recAttr; + return 0; +} - return insert_key_row(response, - ndb, - tab, - trans, - redis_key_id, - rondb_key, - key_str, - key_len, - value_str, - tot_value_len, - num_value_rows, - row_state, - buf); +int delete_value_rows(std::string *response, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 rondb_key, + Uint32 start_ordinal, + Uint32 end_ordinal) { + for (Uint32 i = start_ordinal; i < end_ordinal; i++) { + NdbOperation *del_op = trans->getNdbOperation(tab); + if (del_op == nullptr) + { + assign_ndb_err_to_response(response, + FAILED_GET_OP, + trans->getNdbError()); + return -1; + } + del_op->deleteTuple(); + del_op->equal(VALUE_TABLE_COL_rondb_key, rondb_key); + if (del_op->getNdbError().code != 0) + { + assign_ndb_err_to_response(response, + FAILED_DEFINE_OP, + del_op->getNdbError()); + return -1; + } + } + if (start_ordinal >= end_ordinal) { + return 0; + } + if (trans->execute(NdbTransaction::NoCommit, + NdbOperation::AbortOnError) != 0 || + trans->getNdbError().code != 0) + { + assign_ndb_err_to_response(response, + FAILED_EXEC_TXN, + trans->getNdbError()); + return -1; + } + return 0; } int delete_key_row(std::string *response, @@ -151,8 +208,7 @@ int delete_key_row(std::string *response, Uint64 redis_key_id, const char *key_str, Uint32 key_len, - char *buf) -{ + char *buf) { NdbOperation *del_op = trans->getNdbOperation(tab); if (del_op == nullptr) { @@ -187,109 +243,6 @@ int delete_key_row(std::string *response, return 0; } -int insert_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - Uint64 redis_key_id, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf) -{ - NdbOperation *insert_op = trans->getNdbOperation(tab); - if (insert_op == nullptr) - { - assign_ndb_err_to_response(response, - FAILED_GET_OP, - trans->getNdbError()); - return -1; - } - insert_op->insertTuple(); - write_data_to_key_op(insert_op, - redis_key_id, - rondb_key, - key_str, - key_len, - value_str, - tot_value_len, - num_value_rows, - row_state, - buf); - if (insert_op->getNdbError().code != 0) - { - assign_ndb_err_to_response(response, - FAILED_DEFINE_OP, - insert_op->getNdbError()); - return -1; - } - if (num_value_rows == 0) - { - if (trans->execute(NdbTransaction::Commit, - NdbOperation::AbortOnError) == 0 && - trans->getNdbError().code == 0) - { - return 0; - } - } - else - { - if (trans->execute(NdbTransaction::NoCommit, - NdbOperation::AbortOnError) == 0 && - trans->getNdbError().code == 0) - { - return 0; - } - } - assign_ndb_err_to_response(response, - FAILED_EXEC_TXN, - trans->getNdbError()); - return -1; -} - -void write_data_to_key_op(NdbOperation *ndb_op, - Uint64 redis_key_id, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf) -{ - memcpy(&buf[2], key_str, key_len); - set_length(buf, key_len); - ndb_op->equal(KEY_TABLE_COL_redis_key, buf); - ndb_op->equal(KEY_TABLE_COL_redis_key_id, redis_key_id); - - if (rondb_key == 0) - { - ndb_op->setValue(KEY_TABLE_COL_rondb_key, (char *)NULL); - } - else - { - ndb_op->setValue(KEY_TABLE_COL_rondb_key, rondb_key); - } - ndb_op->setValue(KEY_TABLE_COL_tot_value_len, tot_value_len); - ndb_op->setValue(KEY_TABLE_COL_num_rows, num_value_rows); - ndb_op->setValue(KEY_TABLE_COL_value_data_type, row_state); - ndb_op->setValue(KEY_TABLE_COL_expiry_date, 0); - - Uint32 this_value_len = tot_value_len; - if (this_value_len > INLINE_VALUE_LEN) - { - this_value_len = INLINE_VALUE_LEN; - } - memcpy(&buf[2], value_str, this_value_len); - set_length(buf, this_value_len); - ndb_op->setValue(KEY_TABLE_COL_value_start, buf); -} - int create_value_row(std::string *response, Ndb *ndb, const NdbDictionary::Dictionary *dict, @@ -298,8 +251,7 @@ int create_value_row(std::string *response, Uint64 rondb_key, Uint32 this_value_len, Uint32 ordinal, - char *buf) -{ + char *buf) { const NdbDictionary::Table *tab = dict->getTable(VALUE_TABLE_NAME); if (tab == nullptr) { @@ -316,18 +268,16 @@ int create_value_row(std::string *response, trans->getNdbError()); return -1; } - op->insertTuple(); + op->writeTuple(); op->equal(VALUE_TABLE_COL_rondb_key, rondb_key); op->equal(VALUE_TABLE_COL_ordinal, ordinal); memcpy(&buf[2], start_value_ptr, this_value_len); set_length(buf, this_value_len); op->setValue(VALUE_TABLE_COL_value, buf); + if (op->getNdbError().code != 0) { - if (op->getNdbError().code != 0) - { - assign_ndb_err_to_response(response, FAILED_DEFINE_OP, op->getNdbError()); - return -1; - } + assign_ndb_err_to_response(response, FAILED_DEFINE_OP, op->getNdbError()); + return -1; } return 0; } @@ -340,8 +290,7 @@ int create_all_value_rows(std::string *response, const char *value_str, Uint32 value_len, Uint32 num_value_rows, - char *buf) -{ + char *buf) { Uint32 remaining_len = value_len - INLINE_VALUE_LEN; const char *start_value_ptr = &value_str[INLINE_VALUE_LEN]; for (Uint32 ordinal = 0; ordinal < num_value_rows; ordinal++) @@ -365,17 +314,17 @@ int create_all_value_rows(std::string *response, } remaining_len -= this_value_len; start_value_ptr += this_value_len; + if (ordinal == (num_value_rows - 1) || + ordinal % MAX_VALUES_TO_WRITE == (MAX_VALUES_TO_WRITE - 1)) { + if (trans->execute(NdbTransaction::NoCommit, + NdbOperation::AbortOnError) != 0 || + trans->getNdbError().code != 0) + { + assign_ndb_err_to_response(response, FAILED_EXEC_TXN, trans->getNdbError()); + return -1; + } + } } - - if (trans->execute(NdbTransaction::Commit, - NdbOperation::AbortOnError) != 0 || - trans->getNdbError().code != 0) - { - assign_ndb_err_to_response(response, FAILED_EXEC_TXN, trans->getNdbError()); - return -1; - } - - response->append("+OK\r\n"); return 0; } @@ -383,8 +332,7 @@ int get_simple_key_row(std::string *response, const NdbDictionary::Table *tab, Ndb *ndb, NdbTransaction *trans, - struct key_table *key_row) -{ + struct key_table *key_row) { /** * Mask and options means simply reading all columns * except primary key columns. @@ -450,8 +398,7 @@ int get_value_rows(std::string *response, NdbTransaction *trans, const Uint32 num_rows, const Uint64 rondb_key, - const Uint32 tot_value_len) -{ + const Uint32 tot_value_len) { const NdbDictionary::Table *tab = dict->getTable(VALUE_TABLE_NAME); if (tab == nullptr) { @@ -469,7 +416,8 @@ int get_value_rows(std::string *response, Uint32 num_rows_to_read = std::min(ROWS_PER_READ, num_rows - start_ordinal); bool is_last_batch = (batch == (num_read_batches - 1)); - NdbTransaction::ExecType commit_type = is_last_batch ? NdbTransaction::Commit : NdbTransaction::NoCommit; + NdbTransaction::ExecType commit_type = is_last_batch ? + NdbTransaction::Commit : NdbTransaction::NoCommit; if (read_batched_value_rows(response, trans, @@ -490,8 +438,7 @@ int read_batched_value_rows(std::string *response, const Uint64 rondb_key, const Uint32 num_rows_to_read, const Uint32 start_ordinal, - const NdbTransaction::ExecType commit_type) -{ + const NdbTransaction::ExecType commit_type) { struct value_table value_rows[ROWS_PER_READ]; Uint32 ordinal = start_ordinal; @@ -539,8 +486,7 @@ int get_complex_key_row(std::string *response, const NdbDictionary::Table *tab, Ndb *ndb, NdbTransaction *trans, - struct key_table *key_row) -{ + struct key_table *key_row) { /** * Since a simple read using CommittedRead we will go back to * the safe method where we first read with lock the key row @@ -610,8 +556,7 @@ int get_complex_key_row(std::string *response, int rondb_get_rondb_key(const NdbDictionary::Table *tab, Uint64 &rondb_key, Ndb *ndb, - std::string *response) -{ + std::string *response) { if (ndb->getAutoIncrementValue(tab, rondb_key, unsigned(1024)) != 0) { assign_ndb_err_to_response(response, @@ -626,8 +571,7 @@ void incr_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, - struct key_table *key_row) -{ + struct key_table *key_row) { /** * The mask specifies which columns is to be updated after the interpreter * has finished. The values are set in the key_row. @@ -728,8 +672,7 @@ static int get_unique_redis_key_id(const NdbDictionary::Table *tab, Ndb *ndb, Uint64 &redis_key_id, - std::string *response) -{ + std::string *response) { if (ndb->getAutoIncrementValue(tab, redis_key_id, unsigned(1024)) != 0) { diff --git a/pink/rondis/string/db_operations.h b/pink/rondis/string/db_operations.h index 9c527fb4..d69c584b 100644 --- a/pink/rondis/string/db_operations.h +++ b/pink/rondis/string/db_operations.h @@ -11,7 +11,6 @@ const Uint32 ROWS_PER_READ = 2; int create_key_row(std::string *response, - Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, Uint64 redis_key_id, @@ -21,33 +20,22 @@ int create_key_row(std::string *response, const char *value_str, Uint32 tot_value_len, Uint32 num_value_rows, - Uint32 row_state, - char *buf); - -void write_data_to_key_op(NdbOperation *ndb_op, - Uint64 redis_key_id, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf); - -int delete_and_insert_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - Uint64 redis_key_id, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf); + Uint32 &prev_num_rows, + Uint32 row_state); + +int write_data_to_key_op(std::string *response, + const NdbOperation **ndb_op, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 redis_key_id, + Uint64 rondb_key, + const char *key_str, + Uint32 key_len, + const char *value_str, + Uint32 tot_value_len, + Uint32 num_value_rows, + Uint32 row_state, + NdbRecAttr **recAttr); int delete_key_row(std::string *response, Ndb *ndb, @@ -58,20 +46,6 @@ int delete_key_row(std::string *response, Uint32 key_len, char *buf); -int insert_key_row(std::string *response, - Ndb *ndb, - const NdbDictionary::Table *tab, - NdbTransaction *trans, - Uint64 redis_key_id, - Uint64 rondb_key, - const char *key_str, - Uint32 key_len, - const char *value_str, - Uint32 tot_value_len, - Uint32 num_value_rows, - Uint32 row_state, - char *buf); - int create_value_row(std::string *response, Ndb *ndb, const NdbDictionary::Dictionary *dict, @@ -92,6 +66,12 @@ int create_all_value_rows(std::string *response, Uint32 num_value_rows, char *buf); +int delete_value_rows(std::string *response, + const NdbDictionary::Table *tab, + NdbTransaction *trans, + Uint64 rondb_key, + Uint32 start_ordinal, + Uint32 end_ordinal); /* Since the beginning of the value is saved within the key table, it can suffice to read the key table to get the value. If the value is diff --git a/pink/rondis/string/interpreted_code.cc b/pink/rondis/string/interpreted_code.cc index 24064500..32c60360 100644 --- a/pink/rondis/string/interpreted_code.cc +++ b/pink/rondis/string/interpreted_code.cc @@ -175,3 +175,33 @@ int write_hset_key_table(Ndb *ndb, ndb->closeTransaction(trans); return 0; } + + +int write_key_row(std::string *response, + NdbInterpretedCode &code, + const NdbDictionary::Table *tab) { + const NdbDictionary::Column *num_rows_col = tab->getColumn(KEY_TABLE_COL_num_rows); + code.load_op_type(REG1); // Read operation type into register 1 + code.branch_eq_const(REG1, RONDB_INSERT, LABEL0); // Inserts go to label 0 + /* UPDATE */ + code.read_attr(REG7, num_rows_col); + code.write_interpreter_output(REG7, OUTPUT_INDEX); // Write into output index 0 + code.interpret_exit_ok(); + + /* INSERT */ + code.def_label(LABEL0); + code.load_const_u16(REG7, 0); + code.write_interpreter_output(REG7, OUTPUT_INDEX); // Write into output index 0 + code.interpret_exit_ok(); + + // Program end, now compile code + int ret_code = code.finalise(); + if (ret_code != 0) + { + assign_ndb_err_to_response(response, + "Failed to create Interpreted code", + code.getNdbError()); + return -1; + } + return 0; +} diff --git a/pink/rondis/string/interpreted_code.h b/pink/rondis/string/interpreted_code.h index 460cefb2..62357e9e 100644 --- a/pink/rondis/string/interpreted_code.h +++ b/pink/rondis/string/interpreted_code.h @@ -38,4 +38,8 @@ int write_hset_key_table(Ndb *ndb, std::string std_key_str, Uint64 & redis_key_id, std::string *response); +int write_key_row(std::string *response, + NdbInterpretedCode &code, + const NdbDictionary::Table *tab); + #endif diff --git a/pink/rondis/string/table_definitions.h b/pink/rondis/string/table_definitions.h index fe0477b8..d9fa542a 100644 --- a/pink/rondis/string/table_definitions.h +++ b/pink/rondis/string/table_definitions.h @@ -10,6 +10,7 @@ - one NdbRecord defining the columns we want to fetch */ +#define MAX_VALUES_TO_WRITE 4 #define MAX_KEY_VALUE_LEN 3000 #define STRING_REDIS_KEY_ID 0