Skip to content

Commit

Permalink
RONDB-768: HSET, HGET, HINCR now passes test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
mikaelronstrom committed Nov 11, 2024
1 parent 5e1345d commit 6e2f0b5
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 132 deletions.
1 change: 1 addition & 0 deletions pink/rondis/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void assign_generic_err_to_response(std::string *response, const char *app_str);
#define FAILED_EXEC_TXN "Failed to execute transaction"
#define FAILED_READ_KEY "Failed to read key"
#define FAILED_INCR_KEY "Failed to increment key"
#define FAILED_HSET_KEY "Failed to find key"
#define FAILED_INCR_KEY_MULTI_ROW "Failed to increment key, multi-row value"
#define FAILED_GET_OP "Failed to get NdbOperation object"
#define FAILED_DEFINE_OP "Failed to define RonDB operation"
Expand Down
42 changes: 41 additions & 1 deletion pink/rondis/rondb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,45 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv,
assign_generic_err_to_response(response, error_message);
}
}
else if (strcasecmp(command, "HGET") == 0)
{
if (argv.size() == 3)
{
rondb_hget_command(ndb, argv, response);
}
else
{
char error_message[256];
snprintf(error_message, sizeof(error_message), REDIS_WRONG_NUMBER_OF_ARGS, argv[0].c_str());
assign_generic_err_to_response(response, error_message);
}
}
else if (strcasecmp(command, "HSET") == 0)
{
if (argv.size() == 4)
{
rondb_hset_command(ndb, argv, response);
}
else
{
char error_message[256];
snprintf(error_message, sizeof(error_message), REDIS_WRONG_NUMBER_OF_ARGS, argv[0].c_str());
assign_generic_err_to_response(response, error_message);
}
}
else if (strcasecmp(command, "HINCR") == 0)
{
if (argv.size() == 3)
{
rondb_hincr_command(ndb, argv, response);
}
else
{
char error_message[256];
snprintf(error_message, sizeof(error_message), REDIS_WRONG_NUMBER_OF_ARGS, argv[0].c_str());
assign_generic_err_to_response(response, error_message);
}
}
else
{
printf("Unsupported command: ");
Expand All @@ -171,7 +210,8 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv,
If this limit is reached, the Ndb object will not create any new ones.
Hence, better to catch these cases early.
*/
print_args(argv);
printf("Failed to stop transaction\n");
//print_args(argv);
printf("Number of transactions started: %lld\n", ndb->getClientStat(ndb->TransStartCount));
printf("Number of transactions closed: %lld\n", ndb->getClientStat(ndb->TransCloseCount));
exit(1);
Expand Down
5 changes: 3 additions & 2 deletions pink/rondis/sql/STRING_key.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE redis.string_keys(
CREATE TABLE string_keys(
-- Redis actually supports a max key size of 512MiB,
-- but we choose not to support that here
redis_key_id BIGINT UNSIGNED NOT NULL,
redis_key VARBINARY(3000) NOT NULL,
-- This is to save space when referencing the key in the value table
rondb_key BIGINT UNSIGNED AUTO_INCREMENT NULL,
Expand All @@ -17,7 +18,7 @@ CREATE TABLE redis.string_keys(
expiry_date INT UNSIGNED,
-- Easier to sort and delete keys this way
KEY expiry_index(expiry_date),
PRIMARY KEY (redis_key) USING HASH,
PRIMARY KEY (redis_key_id, redis_key) USING HASH,
UNIQUE KEY (rondb_key) USING HASH
) ENGINE NDB -- Each CHAR will use 1 byte
CHARSET = latin1 COMMENT = "NDB_TABLE=PARTITION_BALANCE=FOR_RP_BY_LDM_X_8";
6 changes: 3 additions & 3 deletions pink/rondis/sql/STRING_value.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
CREATE TABLE redis.string_values(
CREATE TABLE string_values(
rondb_key BIGINT UNSIGNED NOT NULL,
ordinal INT UNSIGNED NOT NULL,
value VARBINARY(29500) NOT NULL,
PRIMARY KEY (rondb_key, ordinal),
FOREIGN KEY (rondb_key) REFERENCES redis.string_keys(rondb_key) ON UPDATE RESTRICT ON DELETE CASCADE
FOREIGN KEY (rondb_key) REFERENCES string_keys(rondb_key) ON UPDATE RESTRICT ON DELETE CASCADE
) ENGINE NDB,
COMMENT = "NDB_TABLE=PARTITION_BALANCE=RP_BY_LDM_X_8" PARTITION BY KEY (rondb_key);
COMMENT = "NDB_TABLE=PARTITION_BALANCE=RP_BY_LDM_X_8" PARTITION BY KEY (rondb_key);
135 changes: 84 additions & 51 deletions pink/rondis/string/commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

bool setup_transaction(
Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response,
Uint64 redis_key_id,
struct key_table *key_row,
Expand Down Expand Up @@ -78,14 +77,14 @@ void rondb_get(Ndb *ndb,
std::string *response,
Uint64 redis_key_id)
{
Uint32 arg_index_start = (redis_key_id == STRING_REDIS_KEY_ID) ? 1 : 2;
const NdbDictionary::Dictionary *dict;
const NdbDictionary::Table *tab = nullptr;
NdbTransaction *trans = nullptr;
struct key_table key_row;
const char *key_str = argv[1].c_str();
Uint32 key_len = argv[1].size();
const char *key_str = argv[arg_index_start].c_str();
Uint32 key_len = argv[arg_index_start].size();
if (!setup_transaction(ndb,
argv,
response,
redis_key_id,
&key_row,
Expand All @@ -107,16 +106,15 @@ void rondb_get(Ndb *ndb,
{
return;
}
printf("Getting %d value rows\n", key_row.num_rows);
{
/*
Our value uses value rows, so a more complex read is required.
We're starting from scratch here since we'll use a shared lock
on the key table this time we read from it.
*/
NdbTransaction *trans = ndb->startTransaction(tab,
(const char*)&key_row.redis_key_id,
key_len + 10);
trans = ndb->startTransaction(tab,
(const char*)&key_row.redis_key_id,
key_len + 10);
if (trans == nullptr)
{
assign_ndb_err_to_response(response,
Expand All @@ -142,14 +140,14 @@ void rondb_set(
std::string *response,
Uint64 redis_key_id)
{
Uint32 arg_index_start = (redis_key_id == STRING_REDIS_KEY_ID) ? 1 : 2;
const NdbDictionary::Dictionary *dict;
const NdbDictionary::Table *tab = nullptr;
NdbTransaction *trans = nullptr;
struct key_table key_row;
const char *key_str = argv[1].c_str();
Uint32 key_len = argv[1].size();
const char *key_str = argv[arg_index_start].c_str();
Uint32 key_len = argv[arg_index_start].size();
if (!setup_transaction(ndb,
argv,
response,
redis_key_id,
&key_row,
Expand All @@ -160,8 +158,8 @@ void rondb_set(
&trans))
return;

const char *value_str = argv[2].c_str();
Uint32 value_len = argv[2].size();
const char *value_str = argv[arg_index_start + 1].c_str();
Uint32 value_len = argv[arg_index_start + 1].size();
char varsize_param[EXTENSION_VALUE_LEN + 500];
Uint32 num_value_rows = 0;
Uint64 rondb_key = 0;
Expand All @@ -186,6 +184,7 @@ void rondb_set(

if (rondb_get_rondb_key(tab, rondb_key, ndb, response) != 0)
{
ndb->closeTransaction(trans);
return;
}
}
Expand All @@ -212,41 +211,37 @@ void rondb_set(
{
return;
}
else
/*
If we are here, we have tried writing a key that already exists.
This would not be a problem if this key did not have references
to value rows. Hence we first need to delete all of those - this
is best done via a cascade delete. We do a delete & insert in
a single transaction (plus writing the value rows).
*/
trans = ndb->startTransaction(tab,
(const char*)&key_row.redis_key_id,
key_len + 10);
if (trans == nullptr)
{
/*
If we are here, we have tried writing a key that already exists.
This would not be a problem if this key did not have references
to value rows. Hence we first need to delete all of those - this
is best done via a cascade delete. We do a delete & insert in
a single transaction (plus writing the value rows).
*/
NdbTransaction *trans = ndb->startTransaction(tab,
(const char*)&key_row.redis_key_id,
key_len + 10);
if (trans == nullptr)
{
assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError());
return;
}

if (delete_and_insert_key_row(response,
ndb,
tab,
trans,
redis_key_id,
rondb_key,
key_str,
key_len,
value_str,
value_len,
num_value_rows,
Uint32(0),
&varsize_param[0]) != 0)
{
ndb->closeTransaction(trans);
return;
}
assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError());
return;
}
if (delete_and_insert_key_row(response,
ndb,
tab,
trans,
redis_key_id,
rondb_key,
key_str,
key_len,
value_str,
value_len,
num_value_rows,
Uint32(0),
&varsize_param[0]) != 0)
{
ndb->closeTransaction(trans);
return;
}
}

Expand All @@ -256,7 +251,6 @@ void rondb_set(
response->append("+OK\r\n");
return;
}
printf("Inserting %d value rows\n", num_value_rows);
create_all_value_rows(response,
ndb,
dict,
Expand All @@ -277,14 +271,14 @@ void rondb_incr(
std::string *response,
Uint64 redis_key_id)
{
Uint32 arg_index_start = (redis_key_id == STRING_REDIS_KEY_ID) ? 1 : 2;
const NdbDictionary::Dictionary *dict;
const NdbDictionary::Table *tab = nullptr;
NdbTransaction *trans = nullptr;
struct key_table key_row;
const char *key_str = argv[1].c_str();
Uint32 key_len = argv[1].size();
const char *key_str = argv[arg_index_start].c_str();
Uint32 key_len = argv[arg_index_start].size();
if (!setup_transaction(ndb,
argv,
response,
redis_key_id,
&key_row,
Expand Down Expand Up @@ -324,3 +318,42 @@ void rondb_incr_command(Ndb *ndb,
{
return rondb_incr(ndb, argv, response, STRING_REDIS_KEY_ID);
}

void rondb_hget_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response)
{
Uint64 redis_key_id;
int ret_code = rondb_get_redis_key_id(ndb,
redis_key_id,
argv[1].c_str(),
argv[1].size(),
response);
return rondb_get(ndb, argv, response, redis_key_id);
}

void rondb_hset_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response)
{
Uint64 redis_key_id;
int ret_code = rondb_get_redis_key_id(ndb,
redis_key_id,
argv[1].c_str(),
argv[1].size(),
response);
return rondb_set(ndb, argv, response, redis_key_id);
}

void rondb_hincr_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response)
{
Uint64 redis_key_id;
int ret_code = rondb_get_redis_key_id(ndb,
redis_key_id,
argv[1].c_str(),
argv[1].size(),
response);
return rondb_incr(ndb, argv, response, redis_key_id);
}
12 changes: 12 additions & 0 deletions pink/rondis/string/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,16 @@ void rondb_set_command(Ndb *ndb,
void rondb_incr_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response);

void rondb_hget_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response);

void rondb_hset_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response);

void rondb_hincr_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response);
#endif
Loading

0 comments on commit 6e2f0b5

Please sign in to comment.