diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index f7e0da31..26e585cf 100755 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -127,6 +127,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg) "run_count = %u\n" "debug = %u\n" "requests = %llu\n" + "rate_limit = %u\n" "clients = %u\n" "threads = %u\n" "test_time = %u\n" @@ -176,6 +177,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg) cfg->run_count, cfg->debug, cfg->requests, + cfg->request_rate, cfg->clients, cfg->threads, cfg->test_time, @@ -233,6 +235,7 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co jsonhandler->write_obj("run_count" ,"%u", cfg->run_count); jsonhandler->write_obj("debug" ,"%u", cfg->debug); jsonhandler->write_obj("requests" ,"%llu", cfg->requests); + jsonhandler->write_obj("rate_limit" ,"%u", cfg->request_rate); jsonhandler->write_obj("clients" ,"%u", cfg->clients); jsonhandler->write_obj("threads" ,"%u", cfg->threads); jsonhandler->write_obj("test_time" ,"%u", cfg->test_time); @@ -421,6 +424,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf o_tls_sni, o_tls_protocols, o_hdr_file_prefix, + o_rate_limiting, o_help }; @@ -489,6 +493,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf { "command", 1, 0, o_command }, { "command-key-pattern", 1, 0, o_command_key_pattern }, { "command-ratio", 1, 0, o_command_ratio }, + { "rate-limiting", 1, 0, o_rate_limiting }, { NULL, 0, 0, 0 } }; @@ -861,6 +866,15 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf } break; } + case o_rate_limiting: { + endptr = NULL; + cfg->request_rate = (unsigned int) strtoul(optarg, &endptr, 10); + if (!cfg->request_rate || !endptr || *endptr != '\0') { + fprintf(stderr, "error: rate must be greater than zero.\n"); + return -1; + } + break; + } #ifdef USE_TLS case o_tls: cfg->tls = true; @@ -948,7 +962,7 @@ void usage() { " --key=FILE Use specified private key for TLS\n" " --cacert=FILE Use specified CA certs bundle for TLS\n" " --tls-skip-verify Skip verification of server certificate\n" - " --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'" + " --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'.\n" " --sni=STRING Add an SNI header\n" #endif " -x, --run-count=NUMBER Number of full-test iterations to perform\n" @@ -967,6 +981,8 @@ void usage() { "Test Options:\n" " -n, --requests=NUMBER Number of total requests per client (default: 10000)\n" " use 'allkeys' to run on the entire key-range\n" + " --rate-limiting=NUMBER The max number of requests to make per second from an individual connection (default is unlimited rate).\n" + " If you use --rate-limiting and a very large rate is entered which cannot be met, memtier will do as many requests as possible per second.\n" " -c, --clients=NUMBER Number of clients per thread (default: 50)\n" " -t, --threads=NUMBER Number of threads (default: 4)\n" " --test-time=SECS Number of seconds to run the test\n" @@ -1348,6 +1364,16 @@ int main(int argc, char *argv[]) delete tmp_protocol; } + // if user configured rate limiting, do some calculations + if (cfg.request_rate) { + /* Our event resolution is (at least) 50 events per second (event every >= 20 ml). + * When we calculate the number of request per interval, we are taking + * the upper bound and adjust the interval accordingly to get more accuracy */ + cfg.request_per_interval = (cfg.request_rate + 50 - 1) / 50; + unsigned int events_per_second = cfg.request_rate / cfg.request_per_interval; + cfg.request_interval_microsecond = 1000000 / events_per_second; + benchmark_debug_log("Rate limiting configured to send %u requests per %u millisecond\n", cfg.request_per_interval, cfg.request_interval_microsecond / 1000); + } #ifdef USE_TLS // Initialize OpenSSL only if we're really going to use it. diff --git a/memtier_benchmark.h b/memtier_benchmark.h index d3e7e697..b7a71903 100644 --- a/memtier_benchmark.h +++ b/memtier_benchmark.h @@ -104,6 +104,9 @@ struct benchmark_config { bool cluster_mode; struct arbitrary_command_list* arbitrary_commands; const char *hdr_prefix; + unsigned int request_rate; + unsigned int request_per_interval; + unsigned int request_interval_microsecond; #ifdef USE_TLS bool tls; const char *tls_cert; diff --git a/shard_connection.cpp b/shard_connection.cpp index 2da4f2b0..9db09903 100644 --- a/shard_connection.cpp +++ b/shard_connection.cpp @@ -56,6 +56,13 @@ #include "event2/bufferevent_ssl.h" #endif +void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx) +{ + shard_connection *sc = (shard_connection *) ctx; + assert(sc != NULL); + sc->handle_timer_event(); +} + void cluster_client_read_handler(bufferevent *bev, void *ctx) { shard_connection *sc = (shard_connection *) ctx; @@ -66,7 +73,6 @@ void cluster_client_read_handler(bufferevent *bev, void *ctx) void cluster_client_event_handler(bufferevent *bev, short events, void *ctx) { shard_connection *sc = (shard_connection *) ctx; - assert(sc != NULL); sc->handle_event(events); } @@ -123,7 +129,7 @@ verify_request::~verify_request(void) shard_connection::shard_connection(unsigned int id, connections_manager* conns_man, benchmark_config* config, struct event_base* event_base, abstract_protocol* abs_protocol) : m_address(NULL), m_port(NULL), m_unix_sockaddr(NULL), - m_bev(NULL), m_pending_resp(0), m_connection_state(conn_disconnected), + m_bev(NULL), m_request_per_cur_interval(0), m_pending_resp(0), m_connection_state(conn_disconnected), m_hello(setup_done), m_authentication(setup_done), m_db_selection(setup_done), m_cluster_slots(setup_done) { m_id = id; m_conns_manager = conns_man; @@ -341,6 +347,10 @@ request* shard_connection::pop_req() { void shard_connection::push_req(request* req) { m_pipeline->push(req); m_pending_resp++; + if (m_config->request_rate) { + assert(m_request_per_cur_interval > 0); + m_request_per_cur_interval--; + } } bool shard_connection::is_conn_setup_done() { @@ -486,21 +496,28 @@ void shard_connection::process_first_request() { fill_pipeline(); } - void shard_connection::fill_pipeline(void) { struct timeval now; gettimeofday(&now, NULL); + while (!m_conns_manager->finished() && m_pipeline->size() < m_config->pipeline) { if (!is_conn_setup_done()) { send_conn_setup_commands(now); return; } + // don't exceed requests if (m_conns_manager->hold_pipeline(m_id)) { break; } + // that's enough, we reached the rate limit + if (m_config->request_rate && m_request_per_cur_interval == 0) { + // return and skip on update events + return; + } + // client manage requests logic m_conns_manager->create_request(now, m_id); } @@ -511,6 +528,9 @@ void shard_connection::fill_pipeline(void) if ((m_pending_resp == 0) && (evbuffer_get_length(bufferevent_get_output(m_bev)) == 0)) { benchmark_debug_log("%s Done, no requests to send no response to wait for\n", get_readable_id()); bufferevent_disable(m_bev, EV_WRITE|EV_READ); + if (m_config->request_rate) { + event_del(m_event_timer); + } } } } @@ -526,6 +546,14 @@ void shard_connection::handle_event(short events) bufferevent_enable(m_bev, EV_READ|EV_WRITE); if (!m_conns_manager->get_reqs_processed()) { + /* Set timer for request rate */ + if (m_config->request_rate) { + struct timeval interval = { 0, (long int)m_config->request_interval_microsecond }; + m_request_per_cur_interval = m_config->request_per_interval; + m_event_timer = event_new(m_event_base, -1, EV_PERSIST, cluster_client_timer_handler, (void *)this); + event_add(m_event_timer, &interval); + } + process_first_request(); } else { benchmark_debug_log("reconnection complete, proceeding with test\n"); @@ -561,6 +589,11 @@ void shard_connection::handle_event(short events) } } +void shard_connection::handle_timer_event() { + m_request_per_cur_interval = m_config->request_per_interval; + fill_pipeline(); +} + void shard_connection::send_wait_command(struct timeval* sent_time, unsigned int num_slaves, unsigned int timeout) { int cmd_size = 0; diff --git a/shard_connection.h b/shard_connection.h index fa95030e..12fbaae3 100644 --- a/shard_connection.h +++ b/shard_connection.h @@ -76,6 +76,7 @@ struct verify_request : public request { }; class shard_connection { + friend void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx); friend void cluster_client_read_handler(bufferevent *bev, void *ctx); friend void cluster_client_event_handler(bufferevent *bev, short events, void *ctx); @@ -148,6 +149,7 @@ class shard_connection { void fill_pipeline(void); void handle_event(short evtype); + void handle_timer_event(); unsigned int m_id; connections_manager* m_conns_manager; @@ -160,9 +162,11 @@ class shard_connection { struct sockaddr_un* m_unix_sockaddr; struct bufferevent *m_bev; struct event_base* m_event_base; + struct event* m_event_timer; abstract_protocol* m_protocol; std::queue* m_pipeline; + unsigned int m_request_per_cur_interval; // number requests to send during the current interval int m_pending_resp; diff --git a/tests/include.py b/tests/include.py index 9cfb4c55..dd268883 100644 --- a/tests/include.py +++ b/tests/include.py @@ -16,7 +16,7 @@ def ensure_tls_protocols(master_nodes_connections): def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, - overall_request_count): + overall_request_count, overall_request_delta=None): failed_asserts = env.getNumberOfFailedAssertion() try: # assert correct exit code @@ -25,8 +25,11 @@ def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_re env.assertTrue(os.path.isfile('{0}/mb.stdout'.format(config.results_dir))) env.assertTrue(os.path.isfile('{0}/mb.stderr'.format(config.results_dir))) env.assertTrue(os.path.isfile('{0}/mb.json'.format(config.results_dir))) - # assert we have the expected request count - env.assertEqual(overall_expected_request_count, overall_request_count) + if overall_request_delta is None: + # assert we have the expected request count + env.assertEqual(overall_expected_request_count, overall_request_count) + else: + env.assertAlmostEqual(overall_expected_request_count, overall_request_count,overall_request_delta) finally: if env.getNumberOfFailedAssertion() > failed_asserts: debugPrintMemtierOnError(config, env) @@ -108,13 +111,14 @@ def addTLSArgs(benchmark_specs, env): -def get_default_memtier_config(threads=10, clients=5, requests=1000): +def get_default_memtier_config(threads=10, clients=5, requests=1000, test_time=None): config = { "memtier_benchmark": { "binary": MEMTIER_BINARY, "threads": threads, "clients": clients, - "requests": requests + "requests": requests, + "test_time": test_time }, } return config diff --git a/tests/tests_oss_simple_flow.py b/tests/tests_oss_simple_flow.py index 30541869..f0743acd 100644 --- a/tests/tests_oss_simple_flow.py +++ b/tests/tests_oss_simple_flow.py @@ -378,3 +378,42 @@ def test_default_arbitrary_command_hset_multi_data_placeholders(env): overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats) assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, overall_request_count) + +def test_default_set_get_rate_limited(env): + master_nodes_list = env.getMasterNodesList() + for client_count in [1,2,4]: + for thread_count in [1,2]: + rps_per_client = 100 + test_time_secs = 5 + overall_expected_rps = rps_per_client * client_count * thread_count * len(master_nodes_list) + overall_expected_request_count = test_time_secs * overall_expected_rps + # we give a 1 sec margin + request_delta = overall_expected_rps + # we will specify rate limit and the test time, which should help us get an approximate request count + benchmark_specs = {"name": env.testName, "args": ['--rate-limiting={}'.format(rps_per_client)]} + addTLSArgs(benchmark_specs, env) + config = get_default_memtier_config(thread_count,client_count,None,test_time_secs) + + master_nodes_connections = env.getOSSMasterNodesConnectionList() + + # reset the commandstats + for master_connection in master_nodes_connections: + master_connection.execute_command("CONFIG", "RESETSTAT") + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + # Create a temporary directory + test_dir = tempfile.mkdtemp() + + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + # benchmark.run() returns True if the return code of memtier_benchmark was 0 + memtier_ok = benchmark.run() + + master_nodes_connections = env.getOSSMasterNodesConnectionList() + merged_command_stats = {'cmdstat_set': {'calls': 0}, 'cmdstat_get': {'calls': 0}} + overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats) + assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, overall_request_count, request_delta)