Skip to content

Commit

Permalink
threadpool improvements:
Browse files Browse the repository at this point in the history
- add tp_thread_is_tp_thr();
- improve internal thread state handling;
- do not use pthread_t values directly in code;
- tp_shutdown_wait() and tp_destroy() now may return errors;
- add check that TP_F_ONESHOT and TP_F_DISPATCH not set together;
  • Loading branch information
rozhuk-im committed Apr 27, 2024
1 parent 4db42f6 commit 96cdf87
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 57 deletions.
16 changes: 12 additions & 4 deletions include/threadpool/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ typedef struct thread_pool_event_s { /* Thread pool event. */
#define TP_FF_RW_LOWAT (((uint32_t)1) << 0) /* For sockets: set SO_RCVLOWAT/SO_SNDLOWAT. */
#define TP_FF_RW_MASK 0x00000001u /* For internal use: fflags set mask. */
/* TP_EV_TIMER specific: if not set - the default is seconds. */
/* Data units selection ENUM for timer: select only one. */
#define TP_FF_T_SEC 0x00000000u /* data is seconds. */
#define TP_FF_T_MSEC 0x00000001u /* data is milliseconds. */
#define TP_FF_T_USEC 0x00000002u /* data is microseconds. */
#define TP_FF_T_NSEC 0x00000003u /* data is nanoseconds. */
#define TP_FF_T_TM_MASK 0x00000003u /* For internal use: fflags set mask for time units. */
/* Additional timer specific fflags. */
#define TP_FF_T_ABSTIME (((uint32_t)1) << 2) /* timeout is absolute. */
#define TP_FF_T_TM_MASK 0x00000003u /* For internal use: fflags set mask for time. */
#define TP_FF_T_MASK 0x00000007u /* For internal use: fflags set mask. */

static const char *tp_ff_time_units[] = { "s", "ms", "us", "ns", NULL };
Expand Down Expand Up @@ -144,25 +146,31 @@ int tp_settings_load_ini(const ini_p ini, const uint8_t *sect_name,
int tp_init(void);
int tp_create(tp_settings_p s, tp_p *ptp);

/* tp_shutdown() can be called by one of thread pool thread. */
void tp_shutdown(tp_p tp);
void tp_shutdown_wait(tp_p tp);
void tp_destroy(tp_p tp);
/* Next 2 functions can be called by thread pool thread due to deadlock. */
int tp_shutdown_wait(tp_p tp); /* Wait for all threads before return. */
int tp_destroy(tp_p tp);

int tp_threads_create(tp_p tp, int skip_first);
int tp_thread_attach_first(tp_p tp);
int tp_thread_dettach(tpt_p tpt);
size_t tp_thread_count_max_get(tp_p tp);
size_t tp_thread_count_get(tp_p tp);

/* Return tpt_p if caller thread is thread pool thread. */
tpt_p tp_thread_get_current(void);
/* Return non zero if tpt is one of tp threads.
* If tpt is NULL - tp_thread_get_current() used to get current thread tpt. */
int tp_thread_is_tp_thr(tp_p tp, tpt_p tpt);
tpt_p tp_thread_get(tp_p tp, size_t thread_num);
tpt_p tp_thread_get_rr(tp_p tp);
tpt_p tp_thread_get_pvt(tp_p tp); /* Shared virtual thread. */
int tp_thread_get_cpu_id(tpt_p tpt);
size_t tp_thread_get_num(tpt_p tpt);

tp_p tpt_get_tp(tpt_p tpt);
size_t tpt_is_running(tpt_p tpt);
int tpt_is_running(tpt_p tpt);
void *tpt_get_msg_queue(tpt_p tpt);


Expand Down
8 changes: 8 additions & 0 deletions liblcb.workspace
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,13 @@
<Project Name="test-base64" ConfigName="Release"/>
<Project Name="test-hash" ConfigName="Release"/>
</WorkspaceConfiguration>
<WorkspaceConfiguration Name="Debug-ASAN">
<Environment/>
<Project Name="lib" ConfigName="Debug"/>
<Project Name="test-base64" ConfigName="Debug"/>
<Project Name="test-ecdsa" ConfigName="Debug"/>
<Project Name="test-hash" ConfigName="Debug"/>
<Project Name="test-threadpool" ConfigName="Debug-ASAN"/>
</WorkspaceConfiguration>
</BuildMatrix>
</CodeLite_Workspace>
153 changes: 104 additions & 49 deletions src/threadpool/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ static const uint32_t tp_event_to_ep_map[] = {


typedef struct thread_pool_thread_s { /* thread pool thread info */
volatile size_t running; /* Running. */
volatile size_t state; /* State: TP_THREAD_STATE_*. */
volatile size_t tick_cnt; /* For detecting hangs thread. */
uintptr_t io_fd; /* IO handle: kqueue (per thread). */
#ifdef BSD /* BSD specific code. */
Expand All @@ -170,10 +170,16 @@ typedef struct thread_pool_thread_s { /* thread pool thread info */
tp_p tp; /* */
} tp_thread_t;

#define TP_THREAD_STATE_STOP 0
#define TP_THREAD_STATE_STOPING 1
#define TP_THREAD_STATE_STARTING 2
#define TP_THREAD_STATE_RUNNING 3


typedef struct thread_pool_s { /* thread pool */
tpt_p pvt; /* Pool virtual thread. */
size_t rr_idx;
volatile size_t rr_idx;
volatile size_t shutdown;
uint32_t flags;
char name[TP_NAME_SIZE];
size_t cpu_count;
Expand All @@ -195,8 +201,6 @@ void tpt_data_uninit(tpt_p tpt);

static void *tp_thread_proc(void *data);

void tpt_msg_shutdown_cb(tpt_p tpt, void *udata);

void tpt_cached_time_update_cb(tp_event_p ev, tp_udata_p tp_udata);


Expand Down Expand Up @@ -424,7 +428,7 @@ tpt_loop(tpt_p tpt) {
mem_bzero(&ke_timeout, sizeof(ke_timeout));

/* Main loop. */
while (0 != tpt->running) {
while (TP_THREAD_STATE_RUNNING == tpt->state) {
tpt->tick_cnt ++; /* Tic-toc. */
cnt = kevent((int)tpt->io_fd, tpt->ev_changelist,
tpt->ev_nchanges, &kev, 1, NULL /* Infinite wait. */);
Expand Down Expand Up @@ -766,7 +770,7 @@ tpt_loop(tpt_p tpt) {
tp = tpt->tp;
pvt = tp->pvt;
/* Main loop. */
while (0 != tpt->running) {
while (TP_THREAD_STATE_RUNNING == tpt->state) {
tpt->tick_cnt ++; /* Tic-toc. */
cnt = epoll_wait((int)tpt->io_fd, &epev, 1, -1 /* infinite wait. */);
if (0 == cnt) /* Timeout. */
Expand Down Expand Up @@ -1024,82 +1028,113 @@ tp_create(tp_settings_p s, tp_p *ptp) {
return (error);
}

static void
tpt_msg_shutdown_cb(tpt_p tpt, void *udata __unused) {

tpt->state = TP_THREAD_STATE_STOPING;
}
void
tp_shutdown(tp_p tp) {
size_t i;

if (NULL == tp)
return;
if (0 != tp->shutdown)
return;
tp->shutdown ++;
/* Shutdown threads. */
for (i = 0; i < tp->threads_max; i ++) {
if (0 == tp->threads[i].running)
for (size_t i = 0; i < tp->threads_max; i ++) {
if (0 == tpt_is_running(&tp->threads[i]))
continue;
tpt_msg_send(&tp->threads[i], NULL, 0,
tpt_msg_shutdown_cb, NULL);
}
}
void
tpt_msg_shutdown_cb(tpt_p tpt, void *udata __unused) {

tpt->running = 0;
}

void
int
tp_shutdown_wait(tp_p tp) {
size_t cnt;
int error;
size_t err_cnt = 0;
/* 1 sec = 1000000000 nanoseconds. */
struct timespec rqts = { .tv_sec = 0, .tv_nsec = 100000000 };

if (NULL == tp)
return;
/* Wait all threads before return. */
cnt = tp->threads_cnt;
while (0 != cnt) {
cnt = tp_thread_count_get(tp);
return (EINVAL);
if (0 == tp->shutdown)
return (EBUSY);
if (0 != tp_thread_is_tp_thr(tp, NULL))
return (EDEADLK);

for (size_t i = 0; i < tp->threads_max; i ++) {
if (TP_THREAD_STATE_STOP == tp->threads[i].state)
continue;
error = pthread_join(tp->threads[i].pt_id, NULL);
switch (error) {
case 0: /* No error. */
break;
case EDEADLK: /* Should not happen, checked by tp_thread_is_tp_thr(). */
return (error);
case EOPNOTSUPP: /* Probably other thread also call this right now. */
/* FreeBSD specific. */
default:
tp->threads[i].state = TP_THREAD_STATE_STOP;
err_cnt ++;
break;
}
}
/* Fallback code, normally not used. */
while (0 != err_cnt && 0 != tp_thread_count_get(tp)) {
nanosleep(&rqts, NULL); /* Ignore early wakeup and errors. */
}

return (0);
}

void
int
tp_destroy(tp_p tp) {
size_t i;
int error;

if (NULL == tp)
return;
return (EINVAL);
if (0 != tp_thread_is_tp_thr(tp, NULL))
return (EDEADLK);
/* Make sure that shutdown called. */
tp_shutdown(tp);
/* Wait all threads before free mem. */
tp_shutdown_wait(tp);
error = tp_shutdown_wait(tp);
if (0 != error)
return (error);
/* Free resources. */
tpt_data_uninit(tp->pvt);
for (i = 0; i < tp->threads_max; i ++) {
for (size_t i = 0; i < tp->threads_max; i ++) {
tpt_data_uninit(&tp->threads[i]);
}
free(tp);

return (0);
}


int
tp_threads_create(tp_p tp, int skip_first) {
size_t i;
tpt_p tpt;
char thr_name[64];

if (NULL == tp)
return (EINVAL);
if (0 != skip_first) {
tp->threads_cnt ++;
}
for (i = ((0 != skip_first) ? 1 : 0); i < tp->threads_max; i ++) {
if (0 != tp->shutdown)
return (EBUSY);

for (size_t i = ((0 != skip_first) ? 1 : 0); i < tp->threads_max; i ++) {
tpt = &tp->threads[i];
if (NULL == tpt->tp)
continue;
tpt->running = 1;
tpt->state = TP_THREAD_STATE_STARTING;
if (0 == pthread_create_eagain(&tpt->pt_id, NULL,
tp_thread_proc, tpt)) {
tp->threads_cnt ++;
snprintf(thr_name, sizeof(thr_name), "%s: %zu", tp->name, i);
pthread_set_name(tpt->pt_id, thr_name);
} else {
tpt->running = 0;
tpt->state = TP_THREAD_STATE_STOP;
}
}
return (0);
Expand All @@ -1111,12 +1146,18 @@ tp_thread_attach_first(tp_p tp) {

if (NULL == tp)
return (EINVAL);
if (0 != tp->shutdown)
return (EBUSY);

tpt = &tp->threads[0];
if (0 != tpt->running)
if (TP_THREAD_STATE_STOP != tpt->state)
return (ESPIPE);
tpt->running = 2;

tpt->state = TP_THREAD_STATE_STARTING;
tpt->pt_id = pthread_self();

tp_thread_proc(tpt);

return (0);
}

Expand All @@ -1125,7 +1166,7 @@ tp_thread_dettach(tpt_p tpt) {

if (NULL == tpt)
return (EINVAL);
tpt->running = 0;
tpt->state = TP_THREAD_STATE_STOP;
return (0);
}

Expand All @@ -1138,9 +1179,10 @@ tp_thread_proc(void *data) {
SYSLOGD_ERR(LOG_DEBUG, EINVAL, "Invalid data.");
return (NULL);
}
pthread_setspecific(tp_tls_key_tpt, (const void*)tpt);

tpt->running ++;
tpt->tp->threads_cnt ++;
tpt->state = TP_THREAD_STATE_RUNNING;
pthread_setspecific(tp_tls_key_tpt, (const void*)tpt);
syslog(LOG_INFO, "%s: thread %zu started...",
tpt->tp->name, tpt->thread_num);

Expand Down Expand Up @@ -1172,12 +1214,12 @@ tp_thread_proc(void *data) {

tpt_loop(tpt);

tpt->pt_id = 0;
tpt->running = 0; /* Reset state on exit or on error. */
tpt->tp->threads_cnt --;
pthread_setspecific(tp_tls_key_tpt, NULL);
syslog(LOG_INFO, "%s: thread %zu exited...",
tpt->tp->name, tpt->thread_num);
mem_bzero(&tpt->pt_id, sizeof(pthread_t));
tpt->state = TP_THREAD_STATE_STOP; /* Reset state on exit. */
tpt->tp->threads_cnt --;

return (NULL);
}
Expand All @@ -1199,9 +1241,9 @@ tp_thread_count_get(tp_p tp) {
if (NULL == tp)
return (0);
for (i = 0, cnt = 0; i < tp->threads_max; i ++) {
if (0 != tp->threads[i].pt_id) {
cnt ++;
}
if (0 == tpt_is_running(&tp->threads[i]))
continue;
cnt ++;
}
return (cnt);
}
Expand All @@ -1213,6 +1255,17 @@ tp_thread_get_current(void) {
return ((tpt_p)pthread_getspecific(tp_tls_key_tpt));
}

int
tp_thread_is_tp_thr(tp_p tp, tpt_p tpt) {

if (NULL == tp)
return (0);
if (NULL == tpt) {
tpt = tp_thread_get_current();
}
return ((NULL != tpt && tpt->tp == tp));
}

tpt_p
tp_thread_get(tp_p tp, size_t thread_num) {

Expand Down Expand Up @@ -1271,12 +1324,13 @@ tpt_get_tp(tpt_p tpt) {
return (tpt->tp);
}

size_t
int
tpt_is_running(tpt_p tpt) {

if (NULL == tpt)
return (0);
return (tpt->running);
return ((TP_THREAD_STATE_RUNNING == tpt->state ||
TP_THREAD_STATE_STARTING == tpt->state));
}

void *
Expand Down Expand Up @@ -1326,7 +1380,8 @@ tpt_ev_validate(int op, tp_event_p ev, tp_udata_p tp_udata) {
NULL == tp_udata)
return (EINVAL);
/* flags. */
if (0 != (~(TP_F_S_MASK) & ev->flags))
if (0 != (~(TP_F_S_MASK) & ev->flags) ||
(TP_F_ONESHOT | TP_F_DISPATCH) == ((TP_F_ONESHOT | TP_F_DISPATCH) & ev->flags))
return (EINVAL); /* Invalid flags: some unknown bits is set. */
/* tp_udata */
if (NULL == tp_udata->cb_func ||
Expand All @@ -1343,7 +1398,7 @@ tpt_ev_validate(int op, tp_event_p ev, tp_udata_p tp_udata) {
return (EINVAL); /* Invalid fflags: some unknown bits is set. */
break;
case TP_EV_TIMER:
#if 0 /* XXX: check this. */
#if defined(TP_F_EDGE) && 0 /* XXX: check this. */
if (0 != (TP_F_EDGE & ev->flags))
return (EINVAL); /* Invalid flags. */
#endif
Expand Down
Loading

0 comments on commit 96cdf87

Please sign in to comment.