Skip to content

Commit

Permalink
add support for aribitrary command (#67)
Browse files Browse the repository at this point in the history
* add support for aribitrary command

add new option to run arbitrary command instead of
the SET:GET commands.

* add support for aribitrary command

cr fixes 1:
1. replace hex_digit_to_int()
2. remove the MAX_BULKS limit
3. fix redis protocol parser
  • Loading branch information
YaacovHazan authored and yaacovhazan-Redislabs committed Dec 23, 2018
1 parent 58a1bdb commit 891df95
Show file tree
Hide file tree
Showing 15 changed files with 942 additions and 306 deletions.
55 changes: 47 additions & 8 deletions client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,39 @@ bool client::hold_pipeline(unsigned int conn_id) {
// This function could use some urgent TLC -- but we need to do it without altering the behavior
void client::create_request(struct timeval timestamp, unsigned int conn_id)
{
// are we using arbitrary command?
if (m_config->command) {
int cmd_size = 0;
for (unsigned int i = 0; i < m_config->command->command_args.size(); i++) {
command_arg* arg = &m_config->command->command_args[i];

if (arg->type == const_type) {
cmd_size += m_connections[conn_id]->send_arbitrary_command(arg);
} else if (arg->type == key_type) {
int iter = obj_iter_type(m_config, 0);
unsigned int key_len;
const char *key = m_obj_gen->get_key(iter, &key_len);

assert(key != NULL);
assert(key_len > 0);

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, key, key_len);
} else if (arg->type == data_type) {
unsigned int value_len;
const char *value = m_obj_gen->get_value(0, &value_len);

assert(value != NULL);
assert(value_len > 0);

cmd_size += m_connections[conn_id]->send_arbitrary_command(arg, value, value_len);
}
}

m_connections[conn_id]->send_arbitrary_command_end(&timestamp, cmd_size);
m_reqs_generated++;
return;
}

// If the Set:Wait ratio is not 0, start off with WAITs
if (m_config->wait_ratio.b &&
(m_tot_wait_ops == 0 ||
Expand Down Expand Up @@ -335,19 +368,24 @@ void client::handle_response(unsigned int conn_id, struct timeval timestamp,
switch (request->m_type) {
case rt_get:
m_stats.update_get_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp),
response->get_hits(),
request->m_keys - response->get_hits());
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp),
response->get_hits(),
request->m_keys - response->get_hits());
break;
case rt_set:
m_stats.update_set_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
break;
case rt_wait:
m_stats.update_wait_op(&timestamp,
ts_diff(request->m_sent_time, timestamp));
ts_diff(request->m_sent_time, timestamp));
break;
case rt_arbitrary:
m_stats.update_aribitrary_op(&timestamp,
request->m_size + response->get_total_len(),
ts_diff(request->m_sent_time, timestamp));
break;
default:
assert(0);
Expand Down Expand Up @@ -591,7 +629,8 @@ void client_group::write_client_stats(const char *prefix)
char filename[PATH_MAX];

snprintf(filename, sizeof(filename)-1, "%s-%u.csv", prefix, client_id++);
if (!(*i)->get_stats()->save_csv(filename, m_config->cluster_mode)) {
if (!(*i)->get_stats()->save_csv(filename, m_config->cluster_mode,
m_config->command ? m_config->command->command_name : "")) {
fprintf(stderr, "error: %s: failed to write client stats.\n", filename);
}
}
Expand Down
16 changes: 8 additions & 8 deletions cluster_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,23 +225,23 @@ void cluster_client::handle_cluster_slots(protocol_response *r) {
std::vector<bool> close_sc(prev_connections_size, true);

// run over response and create connections
for (unsigned int i=0; i<r->get_mbulk_value()->mbulk_array.size(); i++) {
for (unsigned int i=0; i<r->get_mbulk_value()->mbulks_elements.size(); i++) {
// create connection
mbulk_element* shard = r->get_mbulk_value()->mbulk_array[i];
mbulk_size_el* shard = r->get_mbulk_value()->mbulks_elements[i]->as_mbulk_size();

int min_slot = strtol(shard->mbulk_array[0]->value, NULL, 10);
int max_slot = strtol(shard->mbulk_array[1]->value, NULL, 10);
int min_slot = strtol(shard->mbulks_elements[0]->as_bulk()->value + 1, NULL, 10);
int max_slot = strtol(shard->mbulks_elements[1]->as_bulk()->value + 1, NULL, 10);

// hostname/ip
mbulk_element* mbulk_addr_el = shard->mbulk_array[2]->mbulk_array[0];
bulk_el* mbulk_addr_el = shard->mbulks_elements[2]->as_mbulk_size()->mbulks_elements[0]->as_bulk();
char* addr = (char*) malloc(mbulk_addr_el->value_len + 1);
memcpy(addr, mbulk_addr_el->value, mbulk_addr_el->value_len);
addr[mbulk_addr_el->value_len] = '\0';

// port
mbulk_element* mbulk_port_el = shard->mbulk_array[2]->mbulk_array[1];
char* port = (char*) malloc(mbulk_port_el->value_len + 1);
memcpy(port, mbulk_port_el->value, mbulk_port_el->value_len);
bulk_el* mbulk_port_el = shard->mbulks_elements[2]->as_mbulk_size()->mbulks_elements[1]->as_bulk();
char* port = (char*) malloc(mbulk_port_el->value_len);
memcpy(port, mbulk_port_el->value + 1, mbulk_port_el->value_len);
port[mbulk_port_el->value_len] = '\0';

// check if connection already exist
Expand Down
155 changes: 155 additions & 0 deletions config_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,158 @@ const char* server_addr::get_last_error(void) const
{
return gai_strerror(m_last_error);
}

static int hex_digit_to_int(char c) {
if (c >= 'a' && c <= 'f') {
return (c - 'a') + 10;
} else if (c >= 'A' && c <= 'F') {
return (c - 'A') + 10;
} else if (c >= '0' && c <= '9') {
return (c - '0');
} else {
return -1;
}
}

bool arbitrary_command::split_command_to_args(const char* command) {
const char *p = command;
size_t command_len = strlen(command);

char buffer[command_len];
unsigned int buffer_len = 0;

while (1) {
/* skip blanks */
while (*p && isspace(*p)) {
p++;
}

if (*p) {
/* get a token */
bool in_quotes = 0; /* set to 1 if we are in "quotes" */
bool in_single_quotes = 0; /* set to 1 if we are in 'single quotes' */
bool done = 0;
buffer_len = 0;
//current = p;

while (!done) {
if (in_quotes) {
if (*p == '\\' && *(p + 1) == 'x' &&
isxdigit(*(p + 2)) &&
isxdigit(*(p + 3))) {

unsigned char byte;
byte = (hex_digit_to_int(*(p + 2)) * 16) +
hex_digit_to_int(*(p + 3));

buffer[buffer_len] = byte;
buffer_len++;
p += 3;
} else if (*p == '\\' && *(p + 1)) {
char c;
p++;

switch (*p) {
case 'n':
c = '\n';
break;

case 'r':
c = '\r';
break;

case 't':
c = '\t';
break;

case 'b':
c = '\b';
break;

case 'a':
c = '\a';
break;

default:
c = *p;
break;
}

buffer[buffer_len] = c;
buffer_len++;
} else if (*p == '"') {
/* closing quote must be followed by a space or
* nothing at all. */
if (*(p + 1) && !isspace(*(p + 1))) {
goto err;
}

done = 1;
} else if (!*p) {
/* unterminated quotes */
goto err;
} else {
buffer[buffer_len] = *p;
buffer_len++;
}
} else if (in_single_quotes) {
if (*p == '\\' && *(p + 1) == '\'') {
p++;
buffer[buffer_len] = '\'';
buffer_len++;
} else if (*p == '\'') {
/* closing quote must be followed by a space or
* nothing at all. */
if (*(p + 1) && !isspace(*(p + 1))) {
goto err;
}

done = 1;
} else if (!*p) {
/* unterminated quotes */
goto err;
} else {
buffer[buffer_len] = *p;
buffer_len++;
}
} else {
switch (*p) {
case ' ':
case '\n':
case '\r':
case '\t':
case '\0':
done = 1;
break;

case '"':
in_quotes = 1;
break;

case '\'':
in_single_quotes = 1;
break;

default:
buffer[buffer_len] = *p;
buffer_len++;
break;
}
}

if (*p) {
p++;
}
}

// add new arg
command_arg arg(buffer, buffer_len);
command_args.push_back(arg);
} else {
return true;
}
}

err:
return false;
}
22 changes: 22 additions & 0 deletions config_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,26 @@ struct server_addr {
int m_last_error;
};

#define KEY_PLACEHOLDER "__key__"
#define DATA_PLACEHOLDER "__data__"

enum command_arg_type {
const_type = 0,
key_type = 1,
data_type = 2,
undefined_type = 3
};

struct command_arg {
command_arg(const char* arg, unsigned int arg_len) : type(undefined_type), data(arg, arg_len) {;}
command_arg_type type;
std::string data;
};

struct arbitrary_command {
std::vector<command_arg> command_args;
std::string command_name;
bool split_command_to_args(const char* command);
};

#endif /* _CONFIG_TYPES_H */
Loading

0 comments on commit 891df95

Please sign in to comment.