diff --git a/pink/rondis/Makefile b/pink/rondis/Makefile index 64fbccca..43fb315d 100644 --- a/pink/rondis/Makefile +++ b/pink/rondis/Makefile @@ -14,10 +14,7 @@ endif all: rondis -ifndef PINK_PATH - $(warning Warning: missing pink path, using default) - PINK_PATH=$(CURDIR)/../.. -endif +PINK_PATH=$(CURDIR)/../.. PINK_INCLUDE_DIR=$(PINK_PATH) PINK_LIBRARY=$(PINK_PATH)/pink/lib/libpink.a diff --git a/pink/rondis/common.h b/pink/rondis/common.h index 64c505cc..d03759d3 100644 --- a/pink/rondis/common.h +++ b/pink/rondis/common.h @@ -23,6 +23,7 @@ void assign_generic_err_to_response(std::string *response, const char *app_str); #define FAILED_CREATE_TXN_OBJECT "Failed to create transaction object" #define FAILED_EXEC_TXN "Failed to execute transaction" #define FAILED_READ_KEY "Failed to read key" +#define FAILED_INCR_KEY "Failed to increment key" #define FAILED_GET_OP "Failed to get NdbOperation object" #define FAILED_DEFINE_OP "Failed to define RonDB operation" diff --git a/pink/rondis/string/db_operations.cc b/pink/rondis/string/db_operations.cc index f4615866..a7b00e7c 100644 --- a/pink/rondis/string/db_operations.cc +++ b/pink/rondis/string/db_operations.cc @@ -615,10 +615,163 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab, return 0; } +#define RONDB_INSERT 2 +#define RONDB_UPDATE 1 +#define REG0 0 +#define REG1 1 +#define REG2 2 +#define REG3 3 +#define REG4 4 +#define REG5 5 +#define REG6 6 +#define REG7 7 +#define LABEL0 0 void incr_key_row(std::string *response, Ndb *ndb, const NdbDictionary::Table *tab, NdbTransaction *trans, struct key_table *key_row) { - return; + + const NdbDictionary::Column *value_start_col = tab->getColumn("value_start"); + const NdbDictionary::Column *tot_value_len_col = tab->getColumn("tot_value_len"); + + NdbOperation::OperationOptions opts; + std::memset(&opts, 0, sizeof(opts)); + /** + * The mask specifies which columns is to be updated after the interpreter + * has finished. The values are set in the key_row. + * We have 7 columns, we will update tot_value_len in interpreter, same with + * value_start. + * + * The rest, redis_key, rondb_key, value_data_type, num_rows and expiry_date + * are updated through final update. + */ + + const Uint32 mask = 0x57; + const unsigned char *mask_ptr = (const unsigned char *)&mask; + + // redis_key already set as this is the Primary key + key_row->null_bits = 1; // Set rondb_key to NULL, first NULL column + key_row->num_rows = 0; + key_row->value_data_type = 0; + key_row->expiry_date = 0; + + /* Define the interpreted program */ + Uint32 code_buffer[128]; + NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer)); + code.load_const_u16(REG0, 4); //Memory offset 0 + code.load_const_u16(REG6, 0); //Memory offset 0 + int ret_code = code.load_op_type(REG1); // Read operation type into register 1 + code.branch_eq_const(REG1, RONDB_INSERT, LABEL0); //Inserts go to label 1 + + /** + * The first 4 bytes of the memory must be kept for the Attribute header + * REG0 Memory offset == 4 + * REG1 Memory offset == 6 + * REG2 Size of value_start + * REG3 Size of value_start without length bytes + * REG4 Old integer value after conversion + * REG5 New integer value after increment + * REG6 Memory offset == 0 + * REG7 not used + */ + /* UPDATE code */ + code.read_full(value_start_col, REG6, REG2); // Read value_start column + code.load_const_u16(REG1, 6);//Memory offset 2 + code.sub_const_reg(REG3, REG2, 2);//Subtract 2 from length + code.str_to_int64(REG4, REG1, REG3);//Convert string to number into register 6 + code.add_const_reg(REG5, REG4, 1); //New integer value in register 6 + code.int64_to_str(REG3, REG1, REG5);//Convert to string + code.add_const_reg(REG2, REG3, 2); //New value_start length + code.convert_size(REG3, REG0); //Write back length bytes in memory + + code.write_interpreter_output(REG5, 0); //Write into output index 0 + code.write_from_mem(value_start_col, REG6, REG2); // Write to column + code.write_attr(tot_value_len_col, REG3); + code.interpret_exit_ok(); + + /* INSERT code */ + code.def_label(LABEL0); + code.load_const_u16(REG5, 1); + code.load_const_u16(REG3, 1); + code.write_interpreter_output(REG5, 0); //Write into output index 0 + + Uint32 insert_value; + Uint8 *insert_value_ptr = (Uint8*)&insert_value; + insert_value_ptr[0] = 1; // Length is 1 + insert_value_ptr[1] = 0; // Second length byte is 0 + insert_value_ptr[2] = '1'; //Inserts a string '1' + insert_value_ptr[3] = 0; + + code.load_const_mem(REG0, REG2, 3, &insert_value);// Load to memory + code.write_from_mem(value_start_col, REG6, REG2); // Write to column + code.write_attr(tot_value_len_col, REG3); + code.interpret_exit_ok(); + + /* Program end, now compile code */ + ret_code = code.finalise(); + if (ret_code != 0) { + assign_ndb_err_to_response(response, + "Failed to create Interpreted code", + code.getNdbError()); + return; + } + + /* Prepare the interpreted program to be part of the write */ + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED_INSERT; + opts.interpretedCode = &code; + + /** + * Prepare to get the final value of the Redis row after INCR is finished + * This is performed by the reading the pseudo column that is reading the + * output index written in interpreter program. + */ + NdbOperation::GetValueSpec getvals[1]; + getvals[0].appStorage = nullptr; + getvals[0].recAttr = nullptr; + getvals[0].column = NdbDictionary::Column::READ_INTERPRETER_OUTPUT_0; + opts.optionsPresent |= NdbOperation::OperationOptions::OO_GET_FINAL_VALUE; + opts.numExtraGetFinalValues = 1; + opts.extraGetFinalValues = getvals; + + /* Define the actual operation to be sent to RonDB data node. */ + const NdbOperation *op = trans->writeTuple( + pk_key_record, + (const char*)key_row, + entire_key_record, + (char*)key_row, + mask_ptr, + &opts, + sizeof(opts)); + if (op == nullptr) { + assign_ndb_err_to_response(response, + "Failed to create NdbOperation", + trans->getNdbError()); + return; + } + + /* Send to RonDB and execute the INCR operation */ + if (trans->execute(NdbTransaction::Commit, + NdbOperation::AbortOnError) != 0 || + trans->getNdbError().code != 0) + { + assign_ndb_err_to_response(response, + FAILED_INCR_KEY, + trans->getNdbError()); + return; + } + + /* Retrieve the returned new value as an Int64 value */ + NdbRecAttr *recAttr = getvals[0].recAttr; + Int64 new_incremented_value = recAttr->int64_value(); + + /* Send the return message to Redis client */ + char header_buf[20]; + int header_len = write_formatted(header_buf, + sizeof(header_buf), + ":%lld\r\n", + new_incremented_value); + response->assign(header_buf); + return; } diff --git a/pink/rondis/string/table_definitions.cc b/pink/rondis/string/table_definitions.cc index 6759abfb..20345363 100644 --- a/pink/rondis/string/table_definitions.cc +++ b/pink/rondis/string/table_definitions.cc @@ -55,8 +55,7 @@ int init_key_records(NdbDictionary::Dictionary *dict) } std::map> read_all_column_map = { - // TODO: Fix this one - // {redis_key_col, {offsetof(struct key_table, redis_key), 0}}, + {redis_key_col, {offsetof(struct key_table, redis_key), 0}}, {rondb_key_col, {offsetof(struct key_table, rondb_key), 0}}, {expiry_date_col, {offsetof(struct key_table, expiry_date), 1}}, {value_start_col, {offsetof(struct key_table, value_start), 0}}, diff --git a/pink/rondis/string/table_definitions.h b/pink/rondis/string/table_definitions.h index 357d0348..60280c92 100644 --- a/pink/rondis/string/table_definitions.h +++ b/pink/rondis/string/table_definitions.h @@ -37,6 +37,7 @@ extern NdbRecord *entire_key_record; struct key_table { + Uint32 null_bits; char redis_key[MAX_KEY_VALUE_LEN + 2]; Uint64 rondb_key; Uint32 expiry_date;