Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[eventd] Fix eventd UT flakiness #17055

Merged
merged 17 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sonic-eventd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ endif
-include rsyslog_plugin/subdir.mk
-include rsyslog_plugin_tests/subdir.mk

all: sonic-eventd eventd-tool rsyslog-plugin
all: sonic-eventd eventd-tests eventd-tool rsyslog-plugin rsyslog-plugin-tests

sonic-eventd: $(OBJS)
@echo 'Building target: $@'
Expand Down
4 changes: 2 additions & 2 deletions src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,9 @@ capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst
switch(ctrl) {
case INIT_CAPTURE:
m_thr = thread(&capture_service::do_capture, this);
for(int i=0; !m_cap_run && (i < 100); ++i) {
for(int i=0; !m_cap_run && (i < CAPTURE_SERVICE_POLLING_RETRIES); ++i) {
/* Wait max a second for thread to init */
this_thread::sleep_for(chrono::milliseconds(10));
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION));
}
RET_ON_ERR(m_cap_run, "Failed to init capture");
m_ctrl = ctrl;
Expand Down
2 changes: 2 additions & 0 deletions src/sonic-eventd/src/eventd.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ typedef enum {

#define EVENTS_STATS_FIELD_NAME "value"
#define STATS_HEARTBEAT_MIN 300
#define CAPTURE_SERVICE_POLLING_DURATION 10
#define CAPTURE_SERVICE_POLLING_RETRIES 100

/*
* Started by eventd_service.
Expand Down
72 changes: 48 additions & 24 deletions src/sonic-eventd/tests/eventd_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,23 @@ static const test_data_t ldata[] = {


void run_cap(void *zctx, bool &term, string &read_source,
int &cnt)
int &cnt, bool &should_read_control)
{
void *mock_cap = zmq_socket (zctx, ZMQ_SUB);
string source;
internal_event_t ev_int;
int block_ms = 200;
int i=0;
static int proxy_finished_init = false;

EXPECT_TRUE(NULL != mock_cap);
EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str()));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));

if(!proxy_finished_init) {
if(should_read_control) {
zmq_msg_t msg;
zmq_msg_init(&msg);
EXPECT_EQ(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message
proxy_finished_init = true;
EXPECT_NE(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message should be read by do_capture
}

while(!term) {
Expand Down Expand Up @@ -227,10 +225,10 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst)
}
}


TEST(eventd, proxy)
{
printf("Proxy TEST started\n");
bool should_read_control = false;
bool term_sub = false;
bool term_cap = false;
string rd_csource, rd_source, wr_source("hello");
Expand All @@ -247,12 +245,12 @@ TEST(eventd, proxy)
/* Starting proxy */
EXPECT_EQ(0, pxy->init());

/* capture in a thread */
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));

/* subscriber in a thread */
thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz));

/* capture in a thread */
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz));

/* Init pub connection */
void *mock_pub = init_pub(zctx);

Expand All @@ -275,9 +273,6 @@ TEST(eventd, proxy)
}
this_thread::sleep_for(chrono::milliseconds(1000));

delete pxy;
pxy = NULL;

term_sub = true;
term_cap = true;

Expand All @@ -287,6 +282,18 @@ TEST(eventd, proxy)
EXPECT_EQ(rd_cevts_sz, wr_evts.size());

zmq_close(mock_pub);

/* Do control test */

should_read_control = true;

/* capture in a thread */
thread thrcc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));

delete pxy;
pxy = NULL;

thrcc.join();
zmq_ctx_term(zctx);

/* Provide time for async proxy removal to complete */
Expand All @@ -295,7 +302,6 @@ TEST(eventd, proxy)
printf("eventd_proxy is tested GOOD\n");
}


TEST(eventd, capture)
{
printf("Capture TEST started\n");
Expand Down Expand Up @@ -329,9 +335,6 @@ TEST(eventd, capture)
/* Starting proxy */
EXPECT_EQ(0, pxy->init());

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

/* Create capture service */
capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance);

Expand All @@ -341,6 +344,9 @@ TEST(eventd, capture)
/* Initialize the capture */
EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE));

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

EXPECT_TRUE(init_cache > 1);
EXPECT_TRUE((cache_max+3) < (int)ARRAY_SIZE(ldata));

Expand Down Expand Up @@ -473,9 +479,6 @@ TEST(eventd, captureCacheMax)
/* Starting proxy */
EXPECT_EQ(0, pxy->init());

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

/* Create capture service */
capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance);

Expand All @@ -484,6 +487,9 @@ TEST(eventd, captureCacheMax)

EXPECT_TRUE(init_cache > 1);

/* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz));

/* Collect few serailized strings of events for startup cache */
for(int i=0; i < init_cache; ++i) {
internal_event_t ev(create_ev(ldata[i]));
Expand Down Expand Up @@ -595,6 +601,7 @@ TEST(eventd, service)
}

thread thread_service(&run_eventd_service);
this_thread::sleep_for(chrono::milliseconds(CAPTURE_SERVICE_POLLING_DURATION * CAPTURE_SERVICE_POLLING_RETRIES));

/* Need client side service to interact with server side */
EXPECT_EQ(0, service.init_client(zctx));
Expand All @@ -610,7 +617,7 @@ TEST(eventd, service)
string wr_source("hello");

/* Test service startup caching */
event_serialized_lst_t evts_start, evts_read;
event_serialized_lst_t evts_start, evts_read, polled_events;

for(int i=0; i<wr_sz; ++i) {
string evt_str;
Expand All @@ -624,15 +631,32 @@ TEST(eventd, service)
/* Publish events. */
run_pub(mock_pub, wr_source, wr_evts);

/* Published events must have been captured. Give a pause, to ensure sent. */
this_thread::sleep_for(chrono::milliseconds(200));
int max_polling_duration = 2000;
int polling_interval = 100;
auto poll_start_ts = chrono::steady_clock::now();

while(true) {
auto current_ts = chrono::steady_clock::now();
if(chrono::duration_cast<chrono::milliseconds>(current_ts - poll_start_ts).count() >= max_polling_duration) {
break;
}
event_serialized_lst_t read_events;
service.cache_read(read_events);
polled_events.insert(polled_events.end(), read_events.begin(), read_events.end());
if (!read_events.empty()) {
break;
}
this_thread::sleep_for(chrono::milliseconds(polling_interval));
}

EXPECT_EQ(0, service.cache_stop());

/* Read the cache; expect wr_sz events */
/* Read remaining events in cache, if any */
EXPECT_EQ(0, service.cache_read(evts_read));

EXPECT_EQ(evts_read, evts_start);
polled_events.insert(polled_events.end(), evts_read.begin(), evts_read.end());

EXPECT_EQ(polled_events, evts_start);

zmq_close(mock_pub);
}
Expand Down
Loading