Skip to content

Commit

Permalink
see RedisLabs#238, data import broken
Browse files Browse the repository at this point in the history
The PR includes some refactoring and cleanup around the following:
object_generator
    - generate_key(), a new function that generates a key and stores it
      inside internal buffer (no need for a buffer at client level)
import_object_generator
    - Align it to object_generator APIs of getting key/value/expire
    - read_next_item - new function for reading the next item before
      creating SET command
    - read_next_key - new function for reading (pointing) to the next
      key before creating the GET command
data_object
    - removed.
      We were already getting key/value/expire separately. Now that
      data-import and verify-data also moved to do it like that,
      we could remove it completely.
verify_client
    - unify the craate_request with its base class, and derive the
      create_x_request functions.
  • Loading branch information
YaacovHazan committed Oct 31, 2023
1 parent 0325e00 commit 31caa28
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 208 deletions.
98 changes: 52 additions & 46 deletions client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,14 @@ get_key_response client::get_key_for_conn(unsigned int command_index, unsigned i
iter = obj_iter_type(m_config, command_index);

*key_index = m_obj_gen->get_key_index(iter);
m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index);

if (!m_config->data_import || m_config->generate_keys) {
m_obj_gen->generate_key(*key_index);
} else {
/* For SET command we already read a completes item (see create_set_request()) */
if (command_index == GET_CMD_IDX)
dynamic_cast<import_object_generator*>(m_obj_gen)->read_next_key(*key_index);
}

