
Commit

Merge remote-tracking branch 'origin/master' into fix-data-import
filipecosta90 committed Nov 16, 2023
2 parents 31caa28 + 9ddfcff commit efbeda8
Showing 6 changed files with 118 additions and 9 deletions.
28 changes: 27 additions & 1 deletion memtier_benchmark.cpp
@@ -127,6 +127,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
"run_count = %u\n"
"debug = %u\n"
"requests = %llu\n"
"rate_limit = %u\n"
"clients = %u\n"
"threads = %u\n"
"test_time = %u\n"
@@ -176,6 +177,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
cfg->run_count,
cfg->debug,
cfg->requests,
cfg->request_rate,
cfg->clients,
cfg->threads,
cfg->test_time,
@@ -233,6 +235,7 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co
jsonhandler->write_obj("run_count" ,"%u", cfg->run_count);
jsonhandler->write_obj("debug" ,"%u", cfg->debug);
jsonhandler->write_obj("requests" ,"%llu", cfg->requests);
jsonhandler->write_obj("rate_limit" ,"%u", cfg->request_rate);
jsonhandler->write_obj("clients" ,"%u", cfg->clients);
jsonhandler->write_obj("threads" ,"%u", cfg->threads);
jsonhandler->write_obj("test_time" ,"%u", cfg->test_time);
@@ -421,6 +424,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
o_tls_sni,
o_tls_protocols,
o_hdr_file_prefix,
o_rate_limiting,
o_help
};

@@ -489,6 +493,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
{ "command", 1, 0, o_command },
{ "command-key-pattern", 1, 0, o_command_key_pattern },
{ "command-ratio", 1, 0, o_command_ratio },
{ "rate-limiting", 1, 0, o_rate_limiting },
{ NULL, 0, 0, 0 }
};

@@ -861,6 +866,15 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
}
break;
}
case o_rate_limiting: {
endptr = NULL;
cfg->request_rate = (unsigned int) strtoul(optarg, &endptr, 10);
if (!cfg->request_rate || !endptr || *endptr != '\0') {
fprintf(stderr, "error: rate must be greater than zero.\n");
return -1;
}
break;
}
#ifdef USE_TLS
case o_tls:
cfg->tls = true;
@@ -948,7 +962,7 @@ void usage() {
" --key=FILE Use specified private key for TLS\n"
" --cacert=FILE Use specified CA certs bundle for TLS\n"
" --tls-skip-verify Skip verification of server certificate\n"
" --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'"
" --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'.\n"
" --sni=STRING Add an SNI header\n"
#endif
" -x, --run-count=NUMBER Number of full-test iterations to perform\n"
@@ -967,6 +981,8 @@ void usage() {
"Test Options:\n"
" -n, --requests=NUMBER Number of total requests per client (default: 10000)\n"
" use 'allkeys' to run on the entire key-range\n"
" --rate-limiting=NUMBER The max number of requests to make per second from an individual connection (default is unlimited rate).\n"
" If you use --rate-limiting and a very large rate is entered which cannot be met, memtier will do as many requests as possible per second.\n"
" -c, --clients=NUMBER Number of clients per thread (default: 50)\n"
" -t, --threads=NUMBER Number of threads (default: 4)\n"
" --test-time=SECS Number of seconds to run the test\n"
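
As a usage illustration (hypothetical invocation, not shown in the commit): combining the new flag with the existing client, thread and test-time options, a run such as

  memtier_benchmark --rate-limiting=100 -c 50 -t 4 --test-time=60

caps each of the 50 x 4 = 200 connections at roughly 100 requests per second, i.e. an aggregate of about 20,000 requests per second over the one-minute test.
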
@@ -1348,6 +1364,16 @@ int main(int argc, char *argv[])
delete tmp_protocol;
}

// if the user configured rate limiting, derive the per-interval request budget and timer interval
if (cfg.request_rate) {
/* Our event resolution is (at least) 50 events per second (an event every >= 20 ms).
 * When we calculate the number of requests per interval, we take
 * the upper bound and adjust the interval accordingly for better accuracy. */
cfg.request_per_interval = (cfg.request_rate + 50 - 1) / 50;
unsigned int events_per_second = cfg.request_rate / cfg.request_per_interval;
cfg.request_interval_microsecond = 1000000 / events_per_second;
benchmark_debug_log("Rate limiting configured to send %u requests per %u millisecond\n", cfg.request_per_interval, cfg.request_interval_microsecond / 1000);
}

#ifdef USE_TLS
// Initialize OpenSSL only if we're really going to use it.
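
To make the interval arithmetic above concrete, here is a small self-contained sketch (illustrative only, not memtier source; the rate of 120 is a hypothetical --rate-limiting value) that recomputes the same quantities main() derives:

// Illustrative sketch: recompute the rate-limit interval math for a hypothetical --rate-limiting=120.
#include <cstdio>

int main() {
    unsigned int request_rate = 120;                                          // hypothetical --rate-limiting=120
    // Ceiling division: per-event request budget at a resolution of >= 50 timer events per second.
    unsigned int request_per_interval = (request_rate + 50 - 1) / 50;         // 3
    unsigned int events_per_second = request_rate / request_per_interval;     // 40
    unsigned int request_interval_microsecond = 1000000 / events_per_second;  // 25000
    printf("send %u requests every %u us -> %u requests/sec\n",
           request_per_interval, request_interval_microsecond,
           request_per_interval * events_per_second);
    return 0;
}

For a rate that does not divide evenly (say 130), the integer arithmetic rounds the event frequency down, so the achieved rate lands at or just below the requested one (roughly 129 requests per second in that case).
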
3 changes: 3 additions & 0 deletions memtier_benchmark.h
@@ -104,6 +104,9 @@ struct benchmark_config {
bool cluster_mode;
struct arbitrary_command_list* arbitrary_commands;
const char *hdr_prefix;
unsigned int request_rate;
unsigned int request_per_interval;
unsigned int request_interval_microsecond;
#ifdef USE_TLS
bool tls;
const char *tls_cert;
39 changes: 36 additions & 3 deletions shard_connection.cpp
@@ -56,6 +56,13 @@
#include "event2/bufferevent_ssl.h"
#endif

void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;
assert(sc != NULL);
sc->handle_timer_event();
}

void cluster_client_read_handler(bufferevent *bev, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;
@@ -66,7 +73,6 @@ void cluster_client_read_handler(bufferevent *bev, void *ctx)
void cluster_client_event_handler(bufferevent *bev, short events, void *ctx)
{
shard_connection *sc = (shard_connection *) ctx;

assert(sc != NULL);
sc->handle_event(events);
}
@@ -123,7 +129,7 @@ verify_request::~verify_request(void)
shard_connection::shard_connection(unsigned int id, connections_manager* conns_man, benchmark_config* config,
struct event_base* event_base, abstract_protocol* abs_protocol) :
m_address(NULL), m_port(NULL), m_unix_sockaddr(NULL),
m_bev(NULL), m_pending_resp(0), m_connection_state(conn_disconnected),
m_bev(NULL), m_request_per_cur_interval(0), m_pending_resp(0), m_connection_state(conn_disconnected),
m_hello(setup_done), m_authentication(setup_done), m_db_selection(setup_done), m_cluster_slots(setup_done) {
m_id = id;
m_conns_manager = conns_man;
@@ -341,6 +347,10 @@ request* shard_connection::pop_req() {
void shard_connection::push_req(request* req) {
m_pipeline->push(req);
m_pending_resp++;
if (m_config->request_rate) {
assert(m_request_per_cur_interval > 0);
m_request_per_cur_interval--;
}
}

bool shard_connection::is_conn_setup_done() {
@@ -486,21 +496,28 @@ void shard_connection::process_first_request() {
fill_pipeline();
}


void shard_connection::fill_pipeline(void)
{
struct timeval now;
gettimeofday(&now, NULL);

while (!m_conns_manager->finished() && m_pipeline->size() < m_config->pipeline) {
if (!is_conn_setup_done()) {
send_conn_setup_commands(now);
return;
}

// don't exceed requests
if (m_conns_manager->hold_pipeline(m_id)) {
break;
}

// that's enough, we reached the rate limit
if (m_config->request_rate && m_request_per_cur_interval == 0) {
// return and wait for the next timer event to refill the per-interval budget
return;
}

// client manage requests logic
m_conns_manager->create_request(now, m_id);
}
@@ -511,6 +528,9 @@ void shard_connection::fill_pipeline(void)
if ((m_pending_resp == 0) && (evbuffer_get_length(bufferevent_get_output(m_bev)) == 0)) {
benchmark_debug_log("%s Done, no requests to send no response to wait for\n", get_readable_id());
bufferevent_disable(m_bev, EV_WRITE|EV_READ);
if (m_config->request_rate) {
event_del(m_event_timer);
}
}
}
}
@@ -526,6 +546,14 @@ void shard_connection::handle_event(short events)
bufferevent_enable(m_bev, EV_READ|EV_WRITE);

if (!m_conns_manager->get_reqs_processed()) {
/* Set timer for request rate */
if (m_config->request_rate) {
struct timeval interval = { 0, (long int)m_config->request_interval_microsecond };
m_request_per_cur_interval = m_config->request_per_interval;
m_event_timer = event_new(m_event_base, -1, EV_PERSIST, cluster_client_timer_handler, (void *)this);
event_add(m_event_timer, &interval);
}

process_first_request();
} else {
benchmark_debug_log("reconnection complete, proceeding with test\n");
@@ -561,6 +589,11 @@ void shard_connection::handle_event(short events)
}
}

void shard_connection::handle_timer_event() {
m_request_per_cur_interval = m_config->request_per_interval;
fill_pipeline();
}

void shard_connection::send_wait_command(struct timeval* sent_time,
unsigned int num_slaves, unsigned int timeout) {
int cmd_size = 0;
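
For context on the libevent mechanics this file relies on, the following standalone sketch (a minimal illustration assuming libevent 2.x; rate_state and its values are hypothetical names, not memtier identifiers) shows the same pattern in isolation: a persistent timer periodically refills a per-interval request budget, and the send path stops submitting once the budget hits zero until the next tick.

// Minimal libevent sketch of the budget-refill pattern (not memtier code).
#include <event2/event.h>
#include <sys/time.h>
#include <cstdio>

struct rate_state {
    unsigned int budget;        // requests left in the current interval
    unsigned int per_interval;  // refill amount applied on every timer tick
};

static void timer_cb(evutil_socket_t fd, short what, void *arg) {
    rate_state *rs = static_cast<rate_state *>(arg);
    rs->budget = rs->per_interval;  // refill, analogous to handle_timer_event()
    printf("tick: budget reset to %u\n", rs->budget);
    // A real client would now try to fill its pipeline again.
}

int main() {
    event_base *base = event_base_new();
    rate_state rs = { 0, 3 };                // e.g. 3 requests per interval
    struct timeval interval = { 0, 25000 };  // 25,000 us between ticks
    event *timer = event_new(base, -1, EV_PERSIST, timer_cb, &rs);
    event_add(timer, &interval);
    event_base_loop(base, EVLOOP_ONCE);      // process a single tick for illustration
    event_free(timer);
    event_base_free(base);
    return 0;
}
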
4 changes: 4 additions & 0 deletions shard_connection.h
@@ -76,6 +76,7 @@ struct verify_request : public request {
};

class shard_connection {
friend void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx);
friend void cluster_client_read_handler(bufferevent *bev, void *ctx);
friend void cluster_client_event_handler(bufferevent *bev, short events, void *ctx);

@@ -148,6 +149,7 @@ class shard_connection {
void fill_pipeline(void);

void handle_event(short evtype);
void handle_timer_event();

unsigned int m_id;
connections_manager* m_conns_manager;
@@ -160,9 +162,11 @@ class shard_connection {
struct sockaddr_un* m_unix_sockaddr;
struct bufferevent *m_bev;
struct event_base* m_event_base;
struct event* m_event_timer;

abstract_protocol* m_protocol;
std::queue<request *>* m_pipeline;
unsigned int m_request_per_cur_interval; // number of requests to send during the current interval

int m_pending_resp;

14 changes: 9 additions & 5 deletions tests/include.py
@@ -16,7 +16,7 @@ def ensure_tls_protocols(master_nodes_connections):


def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count,
overall_request_count):
overall_request_count, overall_request_delta=None):
failed_asserts = env.getNumberOfFailedAssertion()
try:
# assert correct exit code
@@ -25,8 +25,11 @@ def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_re
env.assertTrue(os.path.isfile('{0}/mb.stdout'.format(config.results_dir)))
env.assertTrue(os.path.isfile('{0}/mb.stderr'.format(config.results_dir)))
env.assertTrue(os.path.isfile('{0}/mb.json'.format(config.results_dir)))
# assert we have the expected request count
env.assertEqual(overall_expected_request_count, overall_request_count)
if overall_request_delta is None:
# assert we have the expected request count
env.assertEqual(overall_expected_request_count, overall_request_count)
else:
env.assertAlmostEqual(overall_expected_request_count, overall_request_count, overall_request_delta)
finally:
if env.getNumberOfFailedAssertion() > failed_asserts:
debugPrintMemtierOnError(config, env)
@@ -108,13 +111,14 @@ def addTLSArgs(benchmark_specs, env):



def get_default_memtier_config(threads=10, clients=5, requests=1000):
def get_default_memtier_config(threads=10, clients=5, requests=1000, test_time=None):
config = {
"memtier_benchmark": {
"binary": MEMTIER_BINARY,
"threads": threads,
"clients": clients,
"requests": requests
"requests": requests,
"test_time": test_time
},
}
return config
39 changes: 39 additions & 0 deletions tests/tests_oss_simple_flow.py
@@ -378,3 +378,42 @@ def test_default_arbitrary_command_hset_multi_data_placeholders(env):
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count,
overall_request_count)

def test_default_set_get_rate_limited(env):
master_nodes_list = env.getMasterNodesList()
for client_count in [1,2,4]:
for thread_count in [1,2]:
rps_per_client = 100
test_time_secs = 5
overall_expected_rps = rps_per_client * client_count * thread_count * len(master_nodes_list)
overall_expected_request_count = test_time_secs * overall_expected_rps
# we give a 1 sec margin
request_delta = overall_expected_rps
# we will specify rate limit and the test time, which should help us get an approximate request count
benchmark_specs = {"name": env.testName, "args": ['--rate-limiting={}'.format(rps_per_client)]}
addTLSArgs(benchmark_specs, env)
config = get_default_memtier_config(thread_count, client_count, None, test_time_secs)

master_nodes_connections = env.getOSSMasterNodesConnectionList()

# reset the commandstats
for master_connection in master_nodes_connections:
master_connection.execute_command("CONFIG", "RESETSTAT")

add_required_env_arguments(benchmark_specs, config, env, master_nodes_list)

# Create a temporary directory
test_dir = tempfile.mkdtemp()

config = RunConfig(test_dir, env.testName, config, {})
ensure_clean_benchmark_folder(config.results_dir)

benchmark = Benchmark.from_json(config, benchmark_specs)

# benchmark.run() returns True if the return code of memtier_benchmark was 0
memtier_ok = benchmark.run()

master_nodes_connections = env.getOSSMasterNodesConnectionList()
merged_command_stats = {'cmdstat_set': {'calls': 0}, 'cmdstat_get': {'calls': 0}}
overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats)
assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, overall_request_count, request_delta)

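To make the test's expected-count arithmetic concrete: with thread_count=2, client_count=4, a single OSS master node and rps_per_client=100, overall_expected_rps is 100 * 4 * 2 * 1 = 800, so the 5-second run expects roughly 4,000 SET/GET calls in the aggregated command stats, and the one-second margin lets the measured total deviate by up to 800 requests from that figure.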