Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue: Refactor event_handler_manager #245

Open
wants to merge 2 commits into
base: vNext
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 37 additions & 57 deletions src/core/event/event_handler_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,38 +264,31 @@ event_handler_manager::event_handler_manager(bool internal_thread_mode)
m_epfd = -1;
m_event_handler_tid = 0;

m_b_continue_running = true;
if (!internal_thread_mode) {
m_b_continue_running = true;
return;
}

m_epfd = SYSCALL(epoll_create, INITIAL_EVENTS_NUM);
BULLSEYE_EXCLUDE_BLOCK_START
if (m_epfd == -1) {
evh_logdbg("epoll_create failed on ibv device collection (errno=%d %m)", errno);
free_evh_resources();
throw_xlio_exception("epoll_create failed on ibv device collection");
}
BULLSEYE_EXCLUDE_BLOCK_END

m_b_continue_running = true;

wakeup_set_epoll_fd(m_epfd);
going_to_sleep();
}

event_handler_manager::~event_handler_manager()
{
free_evh_resources();
}

void event_handler_manager::free_evh_resources()
{
evh_logfunc("");

// Flag thread to stop on next loop
stop_thread();
evh_logfunc("Thread stopped");

if (m_epfd >= 0) {
SYSCALL(close, m_epfd);
}
m_epfd = -1;
}

// event handler main thread startup
Expand Down Expand Up @@ -344,56 +337,53 @@ int event_handler_manager::start_thread()
{
cpu_set_t cpu_set;
pthread_attr_t tattr;
int ret;
bool affinity_requested = false;

if (!m_b_continue_running) {
errno = ECANCELED;
return -1;
}

if (m_event_handler_tid != 0) {
return 0;
}

// m_reg_action_q.reserve(); //todo change to vector and reserve

BULLSEYE_EXCLUDE_BLOCK_START
if (pthread_attr_init(&tattr)) {
evh_logpanic("Failed to initialize thread attributes");
ret = pthread_attr_init(&tattr);
if (ret != 0) {
return -1;
}
BULLSEYE_EXCLUDE_BLOCK_END

cpu_set = safe_mce_sys().internal_thread_affinity;
if (strcmp(safe_mce_sys().internal_thread_affinity_str, "-1") &&
!strcmp(safe_mce_sys().internal_thread_cpuset,
MCE_DEFAULT_INTERNAL_THREAD_CPUSET)) { // no affinity
BULLSEYE_EXCLUDE_BLOCK_START
if (pthread_attr_setaffinity_np(&tattr, sizeof(cpu_set), &cpu_set)) {
evh_logpanic("Failed to set CPU affinity");
!strcmp(safe_mce_sys().internal_thread_cpuset, MCE_DEFAULT_INTERNAL_THREAD_CPUSET)) {
ret = pthread_attr_setaffinity_np(&tattr, sizeof(cpu_set), &cpu_set);
if (ret != 0) {
evh_logwarn("Failed to set event handler thread affinity. (errno=%d)", errno);
}
BULLSEYE_EXCLUDE_BLOCK_END
affinity_requested = (ret == 0);
} else {
evh_logdbg("Internal thread affinity not set.");
}

int ret = pthread_create(&m_event_handler_tid, &tattr, event_handler_thread, this);
if (ret) {
// maybe it's the cset issue? try without affinity
evh_logwarn("Failed to start event handler thread with thread affinity - trying without. "
"[errno=%d %s]",
ret, strerror(ret));
BULLSEYE_EXCLUDE_BLOCK_START
if (pthread_attr_init(&tattr)) {
evh_logpanic("Failed to initialize thread attributes");
}
if (pthread_create(&m_event_handler_tid, &tattr, event_handler_thread, this)) {
evh_logpanic("Failed to start event handler thread");
}
BULLSEYE_EXCLUDE_BLOCK_END
ret = pthread_create(&m_event_handler_tid, &tattr, event_handler_thread, this);
if (ret != 0 && affinity_requested) {
// Try without affinity in case this is a cset issue.
evh_logwarn("Failed to start event handler thread with a thread affinity. Trying default "
"thread affinity. (errno=%d)",
errno);
pthread_attr_destroy(&tattr);
ret = pthread_attr_init(&tattr)
?: pthread_create(&m_event_handler_tid, &tattr, event_handler_thread, this);
}

// Destroy will either succeed or return EINVAL if the init fails in the above block.
pthread_attr_destroy(&tattr);

evh_logdbg("Started event handler thread");
return 0;
if (ret == 0) {
evh_logdbg("Started event handler thread.");
} else {
evh_logerr("Failed to start event handler thread. (errno=%d)", errno);
}
return ret;
}

void event_handler_manager::stop_thread()
Expand All @@ -410,18 +400,12 @@ void event_handler_manager::stop_thread()
// Wait for thread exit
if (m_event_handler_tid) {
pthread_join(m_event_handler_tid, nullptr);
evh_logdbg("event handler thread stopped");
evh_logdbg("Event handler thread stopped.");
} else {
evh_logdbg("event handler thread not running");
evh_logdbg("Event handler thread not running.");
}
}
m_event_handler_tid = 0;

// Close main epfd and signaling socket
if (m_epfd >= 0) {
SYSCALL(close, m_epfd);
}
m_epfd = -1;
}

void event_handler_manager::update_epfd(int fd, int operation, int events)
Expand Down Expand Up @@ -483,15 +467,11 @@ void event_handler_manager::post_new_reg_action(reg_action_t &reg_action)
return;
}

