Add rate limiting to control the number of requests per second
A new 'rate-limiting' option was added to control the number of
requests per second.

The rate limiting is based on the 'Token Bucket' algorithm: according
to the configured rate, on each interval a "new" quota of requests is
allowed to be sent to the server.

The rate limiting is applied at the connection level. Therefore, in
cluster mode the limitation applies to each shard: if, for example, the
cluster has three shards and the user configured one request per
second, then every second memtier-benchmark sends three requests, one
to each shard.
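
For illustration, a minimal standalone sketch (assumption: not part of
the commit; the value 75 stands in for a hypothetical --rate-limiting=75
setting) of how the configured rate is turned into a per-interval budget
and a timer interval, mirroring the calculation added to main() in
memtier_benchmark.cpp below.

#include <cstdio>

int main() {
    unsigned int request_rate = 75;  // hypothetical --rate-limiting=75

    // take the ceiling of rate/50 as the per-interval budget (the timer ticks
    // at most 50 times per second, i.e. an interval of at least 20 ms) ...
    unsigned int request_per_interval = (request_rate + 50 - 1) / 50;           // 2
    // ... then stretch the interval so budget * ticks/second ~= configured rate
    unsigned int events_per_second = request_rate / request_per_interval;       // 37
    unsigned int request_interval_microsecond = 1000000 / events_per_second;    // 27027

    std::printf("send %u requests every %u us (~%u req/s)\n",
                request_per_interval, request_interval_microsecond,
                request_per_interval * 1000000u / request_interval_microsecond);
    return 0;
}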
YaacovHazan committed Oct 15, 2023
1 parent bec3471 commit e21eff3
Showing 4 changed files with 68 additions and 3 deletions.
25 changes: 25 additions & 0 deletions memtier_benchmark.cpp
@@ -127,6 +127,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
"run_count = %u\n"
"debug = %u\n"
"requests = %llu\n"
"rate_limit = %u\n"
"clients = %u\n"
"threads = %u\n"
"test_time = %u\n"
@@ -176,6 +177,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
cfg->run_count,
cfg->debug,
cfg->requests,
cfg->request_rate,
cfg->clients,
cfg->threads,
cfg->test_time,
@@ -233,6 +235,7 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co
jsonhandler->write_obj("run_count" ,"%u", cfg->run_count);
jsonhandler->write_obj("debug" ,"%u", cfg->debug);
jsonhandler->write_obj("requests" ,"%llu", cfg->requests);
jsonhandler->write_obj("rate_limit" ,"%u", cfg->request_rate);
jsonhandler->write_obj("clients" ,"%u", cfg->clients);
jsonhandler->write_obj("threads" ,"%u", cfg->threads);
jsonhandler->write_obj("test_time" ,"%u", cfg->test_time);
@@ -421,6 +424,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
o_tls_sni,
o_tls_protocols,
o_hdr_file_prefix,
o_rate_limiting,
o_help
};

@@ -489,6 +493,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
{ "command", 1, 0, o_command },
{ "command-key-pattern", 1, 0, o_command_key_pattern },
{ "command-ratio", 1, 0, o_command_ratio },
{ "rate-limiting", 1, 0, o_rate_limiting },
{ NULL, 0, 0, 0 }
};

@@ -861,6 +866,15 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
}
break;
}
case o_rate_limiting: {
endptr = NULL;
cfg->request_rate = (unsigned int) strtoul(optarg, &endptr, 10);
if (!cfg->request_rate || !endptr || *endptr != '\0') {
fprintf(stderr, "error: rate must be greater than zero.\n");
return -1;
}
break;
}
#ifdef USE_TLS
case o_tls:
cfg->tls = true;
@@ -967,6 +981,7 @@ void usage() {
"Test Options:\n"
" -n, --requests=NUMBER Number of total requests per client (default: 10000)\n"
" use 'allkeys' to run on the entire key-range\n"
" --rate-limiting=NUMBER Number of requests per second\n"
" -c, --clients=NUMBER Number of clients per thread (default: 50)\n"
" -t, --threads=NUMBER Number of threads (default: 4)\n"
" --test-time=SECS Number of seconds to run the test\n"
@@ -1348,6 +1363,16 @@ int main(int argc, char *argv[])
delete tmp_protocol;
}

// if the user configured rate limiting, derive the per-interval budget and timer interval
if (cfg.request_rate) {
/* Our timer resolution is at most 50 events per second (an event every >= 20 ms).
 * When we calculate the number of requests per interval, we take the
 * upper bound and adjust the interval accordingly to get more accuracy */
cfg.request_per_interval = (cfg.request_rate + 50 - 1) / 50;
unsigned int events_per_second = cfg.request_rate / cfg.request_per_interval;
cfg.request_interval_microsecond = 1000000 / events_per_second;
benchmark_debug_log("Rate limiting configured to send %u requests per %u millisecond\n", cfg.request_per_interval, cfg.request_interval_microsecond / 1000);
}

#ifdef USE_TLS
// Initialize OpenSSL only if we're really going to use it.
3 changes: 3 additions & 0 deletions memtier_benchmark.h
@@ -104,6 +104,9 @@ struct benchmark_config {
bool cluster_mode;
struct arbitrary_command_list* arbitrary_commands;
const char *hdr_prefix;
unsigned int request_rate;
unsigned int request_per_interval;
unsigned int request_interval_microsecond;
#ifdef USE_TLS
bool tls;
const char *tls_cert;
39 changes: 36 additions & 3 deletions shard_connection.cpp
Expand Up @@ -56,6 +56,13 @@
#include "event2/bufferevent_ssl.h"
#endif

void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;
assert(sc != NULL);
sc->handle_timer_event();
}

void cluster_client_read_handler(bufferevent *bev, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;
@@ -66,7 +73,6 @@ void cluster_client_read_handler(bufferevent *bev, void *ctx)
void cluster_client_event_handler(bufferevent *bev, short events, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;

assert(sc != NULL);
sc->handle_event(events);
}
@@ -123,7 +129,7 @@ verify_request::~verify_request(void)
shard_connection::shard_connection(unsigned int id, connections_manager* conns_man, benchmark_config* config,
struct event_base* event_base, abstract_protocol* abs_protocol) :
m_address(NULL), m_port(NULL), m_unix_sockaddr(NULL),
m_bev(NULL), m_pending_resp(0), m_connection_state(conn_disconnected),
m_bev(NULL), m_request_per_cur_interval(0), m_pending_resp(0), m_connection_state(conn_disconnected),
m_hello(setup_done), m_authentication(setup_done), m_db_selection(setup_done), m_cluster_slots(setup_done) {
m_id = id;
m_conns_manager = conns_man;
@@ -341,6 +347,10 @@ request* shard_connection::pop_req() {
void shard_connection::push_req(request* req) {
m_pipeline->push(req);
m_pending_resp++;
if (m_config->request_rate) {
assert(m_request_per_cur_interval > 0);
m_request_per_cur_interval--;
}
}

bool shard_connection::is_conn_setup_done() {
@@ -486,21 +496,28 @@ void shard_connection::process_first_request() {
fill_pipeline();
}


void shard_connection::fill_pipeline(void)
{
struct timeval now;
gettimeofday(&now, NULL);

while (!m_conns_manager->finished() && m_pipeline->size() < m_config->pipeline) {
if (!is_conn_setup_done()) {
send_conn_setup_commands(now);
return;
}

// don't exceed requests
if (m_conns_manager->hold_pipeline(m_id)) {
break;
}

// that's enough, we reached the rate limit
if (m_config->request_rate && m_request_per_cur_interval == 0) {
// return and skip on update events
return;
}

// client manage requests logic
m_conns_manager->create_request(now, m_id);
}
@@ -511,6 +528,9 @@ void shard_connection::fill_pipeline(void)
if ((m_pending_resp == 0) && (evbuffer_get_length(bufferevent_get_output(m_bev)) == 0)) {
benchmark_debug_log("%s Done, no requests to send no response to wait for\n", get_readable_id());
bufferevent_disable(m_bev, EV_WRITE|EV_READ);
if (m_config->request_rate) {
event_del(m_event_timer);
}
}
}
}
@@ -526,6 +546,14 @@
bufferevent_enable(m_bev, EV_READ|EV_WRITE);

if (!m_conns_manager->get_reqs_processed()) {
/* Set timer for request rate */
if (m_config->request_rate) {
struct timeval interval = { 0, (long int)m_config->request_interval_microsecond };
m_request_per_cur_interval = m_config->request_per_interval;
m_event_timer = event_new(m_event_base, -1, EV_PERSIST, cluster_client_timer_handler, (void *)this);
event_add(m_event_timer, &interval);
}

process_first_request();
} else {
benchmark_debug_log("reconnection complete, proceeding with test\n");
@@ -561,6 +589,11 @@
}
}

void shard_connection::handle_timer_event() {
m_request_per_cur_interval = m_config->request_per_interval;
fill_pipeline();
}

void shard_connection::send_wait_command(struct timeval* sent_time,
unsigned int num_slaves, unsigned int timeout) {
int cmd_size = 0;
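
For context, a minimal self-contained libevent sketch of the refill
pattern shown above (assumption: illustrative only; the names budget,
per_interval and tick_cb are not memtier identifiers): a persistent
timer restores the per-interval budget on every tick, and the client
decrements it once per request until the next tick.

#include <event2/event.h>
#include <cstdio>

static unsigned int budget = 0;
static const unsigned int per_interval = 2;    // illustrative per-tick allowance

static void tick_cb(evutil_socket_t fd, short what, void *arg) {
    (void)fd; (void)what; (void)arg;
    budget = per_interval;                     // refill the bucket on every tick
    std::printf("budget refilled to %u\n", budget);
    // a real client would try to fill its pipeline here, decrementing 'budget'
    // once per request pushed and stopping when it reaches zero
}

int main() {
    struct event_base *base = event_base_new();
    struct timeval interval = { 0, 27027 };    // ~37 ticks/s, as in the sketch above
    struct event *timer = event_new(base, -1, EV_PERSIST, tick_cb, NULL);
    event_add(timer, &interval);
    event_base_dispatch(base);                 // runs until the loop is stopped externally
    event_free(timer);
    event_base_free(base);
    return 0;
}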
4 changes: 4 additions & 0 deletions shard_connection.h
@@ -76,6 +76,7 @@ struct verify_request : public request {
};

class shard_connection {
friend void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx);
friend void cluster_client_read_handler(bufferevent *bev, void *ctx);
friend void cluster_client_event_handler(bufferevent *bev, short events, void *ctx);

@@ -148,6 +149,7 @@ class shard_connection {
void fill_pipeline(void);

void handle_event(short evtype);
void handle_timer_event();

unsigned int m_id;
connections_manager* m_conns_manager;
@@ -160,9 +162,11 @@
struct sockaddr_un* m_unix_sockaddr;
struct bufferevent *m_bev;
struct event_base* m_event_base;
struct event* m_event_timer;

abstract_protocol* m_protocol;
std::queue<request *>* m_pipeline;
unsigned int m_request_per_cur_interval; // number of requests to send during the current interval

int m_pending_resp;

