Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GETting multi-row values #11

Merged
merged 21 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
}
},
"containerEnv": {
"RONDB_PATH": "/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8"
// Do this manually:
// export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8/lib
"RONDB_PATH": "/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8",
// This assumes that the RonDB tarball is placed at the root of the workspace folder
"LD_LIBRARY_PATH": "/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8/lib"
}
}
3 changes: 1 addition & 2 deletions pink/rondis/sql/STRING_key.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ CREATE TABLE redis.string_keys(
-- Redis actually supports a max key size of 512MiB,
-- but we choose not to support that here
redis_key VARBINARY(3000) NOT NULL,
-- We will use an NDB auto-increment
-- This is to save space when referencing the key in the value table
rondb_key BIGINT UNSIGNED,
rondb_key BIGINT UNSIGNED AUTO_INCREMENT NULL,
-- TODO: Replace with Enum below
value_data_type INT UNSIGNED NOT NULL,
-- value_data_type ENUM('string', 'number', 'binary_string'),
Expand Down
19 changes: 14 additions & 5 deletions pink/rondis/string/commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ void rondb_get_command(Ndb *ndb,
// start copying from 3rd byte
memcpy(&key_row.redis_key[2], key_str, key_len);
// Length as little endian
key_row.redis_key[0] = key_len & 255;
key_row.redis_key[1] = key_len >> 8;
Uint8 *ptr = (Uint8 *)&key_row.redis_key[0];
ptr[0] = Uint8(key_len & 255);
ptr[1] = Uint8(key_len >> 8);

// This is (usually) a local operation to calculate the correct data node, using the
// hash of the pk value.
Expand Down Expand Up @@ -86,8 +87,9 @@ void rondb_get_command(Ndb *ndb,
// start copying from 3rd byte
memcpy(&key_row.redis_key[2], key_str, key_len);
// Length as little endian
key_row.redis_key[0] = key_len & 255;
key_row.redis_key[1] = key_len >> 8;
Uint8 *ptr = (Uint8 *)&key_row.redis_key[0];
ptr[0] = Uint8(key_len & 255);
ptr[1] = Uint8(key_len >> 8);

/*
Our value uses value rows, so a more complex read is required.
Expand Down Expand Up @@ -154,7 +156,7 @@ void rondb_set_command(
}

char varsize_param[EXTENSION_VALUE_LEN + 500];
Uint32 num_value_rows = value_len / EXTENSION_VALUE_LEN;
Uint32 num_value_rows = 0;
Uint64 rondb_key = 0;

if (value_len > INLINE_VALUE_LEN)
Expand All @@ -168,6 +170,13 @@ void rondb_set_command(
* deleting the row in the main table ensures that all
* value rows are also deleted.
*/
Uint32 extended_value_len = value_len - INLINE_VALUE_LEN;
num_value_rows = extended_value_len / EXTENSION_VALUE_LEN;
if (extended_value_len % EXTENSION_VALUE_LEN != 0)
{
num_value_rows++;
}

if (rondb_get_rondb_key(tab, rondb_key, ndb, response) != 0)
{
return;
Expand Down
159 changes: 90 additions & 69 deletions pink/rondis/string/db_operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,9 @@ void write_data_to_key_op(NdbOperation *ndb_op,
this_value_len = INLINE_VALUE_LEN;
}
memcpy(&buf[2], value_str, this_value_len);
buf[0] = this_value_len & 255;
buf[1] = this_value_len >> 8;
Uint8 *ptr = (Uint8 *)buf;
ptr[0] = (Uint8)(this_value_len & 255);
ptr[1] = (Uint8)(this_value_len >> 8);
ndb_op->setValue(KEY_TABLE_COL_value_start, buf);
}

Expand Down Expand Up @@ -300,11 +301,11 @@ int create_value_row(std::string *response,
op->equal(VALUE_TABLE_COL_rondb_key, rondb_key);
op->equal(VALUE_TABLE_COL_ordinal, ordinal);
memcpy(&buf[2], start_value_ptr, this_value_len);
buf[0] = this_value_len & 255;
buf[1] = this_value_len >> 8;
Uint8 *ptr = (Uint8 *)buf;
ptr[0] = (Uint8)(this_value_len & 255);
ptr[1] = (Uint8)(this_value_len >> 8);
op->setValue(VALUE_TABLE_COL_value, buf);
{
int ret_code = op->getNdbError().code;
if (op->getNdbError().code != 0)
{
assign_ndb_err_to_response(response, FAILED_DEFINE_OP, op->getNdbError());
Expand Down Expand Up @@ -356,7 +357,7 @@ int create_all_value_rows(std::string *response,
assign_ndb_err_to_response(response, FAILED_EXEC_TXN, trans->getNdbError());
return -1;
}

response->append("+OK\r\n");
return 0;
}
Expand Down Expand Up @@ -408,13 +409,15 @@ int get_simple_key_row(std::string *response,
{
return 0;
}
char buf[20];
int len = write_formatted(buf,
sizeof(buf),
"$%u\r\n",
key_row->tot_value_len);
response->reserve(key_row->tot_value_len + len + 3);
response->append(buf);
char header_buf[20];
int header_len = write_formatted(header_buf,
sizeof(header_buf),
"$%u\r\n",
key_row->tot_value_len);

// The total length of the expected response
response->reserve(header_len + key_row->tot_value_len + 2);
response->append(header_buf);
response->append((const char *)&key_row->value_start[2], key_row->tot_value_len);
response->append("\r\n");
/*
Expand Down Expand Up @@ -442,54 +445,77 @@ int get_value_rows(std::string *response,
return -1;
}

// Break up fetching large values to avoid blocking the network for other reads
const int ROWS_PER_READ = 2;
struct value_table value_rows[ROWS_PER_READ];

for (Uint32 row_index = 0; row_index < num_rows; row_index++)
// Number of read batches, rounded up (ceiling division)
Uint32 num_read_batches = (num_rows + ROWS_PER_READ - 1) / ROWS_PER_READ;
for (Uint32 batch = 0; batch < num_read_batches; batch++)
{
int read_index = row_index % ROWS_PER_READ;
value_rows[read_index].rondb_key = rondb_key;
value_rows[read_index].ordinal = row_index;

bool is_last_row_of_read = (read_index == (ROWS_PER_READ - 1));
bool is_last_row = (row_index == (num_rows - 1));
if (!is_last_row_of_read && !is_last_row)
Uint32 start_ordinal = batch * ROWS_PER_READ;
Uint32 num_rows_to_read = std::min(ROWS_PER_READ, num_rows - start_ordinal);

bool is_last_batch = (batch == (num_read_batches - 1));
NdbTransaction::ExecType commit_type = is_last_batch ? NdbTransaction::Commit : NdbTransaction::NoCommit;

if (read_batched_value_rows(response,
trans,
rondb_key,
num_rows_to_read,
start_ordinal,
commit_type) != 0)
{
continue;
return -1;
}
}
return 0;
}

// Break up fetching large values to avoid blocking the network for other reads
int read_batched_value_rows(std::string *response,
NdbTransaction *trans,
const Uint64 rondb_key,
const Uint32 num_rows_to_read,
const Uint32 start_ordinal,
const NdbTransaction::ExecType commit_type)
{
struct value_table value_rows[ROWS_PER_READ];

Uint32 ordinal = start_ordinal;
for (Uint32 i = 0; i < num_rows_to_read; i++)
{
value_rows[i].rondb_key = rondb_key;
value_rows[i].ordinal = ordinal;
const NdbOperation *read_op = trans->readTuple(
pk_value_record,
(const char *)&value_rows,
(const char *)&value_rows[i],
entire_value_record,
(char *)&value_rows,
(char *)&value_rows[i],
NdbOperation::LM_CommittedRead);
if (read_op == nullptr)
{
assign_ndb_err_to_response(response,
FAILED_GET_OP,
trans->getNdbError());
return RONDB_INTERNAL_ERROR;
return -1;
}
ordinal++;
}

NdbTransaction::ExecType commit_type = is_last_row ? NdbTransaction::Commit : NdbTransaction::NoCommit;
if (trans->execute(commit_type,
NdbOperation::AbortOnError) != 0)
{
assign_ndb_err_to_response(response,
FAILED_READ_KEY,
trans->getNdbError());
return RONDB_INTERNAL_ERROR;
}
if (trans->execute(commit_type,
NdbOperation::AbortOnError) != 0 ||
trans->getNdbError().code != 0)
{
assign_ndb_err_to_response(response,
FAILED_READ_KEY,
trans->getNdbError());
return -1;
}

for (Uint32 i = 0; i <= read_index; i++)
{
// Transfer char pointer to response's string
Uint32 row_value_len =
value_rows[i].value[0] + (value_rows[i].value[1] << 8);
response->append(&value_rows[i].value[2], row_value_len);
}
for (Uint32 i = 0; i < num_rows_to_read; i++)
{
// Append this row's value bytes (after the 2-byte little-endian length prefix) to the response
Uint8 low = (Uint8)value_rows[i].value[0];
Uint8 high = (Uint8)value_rows[i].value[1];
Uint32 row_value_len = Uint32(low) + (Uint32(256) * Uint32(high));
response->append((const char *)&value_rows[i].value[2], row_value_len);
}
return 0;
}
Expand Down Expand Up @@ -529,7 +555,8 @@ int get_complex_key_row(std::string *response,
return RONDB_INTERNAL_ERROR;
}
if (trans->execute(NdbTransaction::NoCommit,
NdbOperation::AbortOnError) != 0)
NdbOperation::AbortOnError) != 0 ||
trans->getNdbError().code != 0)
{
assign_ndb_err_to_response(response,
FAILED_READ_KEY,
Expand All @@ -539,17 +566,19 @@ int get_complex_key_row(std::string *response,

// Got inline value, now getting the other value rows

// Preparing response based on returned total value length
char buf[20];
int len = write_formatted(buf,
sizeof(buf),
"$%u\r\n",
key_row->tot_value_len);
response->reserve(key_row->tot_value_len + len + 3);
response->append(buf);
// Writing the Redis header to the response (indicating value length)
char header_buf[20];
int header_len = write_formatted(header_buf,
sizeof(header_buf),
"$%u\r\n",
key_row->tot_value_len);
response->reserve(header_len + key_row->tot_value_len + 2);
response->append(header_buf);

// Append inline value to response
Uint32 inline_value_len = key_row->value_start[0] + (key_row->value_start[1] << 8);
Uint8 low = (Uint8)key_row->value_start[0];
Uint8 high = (Uint8)key_row->value_start[1];
Uint32 inline_value_len = Uint32(low) + (Uint32(256) * Uint32(high));
response->append((const char *)&key_row->value_start[2], inline_value_len);

int ret_code = get_value_rows(response,
Expand All @@ -572,20 +601,12 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab,
Ndb *ndb,
std::string *response)
{
if (ndb->getAutoIncrementValue(tab, rondb_key, unsigned(1024)) == 0)
if (ndb->getAutoIncrementValue(tab, rondb_key, unsigned(1024)) != 0)
{
return 0;
}
if (ndb->getNdbError().code == 626)
{
if (ndb->setAutoIncrementValue(tab, Uint64(1), false) == 0)
{
rondb_key = Uint64(1);
return 0;
}
assign_ndb_err_to_response(response,
"Failed to get autoincrement value",
ndb->getNdbError());
return -1;
}
assign_ndb_err_to_response(response,
"Failed to get autoincrement value",
ndb->getNdbError());
return -1;
return 0;
}
9 changes: 9 additions & 0 deletions pink/rondis/string/db_operations.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/Ndb.hpp>

const Uint32 ROWS_PER_READ = 2;

int create_key_row(std::string *response,
Ndb *ndb,
const NdbDictionary::Table *tab,
Expand Down Expand Up @@ -109,6 +111,13 @@ int get_value_rows(std::string *response,
const Uint64 key_id,
const Uint32 tot_value_len);

int read_batched_value_rows(std::string *response,
NdbTransaction *trans,
const Uint64 rondb_key,
const Uint32 num_rows_to_read,
const Uint32 start_ordinal,
const NdbTransaction::ExecType commit_type);

int rondb_get_rondb_key(const NdbDictionary::Table *tab,
Uint64 &key_id,
Ndb *ndb,
Expand Down
2 changes: 1 addition & 1 deletion pink/rondis/string/table_definitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct value_table
{
Uint64 rondb_key;
Uint32 ordinal;
char value[EXTENSION_VALUE_LEN];
char value[EXTENSION_VALUE_LEN + 2];
};

/*
Expand Down
19 changes: 9 additions & 10 deletions pink/rondis/tests/get_set.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ EOF

# GET the value
local result=$(redis-cli GET "$key")

local expected_hash=$(echo -n "$value" | sha256sum | awk '{print $1}')
local actual_hash=$(echo -n "$result" | sha256sum | awk '{print $1}')

# Check if the retrieved value matches the expected value
if [[ "$result" == "$value" ]]; then
if [[ "$expected_hash" == "$actual_hash" ]]; then
echo "PASS: $key with value length ${#value}"
else
echo "FAIL: $key with value length ${#value}"
echo "Expected: $value"
echo "Got: $result"
echo "FAIL: $key with value length ${#value}; got length ${#result}"
echo "Expected hash: $expected_hash"
echo "Received hash: $actual_hash"
exit 1
fi
echo
Expand All @@ -47,7 +50,7 @@ generate_random_chars() {
local random_string=""

while [ "${#random_string}" -lt "$length" ]; do
random_string+=$(head /dev/urandom | tr -dc 'a-zA-Z0-9' | head -c "$length")
random_string+=$(head /dev/urandom | LC_CTYPE=C tr -dc 'a-zA-Z0-9' | head -c "$length")
done

echo "${random_string:0:$length}"
Expand All @@ -64,12 +67,8 @@ set_and_get "$KEY:empty" ""
echo "Testing small string..."
set_and_get "$KEY:small" "hello"

# Too large values seem to fail due to the network buffer size
# Minimal amount to create value rows: 30000
# TODO: Increase this as soon as GH actions allows it:
# for NUM_CHARS in 100 10000 30000 50000 80000 100000; do

for NUM_CHARS in 100 10000; do
for NUM_CHARS in 100 10000 30000 50000 57000 60000 70000; do
echo "Testing string with $NUM_CHARS characters..."
test_value=$(generate_random_chars $NUM_CHARS)
set_and_get "$KEY:$NUM_CHARS" "$test_value"
Expand Down