Skip to content

Commit

Permalink
Add rate limiting to control the number of requests per second (Redis…
Browse files Browse the repository at this point in the history
…Labs#237)

* Add rate limiting to control the number of requests per second

A new 'rate-limiting' option was added to control the number of
request per second.

The rate limiting is based on the 'Token Bucket' algorithm, and
according to the configured rate, on each interval, a "new" amount
of requests allowed to be sent to the server.

The rate-limiting is at the connection level. Therefore, in cluster mode,
the limitation is for each shard, so if, for example, the cluster has three
shards and the user configured one request per second. On every second,
memtier-benchmark will send three requests, one for each shard.

* Added tests to cover --rate-limiting option. Ensured the help message explains the per connection rate-limit

* Fixed per cluster rps test limits

---------

Co-authored-by: YaacovHazan <[email protected]>
Co-authored-by: filipecosta90 <[email protected]>
  • Loading branch information
3 people authored Nov 16, 2023
1 parent 0325e00 commit 9ddfcff
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 9 deletions.
28 changes: 27 additions & 1 deletion memtier_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
"run_count = %u\n"
"debug = %u\n"
"requests = %llu\n"
"rate_limit = %u\n"
"clients = %u\n"
"threads = %u\n"
"test_time = %u\n"
Expand Down Expand Up @@ -176,6 +177,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
cfg->run_count,
cfg->debug,
cfg->requests,
cfg->request_rate,
cfg->clients,
cfg->threads,
cfg->test_time,
Expand Down Expand Up @@ -233,6 +235,7 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co
jsonhandler->write_obj("run_count" ,"%u", cfg->run_count);
jsonhandler->write_obj("debug" ,"%u", cfg->debug);
jsonhandler->write_obj("requests" ,"%llu", cfg->requests);
jsonhandler->write_obj("rate_limit" ,"%u", cfg->request_rate);
jsonhandler->write_obj("clients" ,"%u", cfg->clients);
jsonhandler->write_obj("threads" ,"%u", cfg->threads);
jsonhandler->write_obj("test_time" ,"%u", cfg->test_time);
Expand Down Expand Up @@ -421,6 +424,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
o_tls_sni,
o_tls_protocols,
o_hdr_file_prefix,
o_rate_limiting,
o_help
};

Expand Down Expand Up @@ -489,6 +493,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
{ "command", 1, 0, o_command },
{ "command-key-pattern", 1, 0, o_command_key_pattern },
{ "command-ratio", 1, 0, o_command_ratio },
{ "rate-limiting", 1, 0, o_rate_limiting },
{ NULL, 0, 0, 0 }
};

