diff --git a/include/pthreadpool.h b/include/pthreadpool.h index 7b7766b..8d6010d 100644 --- a/include/pthreadpool.h +++ b/include/pthreadpool.h @@ -83,26 +83,31 @@ pthreadpool_t pthreadpool_create(size_t threads_count); * * @returns The number of threads in the thread pool. */ -size_t pthreadpool_get_threads_count(pthreadpool_t threadpool); +size_t pthreadpool_get_max_threads_count(pthreadpool_t threadpool); /* * API to enable doing work with fewer threads than available in * threadpool. + * This function takes a pointer to a threadpool object and sets the number + * of threads to use with that threadpool. * Purpose of this is to ameliorate some perf degradation observed * due to OS mapping a given set of threads to fewer cores. * + * @param threadpool the threadpool object to configure. * @param num_threads num threads to use for the subsequent tasks - * submitted. + * submitted to this threadpool. */ -void pthreadpool_set_num_threads_to_use(size_t num_threads); +void pthreadpool_set_threads_count(pthreadpool_t threadpool, size_t num_threads); /* - * Query current setting of the number of threads to use + * Query the number of threads to be used by the threadpool. This number + * can be smaller than the size of the threadpool if it was lowered + * by pthreadpool_set_threads_count. * - * @returns The number of threads to be used for the subsequent tasks - * submitted. + * @param threadpool the threadpool object to query. + * @returns The number of threads to be used for subsequently submitted tasks. */ -size_t pthreadpool_get_num_threads_to_use(void); +size_t pthreadpool_get_threads_count(pthreadpool_t threadpool); /** * Process items on a 1D grid. diff --git a/src/fastpath.c b/src/fastpath.c index 1f5e7c8..af41c8f 100644 --- a/src/fastpath.c +++ b/src/fastpath.c @@ -32,7 +32,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath( const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -77,7 +77,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_with_uarch_fastpath( } #endif - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -113,7 +113,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath( const pthreadpool_task_1d_tile_1d_t task = (pthreadpool_task_1d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -155,7 +155,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath( const pthreadpool_task_2d_t task = (pthreadpool_task_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const
size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -201,7 +201,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath( const pthreadpool_task_2d_tile_1d_t task = (pthreadpool_task_2d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -251,7 +251,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath( const pthreadpool_task_2d_tile_2d_t task = (pthreadpool_task_2d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -313,7 +313,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_f } #endif - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -366,7 +366,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath( const pthreadpool_task_3d_t task = (pthreadpool_task_3d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -419,7 +419,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath( const pthreadpool_task_3d_tile_1d_t task = (pthreadpool_task_3d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -476,7 +476,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath( const pthreadpool_task_3d_tile_2d_t task = (pthreadpool_task_3d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -546,7 
+546,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_f } #endif - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -607,7 +607,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath( const pthreadpool_task_4d_t task = (pthreadpool_task_4d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -668,7 +668,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath( const pthreadpool_task_4d_tile_1d_t task = (pthreadpool_task_4d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -733,7 +733,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath( const pthreadpool_task_4d_tile_2d_t task = (pthreadpool_task_4d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -810,7 +810,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_f } #endif - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -878,7 +878,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath( const pthreadpool_task_5d_t task = (pthreadpool_task_5d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -946,7 +946,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath( const pthreadpool_task_5d_tile_1d_t task = (pthreadpool_task_5d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t 
range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1019,7 +1019,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath( const pthreadpool_task_5d_tile_2d_t task = (pthreadpool_task_5d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1095,7 +1095,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath( const pthreadpool_task_6d_t task = (pthreadpool_task_6d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1171,7 +1171,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath( const pthreadpool_task_6d_tile_1d_t task = (pthreadpool_task_6d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ @@ -1252,7 +1252,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath( const pthreadpool_task_6d_tile_2d_t task = (pthreadpool_task_6d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task); void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument); - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); const size_t range_threshold = -threads_count; /* Process thread's own range of items */ diff --git a/src/gcd.c b/src/gcd.c index 7f38076..314f6ee 100644 --- a/src/gcd.c +++ b/src/gcd.c @@ -21,8 +21,6 @@ #include "threadpool-object.h" #include "threadpool-utils.h" -thread_local size_t max_num_threads = UINT_MAX; - static void thread_main(void* arg, size_t thread_index) { struct pthreadpool* threadpool = (struct pthreadpool*) arg; struct thread_info* thread = &threadpool->threads[thread_index]; @@ -64,7 +62,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return NULL; } - threadpool->threads_count = fxdiv_init_size_t(threads_count); - pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use_); + threadpool->max_threads_count = fxdiv_init_size_t(threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->threads_count, threads_count); for (size_t tid = 0; tid < threads_count; tid++) { threadpool->threads[tid].thread_number = tid; } @@ -76,12 +74,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return threadpool; } -void pthreadpool_set_num_threads_to_use(size_t num_threads) { - max_num_threads = num_threads; -} - -size_t pthreadpool_get_num_threads_to_use() { - return max_num_threads; +void pthreadpool_set_threads_count(struct
pthreadpool* threadpool, size_t num_threads) { + dispatch_semaphore_wait(threadpool->execution_semaphore, DISPATCH_TIME_FOREVER); + const struct fxdiv_divisor_size_t max_threads_count = threadpool->max_threads_count; + const size_t num_threads_to_use_ = min(max_threads_count.value, num_threads); + pthreadpool_store_relaxed_size_t(&threadpool->threads_count, num_threads_to_use_); + dispatch_semaphore_signal(threadpool->execution_semaphore); } PTHREADPOOL_INTERNAL void pthreadpool_parallelize( @@ -109,17 +107,17 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - const struct fxdiv_result_size_tsize_t threads_count = - fxdiv_init_size_t(min(threadpool->threads_count.value, pthreadpool_get_num_threads_to_use())); + const struct fxdiv_divisor_size_t num_threads_to_use = + fxdiv_init_size_t(pthreadpool_load_relaxed_size_t(&threadpool->threads_count)); if (params_size != 0) { memcpy(&threadpool->params, params, params_size); } /* Spread the work between threads */ - const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count); + const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, num_threads_to_use); size_t range_start = 0; - for (size_t tid = 0; tid < threads_count.value; tid++) { + for (size_t tid = 0; tid < num_threads_to_use.value; tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder); const size_t range_end = range_start + range_length; @@ -131,7 +129,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( range_start = range_end; } - dispatch_apply_f(threads_count.value, DISPATCH_APPLY_AUTO, threadpool, thread_main); + dispatch_apply_f(num_threads_to_use.value, DISPATCH_APPLY_AUTO, threadpool, thread_main); /* Unprotect the global threadpool structures */ dispatch_semaphore_signal(threadpool->execution_semaphore); diff --git a/src/memory.c b/src/memory.c index fc0d83e..772caed 100644 --- a/src/memory.c +++ b/src/memory.c @@ -20,11 +20,11 @@ PTHREADPOOL_INTERNAL struct pthreadpool* pthreadpool_allocate( - size_t threads_count) + size_t max_threads_count) { - assert(threads_count >= 1); + assert(max_threads_count >= 1); - const size_t threadpool_size = sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info); + const size_t threadpool_size = sizeof(struct pthreadpool) + max_threads_count * sizeof(struct thread_info); struct pthreadpool* threadpool = NULL; #if defined(__ANDROID__) /* @@ -55,7 +55,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_deallocate( { assert(threadpool != NULL); - const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->threads_count.value * sizeof(struct thread_info); + const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->max_threads_count.value * sizeof(struct thread_info); memset(threadpool, 0, threadpool_size); #ifdef _WIN32 diff --git a/src/portable-api.c b/src/portable-api.c index 88fcbf9..7871f86 100644 --- a/src/portable-api.c +++ b/src/portable-api.c @@ -21,12 +21,20 @@ #include "threadpool-utils.h" +size_t pthreadpool_get_max_threads_count(struct pthreadpool* threadpool) { + if (threadpool == NULL) { + return 1; + } + + return threadpool->max_threads_count.value; +} + size_t pthreadpool_get_threads_count(struct pthreadpool* threadpool) { if (threadpool == NULL) { return 1; } - return
threadpool->threads_count.value; + return pthreadpool_load_relaxed_size_t(&threadpool->threads_count); } static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_info* thread) { @@ -44,7 +52,7 @@ static void thread_parallelize_1d(struct pthreadpool* threadpool, struct thread_ /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -84,7 +92,7 @@ static void thread_parallelize_1d_with_uarch(struct pthreadpool* threadpool, str /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -120,7 +128,7 @@ static void thread_parallelize_1d_tile_1d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -161,7 +169,7 @@ static void thread_parallelize_2d(struct pthreadpool* threadpool, struct thread_ /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -205,7 +213,7 @@ static void thread_parallelize_2d_tile_1d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -252,7 +260,7 @@ static void thread_parallelize_2d_tile_2d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -309,7 +317,7 @@ static void thread_parallelize_2d_tile_2d_with_uarch(struct pthreadpool* threadp /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t 
threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -358,7 +366,7 @@ static void thread_parallelize_3d(struct pthreadpool* threadpool, struct thread_ /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -409,7 +417,7 @@ static void thread_parallelize_3d_tile_1d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -464,7 +472,7 @@ static void thread_parallelize_3d_tile_2d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -529,7 +537,7 @@ static void thread_parallelize_3d_tile_2d_with_uarch(struct pthreadpool* threadp /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -586,7 +594,7 @@ static void thread_parallelize_4d(struct pthreadpool* threadpool, struct thread_ /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -645,7 +653,7 @@ static void thread_parallelize_4d_tile_1d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -707,7 +715,7 @@ static void thread_parallelize_4d_tile_2d(struct pthreadpool* threadpool, struct /* There 
still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -779,7 +787,7 @@ static void thread_parallelize_4d_tile_2d_with_uarch(struct pthreadpool* threadp /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -843,7 +851,7 @@ static void thread_parallelize_5d(struct pthreadpool* threadpool, struct thread_ /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -909,7 +917,7 @@ static void thread_parallelize_5d_tile_1d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -979,7 +987,7 @@ static void thread_parallelize_5d_tile_2d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -1052,7 +1060,7 @@ static void thread_parallelize_6d(struct pthreadpool* threadpool, struct thread_ /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -1126,7 +1134,7 @@ static void thread_parallelize_6d_tile_1d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) 
@@ -1204,7 +1212,7 @@ static void thread_parallelize_6d_tile_2d(struct pthreadpool* threadpool, struct /* There still may be other threads with work */ const size_t thread_number = thread->thread_number; - const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); for (size_t tid = modulo_decrement(thread_number, threads_count); tid != thread_number; tid = modulo_decrement(tid, threads_count)) @@ -1235,9 +1243,7 @@ void pthreadpool_parallelize_1d( size_t range, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || range <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1274,9 +1280,7 @@ void pthreadpool_parallelize_1d_with_uarch( size_t range, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || range <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -1325,9 +1329,7 @@ void pthreadpool_parallelize_1d_tile_1d( size_t tile, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || range <= tile) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1368,9 +1370,7 @@ void pthreadpool_parallelize_2d( size_t range_j, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i | range_j) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1413,9 +1413,7 @@ void pthreadpool_parallelize_2d_tile_1d( size_t tile_j, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1462,9 +1460,7 @@ void pthreadpool_parallelize_2d_tile_2d( size_t tile_j, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = 
pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1516,9 +1512,7 @@ void pthreadpool_parallelize_2d_tile_2d_with_uarch( size_t tile_j, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i <= tile_i && range_j <= tile_j)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -1578,9 +1572,7 @@ void pthreadpool_parallelize_3d( size_t range_k, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1627,9 +1619,7 @@ void pthreadpool_parallelize_3d_tile_1d( size_t tile_k, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1680,9 +1670,7 @@ void pthreadpool_parallelize_3d_tile_2d( size_t tile_k, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1738,9 +1726,7 @@ void pthreadpool_parallelize_3d_tile_2d_with_uarch( size_t tile_k, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i <= 1 && range_j <= tile_j && range_k <= tile_k)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -1804,9 +1790,7 @@ void pthreadpool_parallelize_4d( size_t range_l, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l) <= 1) { /* No thread 
pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1859,9 +1843,7 @@ void pthreadpool_parallelize_4d_tile_1d( size_t tile_l, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1918,9 +1900,7 @@ void pthreadpool_parallelize_4d_tile_2d( size_t tile_l, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -1981,9 +1961,7 @@ void pthreadpool_parallelize_4d_tile_2d_with_uarch( size_t tile_l, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j) <= 1 && range_k <= tile_k && range_l <= tile_l)) { /* No thread pool used: execute task sequentially on the calling thread */ @@ -2052,9 +2030,7 @@ void pthreadpool_parallelize_5d( size_t range_m, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l | range_m) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -2111,9 +2087,7 @@ void pthreadpool_parallelize_5d_tile_1d( size_t tile_m, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -2174,9 +2148,7 @@ void pthreadpool_parallelize_5d_tile_2d( size_t tile_m, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k) <= 1 && range_l <= tile_l && range_m <= tile_m)) { /* No thread pool used: 
execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -2238,9 +2210,7 @@ void pthreadpool_parallelize_6d( size_t range_n, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || (range_i | range_j | range_k | range_l | range_m | range_n) <= 1) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -2301,9 +2271,7 @@ void pthreadpool_parallelize_6d_tile_1d( size_t tile_n, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l | range_m) <= 1 && range_n <= tile_n)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; @@ -2368,9 +2336,7 @@ void pthreadpool_parallelize_6d_tile_2d( size_t tile_n, uint32_t flags) { - size_t threads_count; - const size_t num_threads_to_use = pthreadpool_get_num_threads_to_use(); - threads_count = min(threadpool->threads_count.value, num_threads_to_use); + const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count); if (threadpool == NULL || threads_count <= 1 || ((range_i | range_j | range_k | range_l) <= 1 && range_m <= tile_m && range_n <= tile_n)) { /* No thread pool used: execute task sequentially on the calling thread */ struct fpu_state saved_fpu_state = { 0 }; diff --git a/src/pthreads.c b/src/pthreads.c index 9e93d47..e8dbdeb 100644 --- a/src/pthreads.c +++ b/src/pthreads.c @@ -54,7 +54,6 @@ #include "threadpool-object.h" #include "threadpool-utils.h" -thread_local size_t max_num_threads = UINT_MAX; #if PTHREADPOOL_USE_FUTEX #if defined(__linux__) @@ -228,41 +227,41 @@ static void* thread_main(void* arg) { }; } -struct pthreadpool* pthreadpool_create(size_t threads_count) { +struct pthreadpool* pthreadpool_create(size_t max_threads_count) { #if PTHREADPOOL_USE_CPUINFO if (!cpuinfo_initialize()) { return NULL; } #endif - if (threads_count == 0) { + if (max_threads_count == 0) { #if PTHREADPOOL_USE_CPUINFO - threads_count = cpuinfo_get_processors_count(); + max_threads_count = cpuinfo_get_processors_count(); #elif defined(_SC_NPROCESSORS_ONLN) - threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); + max_threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); #if defined(__EMSCRIPTEN_PTHREADS__) /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */ - if (threads_count >= 8) { - threads_count = 8; + if (max_threads_count >= 8) { + max_threads_count = 8; } #endif #elif defined(_WIN32) SYSTEM_INFO system_info; ZeroMemory(&system_info, sizeof(system_info)); GetSystemInfo(&system_info); - threads_count = (size_t) system_info.dwNumberOfProcessors; + max_threads_count = (size_t) system_info.dwNumberOfProcessors; #else #error "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required" #endif } - struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); + struct pthreadpool* threadpool = 
pthreadpool_allocate(max_threads_count); if (threadpool == NULL) { return NULL; } - threadpool->threads_count = fxdiv_init_size_t(threads_count); - pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, threads_count); - for (size_t tid = 0; tid < threads_count; tid++) { + threadpool->max_threads_count = fxdiv_init_size_t(max_threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->threads_count, max_threads_count); + for (size_t tid = 0; tid < max_threads_count; tid++) { threadpool->threads[tid].thread_number = tid; threadpool->threads[tid].threadpool = threadpool; // Since command is per thread we are creating conditional variables per thread as well. @@ -274,7 +273,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { } /* Thread pool with a single thread computes everything on the caller thread. */ - if (threads_count > 1) { + if (max_threads_count > 1) { pthread_mutex_init(&threadpool->execution_mutex, NULL); #if !PTHREADPOOL_USE_FUTEX pthread_mutex_init(&threadpool->completion_mutex, NULL); @@ -284,10 +283,10 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { #if PTHREADPOOL_USE_FUTEX pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, max_threads_count - 1 /* caller thread */); /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ - for (size_t tid = 1; tid < threads_count; tid++) { + for (size_t tid = 1; tid < max_threads_count; tid++) { pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); } @@ -297,12 +296,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return threadpool; } -void pthreadpool_set_num_threads_to_use(size_t num_threads) { - max_num_threads = num_threads; -} - -size_t pthreadpool_get_num_threads_to_use() { - return max_num_threads; +void pthreadpool_set_threads_count(struct pthreadpool* threadpool, size_t num_threads) { + pthread_mutex_lock(&threadpool->execution_mutex); + const struct fxdiv_divisor_size_t max_threads_count = threadpool->max_threads_count; + const size_t num_threads_to_use = min(max_threads_count.value, num_threads); + pthreadpool_store_release_size_t(&threadpool->threads_count, num_threads_to_use); + pthread_mutex_unlock(&threadpool->execution_mutex); } PTHREADPOOL_INTERNAL void pthreadpool_parallelize( @@ -323,12 +322,11 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( /* Protect the global threadpool structures */ pthread_mutex_lock(&threadpool->execution_mutex); - const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; - size_t max_threads_to_use = pthreadpool_get_num_threads_to_use(); - const struct fxdiv_divisor_size_t num_threads_to_use = fxdiv_init_size_t(min(threads_count.value, max_threads_to_use)); + const struct fxdiv_divisor_size_t threads_count = + fxdiv_init_size_t(pthreadpool_load_relaxed_size_t(&threadpool->threads_count)); #if !PTHREADPOOL_USE_FUTEX /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */ - for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + for (size_t tid = 1; tid < threads_count.value; tid++) { pthread_mutex_lock(&(threadpool->threads[tid].command_mutex)); } #endif @@ -340,8 +338,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( 
pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */ - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, num_threads_to_use.value - 1 /* caller thread */); - pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use.value); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */); #if PTHREADPOOL_USE_FUTEX pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); #endif @@ -352,9 +349,9 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( } /* Spread the work between threads */ - const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, num_threads_to_use); + const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count); size_t range_start = 0; - for (size_t tid = 0; tid < num_threads_to_use.value; tid++) { + for (size_t tid = 0; tid < threads_count.value; tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder); const size_t range_end = range_start + range_length; @@ -366,7 +363,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( range_start = range_end; } - for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + for (size_t tid = 1; tid < threads_count.value; tid++) { /* * Update the threadpool command. * Imporantly, do it after initializing command parameters (range, task, argument, flags) @@ -424,14 +421,14 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( void pthreadpool_destroy(struct pthreadpool* threadpool) { if (threadpool != NULL) { - const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; - if (threads_count.value > 1) { + const struct fxdiv_divisor_size_t max_threads_count = threadpool->max_threads_count; + if (max_threads_count.value > 1) { #if PTHREADPOOL_USE_FUTEX - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, max_threads_count.value - 1 /* caller thread */); pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); /* Wake up worker threads */ - for (size_t tid = 1; tid < threads_count.value; tid++) { + for (size_t tid = 1; tid < max_threads_count.value; tid++) { /* * Store the command with release semantics to guarantee that if a worker thread observes * the new command value, it also observes the updated active_threads/has_active_threads values. 
@@ -441,13 +438,13 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { futex_wake_all(&threadpool->threads[tid].command); } #else - for (size_t tid = 1; tid < threads_count.value; tid++) { + for (size_t tid = 1; tid < max_threads_count.value; tid++) { /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */ pthread_mutex_lock(&threadpool->threads[tid].command_mutex); } - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, max_threads_count.value - 1 /* caller thread */); - for (size_t tid = 1; tid < threads_count.value; tid++) { + for (size_t tid = 1; tid < max_threads_count.value; tid++) { /* * Store the command with release semantics to guarantee that if a worker thread observes * the new command value, it also observes the updated active_threads value. @@ -461,14 +458,14 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { pthread_cond_broadcast(&threadpool->threads[tid].command_condvar); } - for (size_t tid = 1; tid < threads_count.value; tid++) { + for (size_t tid = 1; tid < max_threads_count.value; tid++) { /* Commit the state changes and let workers start processing */ pthread_mutex_unlock(&threadpool->threads[tid].command_mutex); } #endif /* Wait until all threads return */ - for (size_t thread = 1; thread < threads_count.value; thread++) { + for (size_t thread = 1; thread < max_threads_count.value; thread++) { pthread_join(threadpool->threads[thread].thread_object, NULL); } @@ -477,7 +474,7 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { #if !PTHREADPOOL_USE_FUTEX pthread_mutex_destroy(&threadpool->completion_mutex); pthread_cond_destroy(&threadpool->completion_condvar); - for (size_t tid = 0; tid < threads_count.value; tid++) { + for (size_t tid = 0; tid < max_threads_count.value; tid++) { pthread_mutex_destroy(&threadpool->threads[tid].command_mutex); pthread_cond_destroy(&threadpool->threads[tid].command_condvar); } diff --git a/src/threadpool-common.h b/src/threadpool-common.h index f3d6b27..ca84744 100644 --- a/src/threadpool-common.h +++ b/src/threadpool-common.h @@ -73,19 +73,3 @@ #define PTHREADPOOL_INTERNAL #endif #endif - -// ported from: https://stackoverflow.com/questions/18298280/how-to-declare-a-variable-as-thread-local-portably -/* gcc doesn't know _Thread_local from C11 yet */ -#ifdef __GNUC__ -# define thread_local __thread -/* -// c11 standard already has thread_local specified -// https://en.cppreference.com/w/c/thread/thread_local -#elif __STDC_VERSION__ >= 201112L -# define thread_local _Thread_local -*/ -#elif defined(_MSC_VER) -# define thread_local __declspec(thread) -#else -# error Cannot define thread_local -#endif diff --git a/src/threadpool-object.h b/src/threadpool-object.h index e3ec4af..b14e1e7 100644 --- a/src/threadpool-object.h +++ b/src/threadpool-object.h @@ -60,7 +60,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED thread_info { */ pthreadpool_atomic_size_t range_length; /** - * Thread number in the 0..threads_count-1 range. + * Thread number in the 0..max_threads_count-1 range. */ size_t thread_number; /** @@ -610,7 +610,6 @@ struct pthreadpool_6d_tile_2d_params { }; struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { -#if !PTHREADPOOL_USE_GCD /** * The number of threads that are processing an operation. 
*/ @@ -621,8 +620,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * As per this change, this feature is not available in GCD based * pthreadpool */ - pthreadpool_atomic_size_t num_threads_to_use; -#endif + pthreadpool_atomic_size_t threads_count; #if PTHREADPOOL_USE_FUTEX /** * Indicates if there are active threads. @@ -715,7 +713,7 @@ struct PTHREADPOOL_CACHELINE_ALIGNED pthreadpool { * FXdiv divisor for the number of threads in the thread pool. * This struct never change after pthreadpool_create. */ - struct fxdiv_divisor_size_t threads_count; + struct fxdiv_divisor_size_t max_threads_count; /** * Thread information structures that immediately follow this structure. */ @@ -726,7 +724,7 @@ PTHREADPOOL_STATIC_ASSERT(sizeof(struct pthreadpool) % PTHREADPOOL_CACHELINE_SIZ "pthreadpool structure must occupy an integer number of cache lines (64 bytes)"); PTHREADPOOL_INTERNAL struct pthreadpool* pthreadpool_allocate( - size_t threads_count); + size_t max_threads_count); PTHREADPOOL_INTERNAL void pthreadpool_deallocate( struct pthreadpool* threadpool); diff --git a/src/windows.c b/src/windows.c index 4c34c15..2798888 100644 --- a/src/windows.c +++ b/src/windows.c @@ -22,8 +22,6 @@ #include "threadpool-object.h" #include "threadpool-utils.h" -thread_local size_t max_num_threads = UINT_MAX; - static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) { if (pthreadpool_decrement_fetch_acquire_release_size_t(&threadpool->active_threads) == 0) { SetEvent(threadpool->completion_event[event_index]); @@ -135,27 +133,27 @@ static DWORD WINAPI thread_main(LPVOID arg) { return 0; } -struct pthreadpool* pthreadpool_create(size_t threads_count) { - if (threads_count == 0) { +struct pthreadpool* pthreadpool_create(size_t max_threads_count) { + if (max_threads_count == 0) { SYSTEM_INFO system_info; ZeroMemory(&system_info, sizeof(system_info)); GetSystemInfo(&system_info); - threads_count = (size_t) system_info.dwNumberOfProcessors; + max_threads_count = (size_t) system_info.dwNumberOfProcessors; } - struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); + struct pthreadpool* threadpool = pthreadpool_allocate(max_threads_count); if (threadpool == NULL) { return NULL; } - threadpool->threads_count = fxdiv_init_size_t(threads_count); - pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, threads_count); - for (size_t tid = 0; tid < threads_count; tid++) { + threadpool->max_threads_count = fxdiv_init_size_t(max_threads_count); + pthreadpool_store_relaxed_size_t(&threadpool->threads_count, max_threads_count); + for (size_t tid = 0; tid < max_threads_count; tid++) { threadpool->threads[tid].thread_number = tid; threadpool->threads[tid].threadpool = threadpool; } /* Thread pool with a single thread computes everything on the caller thread. */ - if (threads_count > 1) { + if (max_threads_count > 1) { threadpool->execution_mutex = CreateMutexW( NULL /* mutex attributes */, FALSE /* initially owned */, @@ -168,11 +166,11 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { NULL /* name */); } - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, max_threads_count - 1 /* caller thread */); pthreadpool_store_relaxed_size_t(&threadpool->completion_event_index, 0); /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. 
*/ - for (size_t tid = 1; tid < threads_count; tid++) { + for (size_t tid = 1; tid < max_threads_count; tid++) { threadpool->threads[tid].thread_handle = CreateThread( NULL /* thread attributes */, 0 /* stack size: default */, @@ -195,12 +193,14 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) { return threadpool; } -void pthreadpool_set_num_threads_to_use(size_t num_threads) { - max_num_threads = num_threads; -} - -size_t pthreadpool_get_num_threads_to_use() { - return max_num_threads; +void pthreadpool_set_threads_count(struct pthreadpool* threadpool, size_t num_threads) { + const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE); + assert(wait_status == WAIT_OBJECT_0); + const struct fxdiv_divisor_size_t max_threads_count = threadpool->max_threads_count; + const size_t num_threads_to_use = min(max_threads_count.value, num_threads); + pthreadpool_store_relaxed_size_t(&threadpool->threads_count, num_threads_to_use); + const BOOL release_mutex_status = ReleaseMutex(threadpool->execution_mutex); + assert(release_mutex_status != FALSE); } PTHREADPOOL_INTERNAL void pthreadpool_parallelize( @@ -222,9 +222,8 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE); assert(wait_status == WAIT_OBJECT_0); - const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count; - size_t max_threads_to_use = pthreadpool_get_num_threads_to_use(); - const struct fxdiv_divisor_size_t num_threads_to_use = fxdiv_init_size_t(min(threads_count.value, max_threads_to_use)); + const struct fxdiv_divisor_size_t threads_count = + fxdiv_init_size_t(pthreadpool_load_relaxed_size_t(&threadpool->threads_count)); /* Setup global arguments */ pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function); @@ -232,8 +231,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( pthreadpool_store_relaxed_void_p(&threadpool->argument, context); pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags); - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, num_threads_to_use.value - 1 /* caller thread */); - pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use.value); + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */); if (params_size != 0) { CopyMemory(&threadpool->params, params, params_size); @@ -241,9 +239,9 @@ } /* Spread the work between threads */ - const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, num_threads_to_use); + const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count); size_t range_start = 0; - for (size_t tid = 0; tid < num_threads_to_use.value; tid++) { + for (size_t tid = 0; tid < threads_count.value; tid++) { struct thread_info* thread = &threadpool->threads[tid]; const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder); const size_t range_end = range_start + range_length; @@ -259,7 +257,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( completion_event_index = completion_event_index ^ 1; pthreadpool_store_relaxed_size_t(&threadpool->completion_event_index, completion_event_index); - for (size_t tid = 1; tid < num_threads_to_use.value; tid++) { + for (size_t tid = 1; tid < threads_count.value; tid++) { /* * Update the threadpool command.
* Imporantly, do it after initializing command parameters (range, task, argument, flags) @@ -337,11 +335,11 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize( void pthreadpool_destroy(struct pthreadpool* threadpool) { if (threadpool != NULL) { - const size_t threads_count = threadpool->threads_count.value; - if (threads_count > 1) { - pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); + const size_t max_threads_count = threadpool->max_threads_count.value; + if (max_threads_count > 1) { + pthreadpool_store_relaxed_size_t(&threadpool->active_threads, max_threads_count - 1 /* caller thread */); - for (size_t tid = 1; tid < threads_count; tid++) { + for (size_t tid = 1; tid < max_threads_count; tid++) { /* * Store the command with release semantics to guarantee that if a worker thread observes * the new command value, it also observes the updated active_threads values. @@ -360,7 +358,7 @@ void pthreadpool_destroy(struct pthreadpool* threadpool) { } /* Wait until all threads return */ - for (size_t tid = 1; tid < threads_count; tid++) { + for (size_t tid = 1; tid < max_threads_count; tid++) { const HANDLE thread_handle = threadpool->threads[tid].thread_handle; if (thread_handle != NULL) { const DWORD wait_status = WaitForSingleObject(thread_handle, INFINITE); diff --git a/test/pthreadpool.cc b/test/pthreadpool.cc index f6b0325..d7c446f 100644 --- a/test/pthreadpool.cc +++ b/test/pthreadpool.cc @@ -7433,7 +7433,7 @@ TEST(CapNumThreadsTest, RunUnderCapacity) { auto_pthreadpool_t threadpool(pthreadpool_create(4), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); - pthreadpool_set_num_threads_to_use(2); + pthreadpool_set_threads_count(threadpool.get(), 2); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { GTEST_SKIP(); @@ -7472,8 +7472,8 @@ TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes1) { auto_pthreadpool_t threadpool(pthreadpool_create(4), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); - pthreadpool_set_num_threads_to_use(2); - pthreadpool_set_num_threads_to_use(3); + pthreadpool_set_threads_count(threadpool.get(), 2); + pthreadpool_set_threads_count(threadpool.get(), 3); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { GTEST_SKIP(); @@ -7512,7 +7512,7 @@ TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes2) { auto_pthreadpool_t threadpool(pthreadpool_create(4), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); - pthreadpool_set_num_threads_to_use(2); + pthreadpool_set_threads_count(threadpool.get(), 2); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { GTEST_SKIP(); @@ -7540,7 +7540,7 @@ TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes2) { context.addend = addend; context.sum = sum; - pthreadpool_set_num_threads_to_use(3); + pthreadpool_set_threads_count(threadpool.get(), 3); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { GTEST_SKIP(); @@ -7579,7 +7579,7 @@ TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes3) { auto_pthreadpool_t threadpool(pthreadpool_create(4), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); - pthreadpool_set_num_threads_to_use(1); + pthreadpool_set_threads_count(threadpool.get(), 1); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { GTEST_SKIP(); @@ -7607,7 +7607,7 @@ TEST(CapNumThreadsTest, RunUnderCapacitySetMultipleTimes3) { context.addend = addend; context.sum = sum; - pthreadpool_set_num_threads_to_use(4); + pthreadpool_set_threads_count(threadpool.get(), 4); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { 
GTEST_SKIP(); @@ -7646,7 +7646,7 @@ TEST(CapNumThreadsTest, RunAtCapacity) { auto_pthreadpool_t threadpool(pthreadpool_create(4), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); - pthreadpool_set_num_threads_to_use(4); + pthreadpool_set_threads_count(threadpool.get(), 4); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { GTEST_SKIP(); @@ -7685,7 +7685,7 @@ TEST(CapNumThreadsTest, RunOverCapacity) { auto_pthreadpool_t threadpool(pthreadpool_create(4), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); - pthreadpool_set_num_threads_to_use(16); + pthreadpool_set_threads_count(threadpool.get(), 16); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { GTEST_SKIP(); @@ -7724,7 +7724,7 @@ TEST(CapNumThreadsTest, RunSingleThreaded) { auto_pthreadpool_t threadpool(pthreadpool_create(4), pthreadpool_destroy); ASSERT_TRUE(threadpool.get()); - pthreadpool_set_num_threads_to_use(1); + pthreadpool_set_threads_count(threadpool.get(), 1); if (pthreadpool_get_threads_count(threadpool.get()) <= 1) { GTEST_SKIP();
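
For reviewers, a minimal usage sketch of the API after this rename (not part of the patch; the task body, the element count of 16, and the cap of 2 threads are illustrative assumptions). It creates a pool, reads the fixed pool size with pthreadpool_get_max_threads_count, caps the number of worker threads with pthreadpool_set_threads_count, confirms the cap with pthreadpool_get_threads_count, and then runs a 1D task.

#include <stdio.h>
#include <stdlib.h>

#include <pthreadpool.h>

/* Illustrative 1D task: squares one element of the array passed as context. */
static void square_item(void* context, size_t i) {
  int* data = (int*) context;
  data[i] = data[i] * data[i];
}

int main(void) {
  /* 0 asks pthreadpool to create one thread per online processor. */
  pthreadpool_t pool = pthreadpool_create(0 /* threads_count */);
  if (pool == NULL) {
    return EXIT_FAILURE;
  }

  /* The pool size is fixed at creation time... */
  const size_t max_threads = pthreadpool_get_max_threads_count(pool);

  /* ...but the number of threads used for subsequent tasks can be lowered.
   * The value is clamped to the pool size, so 2 here is just an example cap. */
  pthreadpool_set_threads_count(pool, 2);
  printf("pool size: %zu, threads in use: %zu\n",
         max_threads, pthreadpool_get_threads_count(pool));

  int data[16];
  for (size_t i = 0; i < 16; i++) {
    data[i] = (int) i;
  }

  /* Work items are spread across at most the configured number of threads. */
  pthreadpool_parallelize_1d(pool, square_item, data, 16 /* range */, 0 /* flags */);

  pthreadpool_destroy(pool);
  return EXIT_SUCCESS;
}

Unlike the old pthreadpool_set_num_threads_to_use, which wrote a thread_local global, the new setter stores the cap in the threadpool object itself, so the setting applies regardless of which thread later submits work to that pool.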