Skip to content

Commit

Permalink
Fix GETting multi-row values (#11)
Browse files Browse the repository at this point in the history
* Always check errors using getNdbError()
* Compare hashes in bash tests instead of full values
FIXES:
* Multi-value-row fetching: use different memory for each row
* Multi-value-row fetching: use clean struct for every fetch (place fetch into separate func)
* Calculate number of value rows correctly in SET
* Use uint8 for type safety when reading/writing value lengths from buffers
* Replace manual ndb auto-increment handling for rondb_key with SQL's AUTO_INCREMENT
  • Loading branch information
olapiv authored Nov 4, 2024
1 parent db1c603 commit 9d1d7ce
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 90 deletions.
6 changes: 3 additions & 3 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
}
},
"containerEnv": {
"RONDB_PATH": "/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8"
// Do this manually:
// export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8/lib
"RONDB_PATH": "/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8",
// This assumes that the RonDB tarball is placed in the root of the workspace folder
"LD_LIBRARY_PATH": "/usr/src/app/rondb-22.10.5-linux-glibc2.28-arm64_v8/lib"
}
}
3 changes: 1 addition & 2 deletions pink/rondis/sql/STRING_key.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ CREATE TABLE redis.string_keys(
-- Redis actually supports a max key size of 512MiB,
-- but we choose not to support that here
redis_key VARBINARY(3000) NOT NULL,
-- We will use a ndb auto-increment
-- This is to save space when referencing the key in the value table
rondb_key BIGINT UNSIGNED,
rondb_key BIGINT UNSIGNED AUTO_INCREMENT NULL,
-- TODO: Replace with Enum below
value_data_type INT UNSIGNED NOT NULL,
-- value_data_type ENUM('string', 'number', 'binary_string'),
Expand Down
19 changes: 14 additions & 5 deletions pink/rondis/string/commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ void rondb_get_command(Ndb *ndb,
// start copying from 3rd byte
memcpy(&key_row.redis_key[2], key_str, key_len);
// Length as little endian
key_row.redis_key[0] = key_len & 255;
key_row.redis_key[1] = key_len >> 8;
Uint8 *ptr = (Uint8 *)&key_row.redis_key[0];
ptr[0] = Uint8(key_len & 255);
ptr[1] = Uint8(key_len >> 8);

// This is (usually) a local operation to calculate the correct data node, using the
// hash of the pk value.
Expand Down Expand Up @@ -86,8 +87,9 @@ void rondb_get_command(Ndb *ndb,
// start copying from 3rd byte
memcpy(&key_row.redis_key[2], key_str, key_len);
// Length as little endian
key_row.redis_key[0] = key_len & 255;
key_row.redis_key[1] = key_len >> 8;
Uint8 *ptr = (Uint8 *)&key_row.redis_key[0];
ptr[0] = Uint8(key_len & 255);
ptr[1] = Uint8(key_len >> 8);

/*
Our value uses value rows, so a more complex read is required.
Expand Down Expand Up @@ -154,7 +156,7 @@ void rondb_set_command(
}

char varsize_param[EXTENSION_VALUE_LEN + 500];
Uint32 num_value_rows = value_len / EXTENSION_VALUE_LEN;
Uint32 num_value_rows = 0;
Uint64 rondb_key = 0;

if (value_len > INLINE_VALUE_LEN)
Expand All @@ -168,6 +170,13 @@ void rondb_set_command(
* deleting the row in the main table ensures that all
* value rows are also deleted.
*/
Uint32 extended_value_len = value_len - INLINE_VALUE_LEN;
num_value_rows = extended_value_len / EXTENSION_VALUE_LEN;
if (extended_value_len % EXTENSION_VALUE_LEN != 0)
{
num_value_rows++;
}

if (rondb_get_rondb_key(tab, rondb_key, ndb, response) != 0)
{
return;
Expand Down
159 changes: 90 additions & 69 deletions pink/rondis/string/db_operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,9 @@ void write_data_to_key_op(NdbOperation *ndb_op,
this_value_len = INLINE_VALUE_LEN;
}
memcpy(&buf[2], value_str, this_value_len);
buf[0] = this_value_len & 255;
buf[1] = this_value_len >> 8;
Uint8 *ptr = (Uint8 *)buf;
ptr[0] = (Uint8)(this_value_len & 255);
ptr[1] = (Uint8)(this_value_len >> 8);
ndb_op->setValue(KEY_TABLE_COL_value_start, buf);
}

Expand Down Expand Up @@ -300,11 +301,11 @@ int create_value_row(std::string *response,
op->equal(VALUE_TABLE_COL_rondb_key, rondb_key);
op->equal(VALUE_TABLE_COL_ordinal, ordinal);
memcpy(&buf[2], start_value_ptr, this_value_len);
buf[0] = this_value_len & 255;
buf[1] = this_value_len >> 8;
Uint8 *ptr = (Uint8 *)buf;
ptr[0] = (Uint8)(this_value_len & 255);
ptr[1] = (Uint8)(this_value_len >> 8);
op->setValue(VALUE_TABLE_COL_value, buf);
{
int ret_code = op->getNdbError().code;
if (op->getNdbError().code != 0)
{
assign_ndb_err_to_response(response, FAILED_DEFINE_OP, op->getNdbError());
Expand Down Expand Up @@ -356,7 +357,7 @@ int create_all_value_rows(std::string *response,
assign_ndb_err_to_response(response, FAILED_EXEC_TXN, trans->getNdbError());
return -1;
}

response->append("+OK\r\n");
return 0;
}
Expand Down Expand Up @@ -408,13 +409,15 @@ int get_simple_key_row(std::string *response,
{
return 0;
}
char buf[20];
int len = write_formatted(buf,
sizeof(buf),
"$%u\r\n",
key_row->tot_value_len);
response->reserve(key_row->tot_value_len + len + 3);
response->append(buf);
char header_buf[20];
int header_len = write_formatted(header_buf,
sizeof(header_buf),
"$%u\r\n",
key_row->tot_value_len);

// The total length of the expected response
response->reserve(header_len + key_row->tot_value_len + 2);
response->append(header_buf);
response->append((const char *)&key_row->value_start[2], key_row->tot_value_len);
response->append("\r\n");
/*
Expand Down Expand Up @@ -442,54 +445,77 @@ int get_value_rows(std::string *response,
return -1;
}

// Break up fetching large values to avoid blocking the network for other reads
const int ROWS_PER_READ = 2;
struct value_table value_rows[ROWS_PER_READ];

for (Uint32 row_index = 0; row_index < num_rows; row_index++)
// This is rounded up
Uint32 num_read_batches = (num_rows + ROWS_PER_READ - 1) / ROWS_PER_READ;
for (Uint32 batch = 0; batch < num_read_batches; batch++)
{
int read_index = row_index % ROWS_PER_READ;
value_rows[read_index].rondb_key = rondb_key;
value_rows[read_index].ordinal = row_index;

bool is_last_row_of_read = (read_index == (ROWS_PER_READ - 1));
bool is_last_row = (row_index == (num_rows - 1));
if (!is_last_row_of_read && !is_last_row)
Uint32 start_ordinal = batch * ROWS_PER_READ;
Uint32 num_rows_to_read = std::min(ROWS_PER_READ, num_rows - start_ordinal);

bool is_last_batch = (batch == (num_read_batches - 1));
NdbTransaction::ExecType commit_type = is_last_batch ? NdbTransaction::Commit : NdbTransaction::NoCommit;

if (read_batched_value_rows(response,
trans,
rondb_key,
num_rows_to_read,
start_ordinal,
commit_type) != 0)
{
continue;
return -1;
}
}
return 0;
}

// Break up fetching large values to avoid blocking the network for other reads
int read_batched_value_rows(std::string *response,
NdbTransaction *trans,
const Uint64 rondb_key,
const Uint32 num_rows_to_read,
const Uint32 start_ordinal,
const NdbTransaction::ExecType commit_type)
{
struct value_table value_rows[ROWS_PER_READ];

Uint32 ordinal = start_ordinal;
for (Uint32 i = 0; i < num_rows_to_read; i++)
{
value_rows[i].rondb_key = rondb_key;
value_rows[i].ordinal = ordinal;
const NdbOperation *read_op = trans->readTuple(
pk_value_record,
(const char *)&value_rows,
(const char *)&value_rows[i],
entire_value_record,
(char *)&value_rows,
(char *)&value_rows[i],
NdbOperation::LM_CommittedRead);
if (read_op == nullptr)
{
assign_ndb_err_to_response(response,
FAILED_GET_OP,
trans->getNdbError());
return RONDB_INTERNAL_ERROR;
return -1;
}
ordinal++;
}

NdbTransaction::ExecType commit_type = is_last_row ? NdbTransaction::Commit : NdbTransaction::NoCommit;
if (trans->execute(commit_type,
NdbOperation::AbortOnError) != 0)
{
assign_ndb_err_to_response(response,
FAILED_READ_KEY,
trans->getNdbError());
return RONDB_INTERNAL_ERROR;
}
if (trans->execute(commit_type,
NdbOperation::AbortOnError) != 0 ||
trans->getNdbError().code != 0)
{
assign_ndb_err_to_response(response,
FAILED_READ_KEY,
trans->getNdbError());
return -1;
}

for (Uint32 i = 0; i <= read_index; i++)
{
// Transfer char pointer to response's string
Uint32 row_value_len =
value_rows[i].value[0] + (value_rows[i].value[1] << 8);
response->append(&value_rows[i].value[2], row_value_len);
}
for (Uint32 i = 0; i < num_rows_to_read; i++)
{
// Transfer char pointer to response's string
Uint8 low = (Uint8)value_rows[i].value[0];
Uint8 high = (Uint8)value_rows[i].value[1];
Uint32 row_value_len = Uint32(low) + (Uint32(256) * Uint32(high));
response->append((const char *)&value_rows[i].value[2], row_value_len);
}
return 0;
}
Expand Down Expand Up @@ -529,7 +555,8 @@ int get_complex_key_row(std::string *response,
return RONDB_INTERNAL_ERROR;
}
if (trans->execute(NdbTransaction::NoCommit,
NdbOperation::AbortOnError) != 0)
NdbOperation::AbortOnError) != 0 ||
trans->getNdbError().code != 0)
{
assign_ndb_err_to_response(response,
FAILED_READ_KEY,
Expand All @@ -539,17 +566,19 @@ int get_complex_key_row(std::string *response,

// Got inline value, now getting the other value rows

// Preparing response based on returned total value length
char buf[20];
int len = write_formatted(buf,
sizeof(buf),
"$%u\r\n",
key_row->tot_value_len);
response->reserve(key_row->tot_value_len + len + 3);
response->append(buf);
// Writing the Redis header to the response (indicating value length)
char header_buf[20];
int header_len = write_formatted(header_buf,
sizeof(header_buf),
"$%u\r\n",
key_row->tot_value_len);
response->reserve(header_len + key_row->tot_value_len + 2);
response->append(header_buf);

// Append inline value to response
Uint32 inline_value_len = key_row->value_start[0] + (key_row->value_start[1] << 8);
Uint8 low = (Uint8)key_row->value_start[0];
Uint8 high = (Uint8)key_row->value_start[1];
Uint32 inline_value_len = Uint32(low) + (Uint32(256) * Uint32(high));
response->append((const char *)&key_row->value_start[2], inline_value_len);

int ret_code = get_value_rows(response,
Expand All @@ -572,20 +601,12 @@ int rondb_get_rondb_key(const NdbDictionary::Table *tab,
Ndb *ndb,
std::string *response)
{
if (ndb->getAutoIncrementValue(tab, rondb_key, unsigned(1024)) == 0)
if (ndb->getAutoIncrementValue(tab, rondb_key, unsigned(1024)) != 0)
{
return 0;
}
if (ndb->getNdbError().code == 626)
{
if (ndb->setAutoIncrementValue(tab, Uint64(1), false) == 0)
{
rondb_key = Uint64(1);
return 0;
}
assign_ndb_err_to_response(response,
"Failed to get autoincrement value",
ndb->getNdbError());
return -1;
}
assign_ndb_err_to_response(response,
"Failed to get autoincrement value",
ndb->getNdbError());
return -1;
return 0;
}
9 changes: 9 additions & 0 deletions pink/rondis/string/db_operations.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/Ndb.hpp>

const Uint32 ROWS_PER_READ = 2;

int create_key_row(std::string *response,
Ndb *ndb,
const NdbDictionary::Table *tab,
Expand Down Expand Up @@ -109,6 +111,13 @@ int get_value_rows(std::string *response,
const Uint64 key_id,
const Uint32 tot_value_len);

int read_batched_value_rows(std::string *response,
NdbTransaction *trans,
const Uint64 rondb_key,
const Uint32 num_rows_to_read,
const Uint32 start_ordinal,
const NdbTransaction::ExecType commit_type);

int rondb_get_rondb_key(const NdbDictionary::Table *tab,
Uint64 &key_id,
Ndb *ndb,
Expand Down
2 changes: 1 addition & 1 deletion pink/rondis/string/table_definitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct value_table
{
Uint64 rondb_key;
Uint32 ordinal;
char value[EXTENSION_VALUE_LEN];
char value[EXTENSION_VALUE_LEN + 2];
};

/*
Expand Down
19 changes: 9 additions & 10 deletions pink/rondis/tests/get_set.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ EOF

# GET the value
local result=$(redis-cli GET "$key")

local expected_hash=$(echo -n "$value" | sha256sum | awk '{print $1}')
local actual_hash=$(echo -n "$result" | sha256sum | awk '{print $1}')

# Check if the retrieved value matches the expected value
if [[ "$result" == "$value" ]]; then
if [[ "$expected_hash" == "$actual_hash" ]]; then
echo "PASS: $key with value length ${#value}"
else
echo "FAIL: $key with value length ${#value}"
echo "Expected: $value"
echo "Got: $result"
echo "FAIL: $key with value length ${#value}; got length ${#result}"
echo "Expected hash: $expected_hash"
echo "Received hash: $actual_hash"
exit 1
fi
echo
Expand All @@ -47,7 +50,7 @@ generate_random_chars() {
local random_string=""

while [ "${#random_string}" -lt "$length" ]; do
random_string+=$(head /dev/urandom | tr -dc 'a-zA-Z0-9' | head -c "$length")
random_string+=$(head /dev/urandom | LC_CTYPE=C tr -dc 'a-zA-Z0-9' | head -c "$length")
done

echo "${random_string:0:$length}"
Expand All @@ -64,12 +67,8 @@ set_and_get "$KEY:empty" ""
echo "Testing small string..."
set_and_get "$KEY:small" "hello"

# Too large values seem to fail due to the network buffer size
# Minimal amount to create value rows: 30000
# TODO: Increase this as soon as GH actions allows it:
# for NUM_CHARS in 100 10000 30000 50000 80000 100000; do

for NUM_CHARS in 100 10000; do
for NUM_CHARS in 100 10000 30000 50000 57000 60000 70000; do
echo "Testing string with $NUM_CHARS characters..."
test_value=$(generate_random_chars $NUM_CHARS)
set_and_get "$KEY:$NUM_CHARS" "$test_value"
Expand Down

0 comments on commit 9d1d7ce

Please sign in to comment.