Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First functional INCR command #20

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
128a6fa
Merge pull request #1 from olapiv/mikaelMultiP
mronstro Nov 4, 2024
d968646
RONDB-765: First step to implementing INCR command
mikaelronstrom Nov 5, 2024
e0f9c05
RONDB-766: Fix compiler warnings
mikaelronstrom Nov 5, 2024
a265c5d
RONDB-761: First functional INCR command
mikaelronstrom Nov 6, 2024
4f768a4
RONDB-761: Improvements to INCR key
mikaelronstrom Nov 7, 2024
bf9c8fb
RONDB-761: Review comments
mikaelronstrom Nov 8, 2024
fa82c40
RONDB-761: Fixed mistake
mikaelronstrom Nov 8, 2024
8ad0fe3
Autoformatting
olapiv Nov 8, 2024
7fffcde
Bumped RonDB version
olapiv Nov 8, 2024
8a8c34e
Added INCR to Bash tests
olapiv Nov 8, 2024
816ebac
Moved interpreted code to separate file
olapiv Nov 8, 2024
67ffec5
Removed redundant headers
olapiv Nov 8, 2024
27fbeda
Merge pull request #2 from olapiv/mikaelFixes
mronstro Nov 8, 2024
a21b3fb
RONDB-761: Fixed breakout of interpreter code, case insensitive comma…
mikaelronstrom Nov 8, 2024
e7f0dea
Slash compiler warnings
olapiv Nov 11, 2024
c8c449d
Slash compiler warnings
olapiv Nov 11, 2024
0660f9d
Added command ECHO
olapiv Nov 11, 2024
2aac8d2
Added piped_key command in tests
olapiv Nov 12, 2024
e3886c7
Redis-benchmark: multiple clients, random keyspace, batched requests
olapiv Nov 12, 2024
b3265ff
Added $KEY to batched test
olapiv Nov 12, 2024
812495d
Added log
olapiv Nov 12, 2024
6b22057
FIX: Set "close_" to false by default
olapiv Nov 12, 2024
b9d77c2
Fix compiler warning
olapiv Nov 12, 2024
b536240
Added CONFIG parameter
olapiv Nov 12, 2024
3585961
Added comments/logs
olapiv Nov 12, 2024
e4be12f
Removed commented-out code
olapiv Nov 12, 2024
eec0e23
Run tests multi-threaded
olapiv Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions pink/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ ifeq ($(NO_PB),1)
$(filter-out $(VERSION_CC) $(wildcard $(SRC_DIR)/pb_*), $(wildcard $(SRC_DIR)/*.cc))
endif


ifndef SLASH_PATH
$(warning Warning: missing slash path, using default)
SLASH_PATH=$(CURDIR)/third/slash
endif
SLASH_PATH=$(CURDIR)/third/slash
SLASH_INCLUDE_DIR=$(SLASH_PATH)
SLASH_LIBRARY=$(SLASH_PATH)/slash/lib/libslash.a

Expand Down
6 changes: 3 additions & 3 deletions pink/include/redis_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ class RedisConn: public PinkConn {
const int rbuf_max_len = REDIS_MAX_MESSAGE);
virtual ~RedisConn();

virtual ReadStatus GetRequest();
virtual WriteStatus SendReply();
virtual int WriteResp(const std::string& resp);
virtual ReadStatus GetRequest() override;
virtual WriteStatus SendReply() override;
virtual int WriteResp(const std::string& resp) override;
olapiv marked this conversation as resolved.
Show resolved Hide resolved

void TryResizeBuffer() override;
void SetHandleType(const HandleType& handle_type);
Expand Down
5 changes: 1 addition & 4 deletions pink/rondis/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ endif

all: rondis

ifndef PINK_PATH
$(warning Warning: missing pink path, using default)
PINK_PATH=$(CURDIR)/../..
endif
PINK_PATH=$(CURDIR)/../..
PINK_INCLUDE_DIR=$(PINK_PATH)
PINK_LIBRARY=$(PINK_PATH)/pink/lib/libpink.a

Expand Down
5 changes: 5 additions & 0 deletions pink/rondis/common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/Ndb.hpp>

#ifndef RONDIS_COMMON_H
#define RONDIS_COMMON_H

#define MAX_CONNECTIONS 2

#define REDIS_DB_NAME "redis"
Expand All @@ -20,6 +23,7 @@ void assign_generic_err_to_response(std::string *response, const char *app_str);
#define FAILED_CREATE_TXN_OBJECT "Failed to create transaction object"
#define FAILED_EXEC_TXN "Failed to execute transaction"
#define FAILED_READ_KEY "Failed to read key"
#define FAILED_INCR_KEY "Failed to increment key"
#define FAILED_GET_OP "Failed to get NdbOperation object"
#define FAILED_DEFINE_OP "Failed to define RonDB operation"

Expand All @@ -28,3 +32,4 @@ void assign_generic_err_to_response(std::string *response, const char *app_str);
#define REDIS_WRONG_NUMBER_OF_ARGS "wrong number of arguments for '%s' command"
#define REDIS_NO_SUCH_KEY "$-1\r\n"
#define REDIS_KEY_TOO_LARGE "key is too large (3000 bytes max)"
#endif
13 changes: 13 additions & 0 deletions pink/rondis/rondb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,19 @@ int rondb_redis_handler(const pink::RedisCmdArgsType &argv,
assign_generic_err_to_response(response, error_message);
}
}
else if (argv[0] == "INCR")
{
if (argv.size() == 2)
{
rondb_incr_command(ndb, argv, response);
}
else
{
char error_message[256];
snprintf(error_message, sizeof(error_message), REDIS_WRONG_NUMBER_OF_ARGS, argv[0].c_str());
assign_generic_err_to_response(response, error_message);
}
}
else
{
printf("Unsupported command: ");
Expand Down
4 changes: 4 additions & 0 deletions pink/rondis/rondb.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/Ndb.hpp>

#ifndef RONDIS_RONDB_H
#define RONDIS_RONDB_H

extern std::vector<Ndb *> ndb_objects;

int initialize_ndb_objects(const char *connect_string, int num_ndb_objects);
Expand All @@ -14,3 +17,4 @@ void rondb_end();
int rondb_redis_handler(const pink::RedisCmdArgsType &argv,
std::string *response,
int fd);
#endif
2 changes: 1 addition & 1 deletion pink/rondis/rondis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static void SignalSetup()
int main(int argc, char *argv[])
{
int port = 6379;
char *connect_string = "localhost:13000";
const char *connect_string = "localhost:13000";
int worker_threads = 2;
if (argc != 4)
{
Expand Down
189 changes: 111 additions & 78 deletions pink/rondis/string/commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,56 @@
#include <ndbapi/Ndb.hpp>

#include "db_operations.h"
#include "commands.h"
#include "../common.h"
#include "table_definitions.h"

bool setup_transaction(
olapiv marked this conversation as resolved.
Show resolved Hide resolved
Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response,
struct key_table *key_row,
const char *key_str,
Uint32 key_len,
const NdbDictionary::Dictionary **ret_dict,
const NdbDictionary::Table **ret_tab,
NdbTransaction **ret_trans)
{
if (key_len > MAX_KEY_VALUE_LEN)
{
assign_generic_err_to_response(response, REDIS_KEY_TOO_LARGE);
return false;
}
const NdbDictionary::Dictionary *dict = ndb->getDictionary();
if (dict == nullptr)
{
assign_ndb_err_to_response(response, FAILED_GET_DICT, ndb->getNdbError());
return false;
}
const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME);
if (tab == nullptr)
{
assign_ndb_err_to_response(response, FAILED_CREATE_TABLE_OBJECT, dict->getNdbError());
return false;
}
memcpy(&key_row->redis_key[2], key_str, key_len);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? Since we should never change the key..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add two rows of NdbRecord, one is input that contains the searched for primary key, the second is the new values to write. We use the same row for both, so setting the primary key is part of the WHERE clause. Also it is used to write the values when performing an INSERT operation (the WRITE is either becoming an UPDATE or an INSERT.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is whether the memory has to be copied or whether one could just reference the pointer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have defined an NdbRecord, this defines the offset from the pointer to the Primary key pointer row that you supply in writeTuple, so if you define the offset as 0 and send in key_str as the Primary key pointer I gather it could work, but for a multi-column PK you need to copy, I don't think this optimisation makes much sense, most keys are short and so not a problem to do a quick memcpy

set_length((char*)&key_row->redis_key[0], key_len);
NdbTransaction *trans = ndb->startTransaction(tab,
&key_row->redis_key[0],
key_len + 2);
if (trans == nullptr)
{
assign_ndb_err_to_response(response,
FAILED_CREATE_TXN_OBJECT,
ndb->getNdbError());
return false;
}
*ret_tab = tab;
*ret_trans = trans;
*ret_dict = dict;
return true;
}

/*
A successful GET will return in this format:
$5
Expand All @@ -27,45 +74,22 @@ void rondb_get_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response)
{
Uint32 key_len = argv[1].size();
if (key_len > MAX_KEY_VALUE_LEN)
{
assign_generic_err_to_response(response, REDIS_KEY_TOO_LARGE);
return;
}
const char *key_str = argv[1].c_str();

const NdbDictionary::Dictionary *dict = ndb->getDictionary();
const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME);
if (tab == nullptr)
{
assign_ndb_err_to_response(response,
FAILED_CREATE_TABLE_OBJECT,
dict->getNdbError());
return;
}

const NdbDictionary::Dictionary *dict;
const NdbDictionary::Table *tab = nullptr;
NdbTransaction *trans = nullptr;
struct key_table key_row;
// varbinary -> first 2 bytes are length if bigger than 255
// start copying from 3rd byte
memcpy(&key_row.redis_key[2], key_str, key_len);
// Length as little endian
Uint8 *ptr = (Uint8 *)&key_row.redis_key[0];
ptr[0] = Uint8(key_len & 255);
ptr[1] = Uint8(key_len >> 8);

// This is (usually) a local operation to calculate the correct data node, using the
// hash of the pk value.
NdbTransaction *trans = ndb->startTransaction(tab,
&(key_row.redis_key[0]),
key_len + 2);
if (trans == nullptr)
{
assign_ndb_err_to_response(response,
FAILED_CREATE_TXN_OBJECT,
ndb->getNdbError());
return;
}
const char *key_str = argv[1].c_str();
Uint32 key_len = argv[1].size();
if (!setup_transaction(ndb,
argv,
response,
&key_row,
key_str,
key_len,
&dict,
&tab,
&trans))
return;

int ret_code = get_simple_key_row(
response,
Expand All @@ -81,22 +105,11 @@ void rondb_get_command(Ndb *ndb,
}
printf("Getting %d value rows\n", key_row.num_rows);
{
// For some reason this needs to be used from scratch
struct key_table key_row;
// varbinary -> first 2 bytes are length if bigger than 255
// start copying from 3rd byte
memcpy(&key_row.redis_key[2], key_str, key_len);
// Length as little endian
Uint8 *ptr = (Uint8 *)&key_row.redis_key[0];
ptr[0] = Uint8(key_len & 255);
ptr[1] = Uint8(key_len >> 8);

/*
Our value uses value rows, so a more complex read is required.
We're starting from scratch here since we'll use a shared lock
on the key table this time we read from it.
*/

NdbTransaction *trans = ndb->startTransaction(tab,
&(key_row.redis_key[0]),
key_len + 2);
Expand Down Expand Up @@ -124,37 +137,25 @@ void rondb_set_command(
const pink::RedisCmdArgsType &argv,
std::string *response)
{
const NdbDictionary::Dictionary *dict;
const NdbDictionary::Table *tab = nullptr;
NdbTransaction *trans = nullptr;
struct key_table key_row;
const char *key_str = argv[1].c_str();
Uint32 key_len = argv[1].size();
if (key_len > MAX_KEY_VALUE_LEN)
{
assign_generic_err_to_response(response, REDIS_KEY_TOO_LARGE);
return;
}
if (!setup_transaction(ndb,
argv,
response,
&key_row,
key_str,
key_len,
&dict,
&tab,
&trans))
return;

const char *key_str = argv[1].c_str();
const char *value_str = argv[2].c_str();
Uint32 value_len = argv[2].size();

const NdbDictionary::Dictionary *dict = ndb->getDictionary();
if (dict == nullptr)
{
assign_ndb_err_to_response(response, FAILED_GET_DICT, ndb->getNdbError());
return;
}
const NdbDictionary::Table *tab = dict->getTable(KEY_TABLE_NAME);
if (tab == nullptr)
{
assign_ndb_err_to_response(response, FAILED_CREATE_TABLE_OBJECT, dict->getNdbError());
return;
}

NdbTransaction *trans = ndb->startTransaction(tab, key_str, key_len);
if (trans == nullptr)
{
assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError());
return;
}

char varsize_param[EXTENSION_VALUE_LEN + 500];
Uint32 num_value_rows = 0;
Uint64 rondb_key = 0;
Expand Down Expand Up @@ -213,7 +214,9 @@ void rondb_set_command(
is best done via a cascade delete. We do a delete & insert in
a single transaction (plus writing the value rows).
*/
trans = ndb->startTransaction(tab, key_str, key_len);
NdbTransaction *trans = ndb->startTransaction(tab,
&(key_row.redis_key[0]),
key_len + 2);
if (trans == nullptr)
{
assign_ndb_err_to_response(response, FAILED_CREATE_TXN_OBJECT, ndb->getNdbError());
Expand Down Expand Up @@ -246,7 +249,6 @@ void rondb_set_command(
return;
}
printf("Inserting %d value rows\n", num_value_rows);

create_all_value_rows(response,
ndb,
dict,
Expand All @@ -259,3 +261,34 @@ void rondb_set_command(
ndb->closeTransaction(trans);
return;
}

void rondb_incr_command(
Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response)
{
const NdbDictionary::Dictionary *dict;
const NdbDictionary::Table *tab = nullptr;
NdbTransaction *trans = nullptr;
struct key_table key_row;
const char *key_str = argv[1].c_str();
Uint32 key_len = argv[1].size();
if (!setup_transaction(ndb,
argv,
response,
&key_row,
key_str,
key_len,
&dict,
&tab,
&trans))
return;

incr_key_row(response,
ndb,
tab,
trans,
&key_row);
ndb->closeTransaction(trans);
return;
}
10 changes: 9 additions & 1 deletion pink/rondis/string/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
#include "pink/include/redis_conn.h"
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/Ndb.hpp>

#include "db_operations.h"

#ifndef STRING_COMMANDS_H
#define STRING_COMMANDS_H
/*
All STRING commands:
https://redis.io/docs/latest/commands/?group=string
Expand All @@ -21,6 +22,8 @@
Most importantly, it writes Ndb error messages to the response string. This may
however change in the future, since this causes redundancy.
*/
void set_length(char* buf, Uint32 key_len);
Uint32 get_length(char* buf);

void rondb_get_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
Expand All @@ -29,3 +32,8 @@ void rondb_get_command(Ndb *ndb,
void rondb_set_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response);

void rondb_incr_command(Ndb *ndb,
const pink::RedisCmdArgsType &argv,
std::string *response);
#endif
Loading
Loading