Skip to content

Commit

Permalink
RONDB-761: First functional INCR command
Browse files Browse the repository at this point in the history
  • Loading branch information
mikaelronstrom committed Nov 6, 2024
1 parent e0f9c05 commit a265c5d
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 7 deletions.
5 changes: 1 addition & 4 deletions pink/rondis/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ endif

all: rondis

ifndef PINK_PATH
$(warning Warning: missing pink path, using default)
PINK_PATH=$(CURDIR)/../..
endif
PINK_PATH=$(CURDIR)/../..
PINK_INCLUDE_DIR=$(PINK_PATH)
PINK_LIBRARY=$(PINK_PATH)/pink/lib/libpink.a

Expand Down
1 change: 1 addition & 0 deletions pink/rondis/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ void assign_generic_err_to_response(std::string *response, const char *app_str);
#define FAILED_CREATE_TXN_OBJECT "Failed to create transaction object"
#define FAILED_EXEC_TXN "Failed to execute transaction"
#define FAILED_READ_KEY "Failed to read key"
#define FAILED_INCR_KEY "Failed to increment key"
#define FAILED_GET_OP "Failed to get NdbOperation object"
#define FAILED_DEFINE_OP "Failed to define RonDB operation"

Expand Down
155 changes: 154 additions & 1 deletion pink/rondis/string/db_operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,163 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab,
return 0;
}