return available_for_conn;
}
Expand All @@ -277,7 +284,7 @@ bool client::create_arbitrary_request(unsigned int command_index, struct timeval
get_key_response res = get_key_for_conn(command_index, conn_id, &key_index);
/* If key not available for this connection, we have a bug of sending partial request */
assert(res == available_for_conn);
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_key_buffer, m_key_len);
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, m_obj_gen->get_key(), m_obj_gen->get_key_len());
} else if (arg->type == data_type) {
unsigned int value_len;
const char *value = m_obj_gen->get_value(0, &value_len);
Expand Down Expand Up @@ -313,7 +320,7 @@ bool client::create_set_request(struct timeval& timestamp, unsigned int conn_id)
unsigned int value_len;
const char *value = m_obj_gen->get_value(key_index, &value_len);

m_connections[conn_id]->send_set_command(&timestamp, m_key_buffer, m_key_len,
m_connections[conn_id]->send_set_command(&timestamp, m_obj_gen->get_key(), m_obj_gen->get_key_len(),
value, value_len, m_obj_gen->get_expiry(),
m_config->data_offset);
}
Expand All @@ -328,7 +335,7 @@ bool client::create_get_request(struct timeval& timestamp, unsigned int conn_id)
return false;

if (res == available_for_conn) {
m_connections[conn_id]->send_get_command(&timestamp, m_key_buffer, m_key_len, m_config->data_offset);
m_connections[conn_id]->send_get_command(&timestamp, m_obj_gen->get_key(), m_obj_gen->get_key_len(), m_config->data_offset);
}

return true;
Expand All @@ -346,7 +353,7 @@ bool client::create_mget_request(struct timeval& timestamp, unsigned int conn_id
/* Not supported in cluster mode */
assert(res == available_for_conn);

m_keylist->add_key(m_key_buffer, m_key_len);
m_keylist->add_key(m_obj_gen->get_key(), m_obj_gen->get_key_len());
}

m_connections[conn_id]->send_mget_command(&timestamp, m_keylist);
Expand Down Expand Up @@ -378,6 +385,11 @@ void client::create_request(struct timeval timestamp, unsigned int conn_id)

// are we set or get? this depends on the ratio
else if (m_set_ratio_count < m_config->ratio.a) {
/* Before we can create a SET request, we need to read the next imported item */
if (m_config->data_import) {
dynamic_cast<import_object_generator*>(m_obj_gen)->read_next_item();
}

if (!create_set_request(timestamp, conn_id))
return;

Expand Down Expand Up @@ -482,57 +494,51 @@ unsigned long long int verify_client::get_errors(void)
return m_errors;
}

void verify_client::create_request(struct timeval timestamp, unsigned int conn_id)
{
// TODO: Refactor client::create_request so this can be unified.
if (m_set_ratio_count < m_config->ratio.a) {
// Prepare a GET request that will be compared against a previous
// SET request.
data_object *obj = m_obj_gen->get_object(obj_iter_type(m_config, 0));
unsigned int key_len;
const char *key = obj->get_key(&key_len);
bool verify_client::create_wait_request(struct timeval& timestamp, unsigned int conn_id) {
// Nothing to do
return true;
}

bool verify_client::create_set_request(struct timeval& timestamp, unsigned int conn_id) {
unsigned long long key_index;
get_key_response res = get_key_for_conn(SET_CMD_IDX, conn_id, &key_index);
if (res == not_available)
return false;

if (res == available_for_conn) {
unsigned int value_len;
const char *value = obj->get_value(&value_len);
const char *value = m_obj_gen->get_value(key_index, &value_len);

m_connections[conn_id]->send_verify_get_command(&timestamp, key, key_len,
value, value_len, obj->get_expiry(),
m_connections[conn_id]->send_verify_get_command(&timestamp, m_obj_gen->get_key(), m_obj_gen->get_key_len(),
value, value_len,
m_config->data_offset);
}

m_set_ratio_count++;
} else if (m_get_ratio_count < m_config->ratio.b) {
// We don't really care about GET operations, all we do here is keep
// the object generator synced.
int iter = obj_iter_type(m_config, 2);

if (m_config->multi_key_get > 0) {
unsigned int keys_count;
return true;
}

keys_count = m_config->ratio.b - m_get_ratio_count;
if ((int)keys_count > m_config->multi_key_get)
keys_count = m_config->multi_key_get;
m_keylist->clear();
while (m_keylist->get_keys_count() < keys_count) {
unsigned int keylen;
const char *key = m_obj_gen->get_key(iter, &keylen);
bool verify_client::create_get_request(struct timeval& timestamp, unsigned int conn_id) {
// Just Keep object generator synced
unsigned long long key_index;
get_key_for_conn(GET_CMD_IDX, conn_id, &key_index);

assert(key != NULL);
assert(keylen > 0);
return true;
}

m_keylist->add_key(key, keylen);
}
bool verify_client::create_mget_request(struct timeval& timestamp, unsigned int conn_id) {
// Just Keep object generator synced
unsigned long long key_index;
unsigned int keys_count = m_config->ratio.b - m_get_ratio_count;
if ((int)keys_count > m_config->multi_key_get)
keys_count = m_config->multi_key_get;

m_get_ratio_count += keys_count;
} else {
unsigned int keylen;
m_obj_gen->get_key(iter, &keylen);
m_get_ratio_count++;
}
m_keylist->clear();
for (unsigned int i = 0; i < keys_count; i++) {

// We don't really send this request, but need to count it to be in sync.
m_reqs_processed++;
} else {
m_get_ratio_count = m_set_ratio_count = 0;
get_key_for_conn(GET_CMD_IDX, conn_id, &key_index);
}

return true;
}

void verify_client::handle_response(unsigned int conn_id, struct timeval timestamp,
Expand Down
18 changes: 8 additions & 10 deletions client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class client;
class client_group;
struct benchmark_config;
class object_generator;
class data_object;

#define SET_CMD_IDX 0
#define GET_CMD_IDX 2
Expand All @@ -61,10 +60,6 @@ class client : public connections_manager {
bool m_initialized;
bool m_end_set;

// key buffer
char m_key_buffer[250];
int m_key_len;

// test related
benchmark_config* m_config;
object_generator* m_obj_gen;
Expand Down Expand Up @@ -93,10 +88,10 @@ class client : public connections_manager {

virtual get_key_response get_key_for_conn(unsigned int command_index, unsigned int conn_id, unsigned long long* key_index);
virtual bool create_arbitrary_request(unsigned int command_index, struct timeval& timestamp, unsigned int conn_id);
bool create_wait_request(struct timeval& timestamp, unsigned int conn_id);
bool create_set_request(struct timeval& timestamp, unsigned int conn_id);
bool create_get_request(struct timeval& timestamp, unsigned int conn_id);
bool create_mget_request(struct timeval& timestamp, unsigned int conn_id);
virtual bool create_wait_request(struct timeval& timestamp, unsigned int conn_id);
virtual bool create_set_request(struct timeval& timestamp, unsigned int conn_id);
virtual bool create_get_request(struct timeval& timestamp, unsigned int conn_id);
virtual bool create_mget_request(struct timeval& timestamp, unsigned int conn_id);

// client manager api's
unsigned long long get_reqs_processed() {
Expand Down Expand Up @@ -185,7 +180,10 @@ class verify_client : public client {
unsigned long long int m_errors;

virtual bool finished(void);
virtual void create_request(struct timeval timestamp, unsigned int conn_id);
virtual bool create_wait_request(struct timeval& timestamp, unsigned int conn_id);
virtual bool create_set_request(struct timeval& timestamp, unsigned int conn_id);
virtual bool create_get_request(struct timeval& timestamp, unsigned int conn_id);
virtual bool create_mget_request(struct timeval& timestamp, unsigned int conn_id);
virtual void handle_response(unsigned int conn_id, struct timeval timestamp,
request *request, protocol_response *response);
public:
Expand Down
13 changes: 9 additions & 4 deletions cluster_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ get_key_response cluster_client::get_key_for_conn(unsigned int command_index, un
// first check if we already have a key in the pool
if (!m_key_index_pools[conn_id]->empty()) {
*key_index = m_key_index_pools[conn_id]->front();
m_key_len = snprintf(m_key_buffer, sizeof(m_key_buffer)-1, "%s%llu", m_obj_gen->get_key_prefix(), *key_index);
m_obj_gen->generate_key(*key_index);

m_key_index_pools[conn_id]->pop();
return available_for_conn;
Expand All @@ -321,11 +321,13 @@ get_key_response cluster_client::get_key_for_conn(unsigned int command_index, un
// generate key
client::get_key_for_conn(command_index, conn_id, key_index);

unsigned int hslot = calc_hslot_crc16_cluster(m_key_buffer, m_key_len);
unsigned int hslot = calc_hslot_crc16_cluster(m_obj_gen->get_key(), m_obj_gen->get_key_len());

// check if the key match for this connection
if (m_slot_to_shard[hslot] == conn_id) {
benchmark_debug_log("%s generated key=[%.*s] for itself\n", m_connections[conn_id]->get_readable_id(), m_key_len, m_key_buffer);
benchmark_debug_log("%s generated key=[%.*s] for itself\n",
m_connections[conn_id]->get_readable_id(),
m_obj_gen->get_key_len(), m_obj_gen->get_key());
return available_for_conn;
}

Expand All @@ -347,7 +349,10 @@ get_key_response cluster_client::get_key_for_conn(unsigned int command_index, un
return not_available;

// store command and key for the other connection
benchmark_debug_log("%s generated key=[%.*s] for %s\n", m_connections[conn_id]->get_readable_id(), m_key_len, m_key_buffer, m_connections[other_conn_id]->get_readable_id());
benchmark_debug_log("%s generated key=[%.*s] for %s\n",
m_connections[conn_id]->get_readable_id(),
m_obj_gen->get_key_len(), m_obj_gen->get_key(),
m_connections[other_conn_id]->get_readable_id());

key_idx_pool->push(command_index);
key_idx_pool->push(*key_index);
Expand Down
Loading

0 comments on commit 31caa28

Please sign in to comment.