start_thread();

evh_logfunc("add event action %s (%d)", reg_action_str(reg_action.type), reg_action.type);

bool is_empty = false;
bool is_empty;
m_reg_action_q_lock.lock();
if (m_p_reg_action_q_to_push_to->empty()) {
is_empty = true;
}
is_empty = m_p_reg_action_q_to_push_to->empty();
m_p_reg_action_q_to_push_to->push_back(reg_action);
m_reg_action_q_lock.unlock();
if (is_empty) {
Expand Down
4 changes: 1 addition & 3 deletions src/core/event/event_handler_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class event_handler_manager : public wakeup_pipe {
void register_command_event(int fd, command *cmd);

void *thread_loop();
int start_thread();
void stop_thread();
bool is_running() { return m_b_continue_running; };

Expand Down Expand Up @@ -251,12 +252,9 @@ class event_handler_manager : public wakeup_pipe {
void handle_registration_action(reg_action_t &reg_action);
void process_ibverbs_event(event_handler_map_t::iterator &i);
void process_rdma_cm_event(event_handler_map_t::iterator &i);
int start_thread();

void event_channel_post_process_for_rdma_events(void *p_event);
void *event_channel_pre_process_for_rdma_events(void *p_event_channel_handle, void **p_event);

void free_evh_resources(void);
};

extern event_handler_manager *g_p_event_handler_manager;
Expand Down
7 changes: 7 additions & 0 deletions src/core/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ static void do_global_ctors_helper()
{
static lock_spin_recursive g_globals_lock;
std::lock_guard<decltype(g_globals_lock)> lock(g_globals_lock);
int rc;

if (g_init_global_ctors_done) {
return;
Expand Down Expand Up @@ -1018,6 +1019,12 @@ static void do_global_ctors_helper()

// Create all global management objects
NEW_CTOR(g_p_event_handler_manager, event_handler_manager());
rc = g_p_event_handler_manager->start_thread();
if (rc != 0) {
BULLSEYE_EXCLUDE_BLOCK_START
throw_xlio_exception("Failed to start event handler thread.\n");
BULLSEYE_EXCLUDE_BLOCK_END
}

xlio_shmem_stats_open(&g_p_vlogger_level, &g_p_vlogger_details);
*g_p_vlogger_level = g_vlogger_level;
Expand Down