Expand Down Expand Up @@ -861,6 +866,15 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
}
break;
}
case o_rate_limiting: {
endptr = NULL;
cfg->request_rate = (unsigned int) strtoul(optarg, &endptr, 10);
if (!cfg->request_rate || !endptr || *endptr != '\0') {
fprintf(stderr, "error: rate must be greater than zero.\n");
return -1;
}
break;
}
#ifdef USE_TLS
case o_tls:
cfg->tls = true;
Expand Down Expand Up @@ -948,7 +962,7 @@ void usage() {
" --key=FILE Use specified private key for TLS\n"
" --cacert=FILE Use specified CA certs bundle for TLS\n"
" --tls-skip-verify Skip verification of server certificate\n"
" --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'"
" --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'.\n"
" --sni=STRING Add an SNI header\n"
#endif
" -x, --run-count=NUMBER Number of full-test iterations to perform\n"
Expand All @@ -967,6 +981,8 @@ void usage() {
"Test Options:\n"
" -n, --requests=NUMBER Number of total requests per client (default: 10000)\n"
" use 'allkeys' to run on the entire key-range\n"
" --rate-limiting=NUMBER The max number of requests to make per second from an individual connection (default is unlimited rate).\n"
" If you use --rate-limiting and a very large rate is entered which cannot be met, memtier will do as many requests as possible per second.\n"
" -c, --clients=NUMBER Number of clients per thread (default: 50)\n"
" -t, --threads=NUMBER Number of threads (default: 4)\n"
" --test-time=SECS Number of seconds to run the test\n"
Expand Down Expand Up @@ -1348,6 +1364,16 @@ int main(int argc, char *argv[])
delete tmp_protocol;
}

// if user configured rate limiting, do some calculations
if (cfg.request_rate) {
/* Our event resolution is (at least) 50 events per second (event every >= 20 ml).
* When we calculate the number of request per interval, we are taking
* the upper bound and adjust the interval accordingly to get more accuracy */
cfg.request_per_interval = (cfg.request_rate + 50 - 1) / 50;
unsigned int events_per_second = cfg.request_rate / cfg.request_per_interval;
cfg.request_interval_microsecond = 1000000 / events_per_second;
benchmark_debug_log("Rate limiting configured to send %u requests per %u millisecond\n", cfg.request_per_interval, cfg.request_interval_microsecond / 1000);
}

#ifdef USE_TLS
// Initialize OpenSSL only if we're really going to use it.
Expand Down
3 changes: 3 additions & 0 deletions memtier_benchmark.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ struct benchmark_config {
bool cluster_mode;
struct arbitrary_command_list* arbitrary_commands;
const char *hdr_prefix;
unsigned int request_rate;
unsigned int request_per_interval;
unsigned int request_interval_microsecond;
#ifdef USE_TLS
bool tls;
const char *tls_cert;
Expand Down
39 changes: 36 additions & 3 deletions shard_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@
#include "event2/bufferevent_ssl.h"
#endif

void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;
assert(sc != NULL);
sc->handle_timer_event();
}

void cluster_client_read_handler(bufferevent *bev, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;
Expand All @@ -66,7 +73,6 @@ void cluster_client_read_handler(bufferevent *bev, void *ctx)
void cluster_client_event_handler(bufferevent *bev, short events, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;

assert(sc != NULL);
sc->handle_event(events);
}
Expand Down Expand Up @@ -123,7 +129,7 @@ verify_request::~verify_request(void)
shard_connection::shard_connection(unsigned int id, connections_manager* conns_man, benchmark_config* config,
struct event_base* event_base, abstract_protocol* abs_protocol) :
m_address(NULL), m_port(NULL), m_unix_sockaddr(NULL),
m_bev(NULL), m_pending_resp(0), m_connection_state(conn_disconnected),
m_bev(NULL), m_request_per_cur_interval(0), m_pending_resp(0), m_connection_state(conn_disconnected),
m_hello(setup_done), m_authentication(setup_done), m_db_selection(setup_done), m_cluster_slots(setup_done) {
m_id = id;
m_conns_manager = conns_man;
Expand Down Expand Up @@ -341,6 +347,10 @@ request* shard_connection::pop_req() {
void shard_connection::push_req(request* req) {
m_pipeline->push(req);
m_pending_resp++;
if (m_config->request_rate) {
assert(m_request_per_cur_interval > 0);
m_request_per_cur_interval--;
}
}

bool shard_connection::is_conn_setup_done() {
Expand Down Expand Up @@ -486,21 +496,28 @@ void shard_connection::process_first_request() {
fill_pipeline();
}


void shard_connection::fill_pipeline(void)
{
struct timeval now;
gettimeofday(&now, NULL);

while (!m_conns_manager->finished() && m_pipeline->size() < m_config->pipeline) {
if (!is_conn_setup_done()) {
send_conn_setup_commands(now);
return;
}

// don't exceed requests
if (m_conns_manager->hold_pipeline(m_id)) {
break;
}

// that's enough, we reached the rate limit
if (m_config->request_rate && m_request_per_cur_interval == 0) {
// return and skip on update events
return;
}

// client manage requests logic
m_conns_manager->create_request(now, m_id);
}
Expand All @@ -511,6 +528,9 @@ void shard_connection::fill_pipeline(void)
if ((m_pending_resp == 0) && (evbuffer_get_length(bufferevent_get_output(m_bev)) == 0)) {
benchmark_debug_log("%s Done, no requests to send no response to wait for\n", get_readable_id());
bufferevent_disable(m_bev, EV_WRITE|EV_READ);
if (m_config->request_rate) {
event_del(m_event_timer);
}
}
}
}
Expand All @@ -526,6 +546,14 @@ void shard_connection::handle_event(short events)
bufferevent_enable(m_bev, EV_READ|EV_WRITE);

if (!m_conns_manager->get_reqs_processed()) {
/* Set timer for request rate */
if (m_config->request_rate) {
struct timeval interval = { 0, (long int)m_config->request_interval_microsecond };
m_request_per_cur_interval = m_config->request_per_interval;
m_event_timer = event_new(m_event_base, -1, EV_PERSIST, cluster_client_timer_handler, (void *)this);
event_add(m_event_timer, &interval);
}

process_first_request();
} else {
benchmark_debug_log("reconnection complete, proceeding with test\n");
Expand Down Expand Up @@ -561,6 +589,11 @@ void shard_connection::handle_event(short events)
}
}

void shard_connection::handle_timer_event() {
m_request_per_cur_interval = m_config->request_per_interval;
fill_pipeline();
}

void shard_connection::send_wait_command(struct timeval* sent_time,
unsigned int num_slaves, unsigned int timeout) {
int cmd_size = 0;
Expand Down
4 changes: 4 additions & 0 deletions shard_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct verify_request : public request {
};

class shard_connection {
friend void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx);
friend void cluster_client_read_handler(bufferevent *bev, void *ctx);
friend void cluster_client_event_handler(bufferevent *bev, short events, void *ctx);

Expand Down Expand Up @@ -148,6 +149,7 @@ class shard_connection {
void fill_pipeline(void);

void handle_event(short evtype);
void handle_timer_event();

unsigned int m_id;
connections_manager* m_conns_manager;
Expand All @@ -160,9 +162,11 @@ class shard_connection {
struct sockaddr_un* m_unix_sockaddr;
struct bufferevent *m_bev;
struct event_base* m_event_base;
struct event* m_event_timer;

abstract_protocol* m_protocol;
std::queue<request *>* m_pipeline;
unsigned int m_request_per_cur_interval; // number requests to send during the current interval

int m_pending_resp;

Expand Down
14 changes: 9 additions & 5 deletions tests/include.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def ensure_tls_protocols(master_nodes_connections):


def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count,
overall_request_count):
overall_request_count, overall_request_delta=None):
failed_asserts = env.getNumberOfFailedAssertion()
try:
# assert correct exit code
Expand All @@ -25,8 +25,11 @@ def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_re
env.assertTrue(os.path.isfile('{0}/mb.stdout'.format(config.results_dir)))
env.assertTrue(os.path.isfile('{0}/mb.stderr'.format(config.results_dir)))
env.assertTrue(os.path.isfile('{0}/mb.json'.format(config.results_dir)))
# assert we have the expected request count
env.assertEqual(overall_expected_request_count, overall_request_count)
if overall_request_delta is None:
# assert we have the expected request count
env.assertEqual(overall_expected_request_count, overall_request_count)
else:
env.assertAlmostEqual(overall_expected_request_count, overall_request_count,overall_request_delta)
finally:
if env.getNumberOfFailedAssertion() > failed_asserts:
debugPrintMemtierOnError(config, env)
Expand Down Expand Up @@ -108,13 +111,14 @@ def addTLSArgs(benchmark_specs, env):



def get_default_memtier_config(threads=10, clients=5, requests=1000):
def get_default_memtier_config(threads=10, clients=5, requests=1000, test_time=None):
config = {
"memtier_benchmark": {
"binary": MEMTIER_BINARY,
"threads": threads,
"clients": clients,
"requests": requests
"requests": requests,
"test_time": test_time
},
}
return config
Expand Down
39 changes: 39 additions & 0 deletions tests/tests_oss_simple_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,42 @@ def test_default_arbitrary_command_hset_multi_data_placeholders(env):
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count,
overall_request_count)

def test_default_set_get_rate_limited(env):
master_nodes_list = env.getMasterNodesList()
for client_count in [1,2,4]:
for thread_count in [1,2]:
rps_per_client = 100
test_time_secs = 5
overall_expected_rps = rps_per_client * client_count * thread_count * len(master_nodes_list)
overall_expected_request_count = test_time_secs * overall_expected_rps
# we give a 1 sec margin
request_delta = overall_expected_rps
# we will specify rate limit and the test time, which should help us get an approximate request count
benchmark_specs = {"name": env.testName, "args": ['--rate-limiting={}'.format(rps_per_client)]}
addTLSArgs(benchmark_specs, env)
config = get_default_memtier_config(thread_count,client_count,None,test_time_secs)

master_nodes_connections = env.getOSSMasterNodesConnectionList()

# reset the commandstats
for master_connection in master_nodes_connections:
master_connection.execute_command("CONFIG", "RESETSTAT")

add_required_env_arguments(benchmark_specs, config, env, master_nodes_list)

# Create a temporary directory
test_dir = tempfile.mkdtemp()

config = RunConfig(test_dir, env.testName, config, {})
ensure_clean_benchmark_folder(config.results_dir)

benchmark = Benchmark.from_json(config, benchmark_specs)

# benchmark.run() returns True if the return code of memtier_benchmark was 0
memtier_ok = benchmark.run()

master_nodes_connections = env.getOSSMasterNodesConnectionList()
merged_command_stats = {'cmdstat_set': {'calls': 0}, 'cmdstat_get': {'calls': 0}}
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, overall_request_count, request_delta)

0 comments on commit 9ddfcff

Please sign in to comment.