#define RONDB_INSERT 2
#define RONDB_UPDATE 1
#define REG0 0
#define REG1 1
#define REG2 2
#define REG3 3
#define REG4 4
#define REG5 5
#define REG6 6
#define REG7 7
#define LABEL0 0
void incr_key_row(std::string *response,
Ndb *ndb,
const NdbDictionary::Table *tab,
NdbTransaction *trans,
struct key_table *key_row) {
return;

const NdbDictionary::Column *value_start_col = tab->getColumn("value_start");
const NdbDictionary::Column *tot_value_len_col = tab->getColumn("tot_value_len");

NdbOperation::OperationOptions opts;
std::memset(&opts, 0, sizeof(opts));
/**
* The mask specifies which columns is to be updated after the interpreter
* has finished. The values are set in the key_row.
* We have 7 columns, we will update tot_value_len in interpreter, same with
* value_start.
*
* The rest, redis_key, rondb_key, value_data_type, num_rows and expiry_date
* are updated through final update.
*/

const Uint32 mask = 0x57;
const unsigned char *mask_ptr = (const unsigned char *)&mask;

// redis_key already set as this is the Primary key
key_row->null_bits = 1; // Set rondb_key to NULL, first NULL column
key_row->num_rows = 0;
key_row->value_data_type = 0;
key_row->expiry_date = 0;

/* Define the interpreted program */
Uint32 code_buffer[128];
NdbInterpretedCode code(tab, &code_buffer[0], sizeof(code_buffer));
code.load_const_u16(REG0, 4); //Memory offset 0
code.load_const_u16(REG6, 0); //Memory offset 0
int ret_code = code.load_op_type(REG1); // Read operation type into register 1
code.branch_eq_const(REG1, RONDB_INSERT, LABEL0); //Inserts go to label 1

/**
* The first 4 bytes of the memory must be kept for the Attribute header
* REG0 Memory offset == 4
* REG1 Memory offset == 6
* REG2 Size of value_start
* REG3 Size of value_start without length bytes
* REG4 Old integer value after conversion
* REG5 New integer value after increment
* REG6 Memory offset == 0
* REG7 not used
*/
/* UPDATE code */
code.read_full(value_start_col, REG6, REG2); // Read value_start column
code.load_const_u16(REG1, 6);//Memory offset 2
code.sub_const_reg(REG3, REG2, 2);//Subtract 2 from length
code.str_to_int64(REG4, REG1, REG3);//Convert string to number into register 6
code.add_const_reg(REG5, REG4, 1); //New integer value in register 6
code.int64_to_str(REG3, REG1, REG5);//Convert to string
code.add_const_reg(REG2, REG3, 2); //New value_start length
code.convert_size(REG3, REG0); //Write back length bytes in memory

code.write_interpreter_output(REG5, 0); //Write into output index 0
code.write_from_mem(value_start_col, REG6, REG2); // Write to column
code.write_attr(tot_value_len_col, REG3);
code.interpret_exit_ok();

/* INSERT code */
code.def_label(LABEL0);
code.load_const_u16(REG5, 1);
code.load_const_u16(REG3, 1);
code.write_interpreter_output(REG5, 0); //Write into output index 0

Uint32 insert_value;
Uint8 *insert_value_ptr = (Uint8*)&insert_value;
insert_value_ptr[0] = 1; // Length is 1
insert_value_ptr[1] = 0; // Second length byte is 0
insert_value_ptr[2] = '1'; //Inserts a string '1'
insert_value_ptr[3] = 0;

code.load_const_mem(REG0, REG2, 3, &insert_value);// Load to memory
code.write_from_mem(value_start_col, REG6, REG2); // Write to column
code.write_attr(tot_value_len_col, REG3);
code.interpret_exit_ok();

/* Program end, now compile code */
ret_code = code.finalise();
if (ret_code != 0) {
assign_ndb_err_to_response(response,
"Failed to create Interpreted code",
code.getNdbError());
return;
}

/* Prepare the interpreted program to be part of the write */
opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED;
opts.optionsPresent |= NdbOperation::OperationOptions::OO_INTERPRETED_INSERT;
opts.interpretedCode = &code;

/**
* Prepare to get the final value of the Redis row after INCR is finished
* This is performed by the reading the pseudo column that is reading the
* output index written in interpreter program.
*/
NdbOperation::GetValueSpec getvals[1];
getvals[0].appStorage = nullptr;
getvals[0].recAttr = nullptr;
getvals[0].column = NdbDictionary::Column::READ_INTERPRETER_OUTPUT_0;
opts.optionsPresent |= NdbOperation::OperationOptions::OO_GET_FINAL_VALUE;
opts.numExtraGetFinalValues = 1;
opts.extraGetFinalValues = getvals;

/* Define the actual operation to be sent to RonDB data node. */
const NdbOperation *op = trans->writeTuple(
pk_key_record,
(const char*)key_row,
entire_key_record,
(char*)key_row,
mask_ptr,
&opts,
sizeof(opts));
if (op == nullptr) {
assign_ndb_err_to_response(response,
"Failed to create NdbOperation",
trans->getNdbError());
return;
}

/* Send to RonDB and execute the INCR operation */
if (trans->execute(NdbTransaction::Commit,
NdbOperation::AbortOnError) != 0 ||
trans->getNdbError().code != 0)
{
assign_ndb_err_to_response(response,
FAILED_INCR_KEY,
trans->getNdbError());
return;
}

/* Retrieve the returned new value as an Int64 value */
NdbRecAttr *recAttr = getvals[0].recAttr;
Int64 new_incremented_value = recAttr->int64_value();

/* Send the return message to Redis client */
char header_buf[20];
int header_len = write_formatted(header_buf,
sizeof(header_buf),
":%lld\r\n",
new_incremented_value);
response->assign(header_buf);
return;
}
3 changes: 1 addition & 2 deletions pink/rondis/string/table_definitions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ int init_key_records(NdbDictionary::Dictionary *dict)
}

std::map<const NdbDictionary::Column *, std::pair<size_t, int>> read_all_column_map = {
// TODO: Fix this one
// {redis_key_col, {offsetof(struct key_table, redis_key), 0}},
{redis_key_col, {offsetof(struct key_table, redis_key), 0}},
{rondb_key_col, {offsetof(struct key_table, rondb_key), 0}},
{expiry_date_col, {offsetof(struct key_table, expiry_date), 1}},
{value_start_col, {offsetof(struct key_table, value_start), 0}},
Expand Down
1 change: 1 addition & 0 deletions pink/rondis/string/table_definitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ extern NdbRecord *entire_key_record;

struct key_table
{
Uint32 null_bits;
char redis_key[MAX_KEY_VALUE_LEN + 2];
Uint64 rondb_key;
Uint32 expiry_date;
Expand Down

0 comments on commit a265c5d

Please sign in to comment.