Skip to content

Commit

Permalink
Use pthreadpool object to store max number of threads to use
Browse files Browse the repository at this point in the history
Summary:
This commits changes API to set max num threads. It applies the limit to
the pthreadpool object.

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
kimishpatel committed Nov 22, 2021
1 parent fa30cac commit 2301913
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 234 deletions.
19 changes: 12 additions & 7 deletions include/pthreadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,31 @@ pthreadpool_t pthreadpool_create(size_t threads_count);
*
* @returns The number of threads in the thread pool.
*/
size_t pthreadpool_get_threads_count(pthreadpool_t threadpool);
size_t pthreadpool_get_max_threads_count(pthreadpool_t threadpool);

/*
* API to enable doing work with fewer threads than available in
* threadpool.
* This API takes in a pointer to threadpool object and sets number
* of threads to use with that threadpool.
* Purpose of this is to ameliorate some perf degradation observed
* due to OS mapping a given set of threads to fewer cores.
*
* @param threadpool threadpool object.
* @param num_threads num threads to use for the subsequent tasks
* submitted.
* submitted using the threadpool object.
*/
void pthreadpool_set_num_threads_to_use(size_t num_threads);
void pthreadpool_set_threads_count(pthreadpool_t threadpool, size_t num_threads);

/*
* Query current setting of the number of threads to use
* API to get number of threads to be used via threadpool. This number
* can be different from the size of the threadpool. It may have been set
* by pthreadpool_set_threads_count.
*
* @returns The number of threads to be used for the subsequent tasks
* submitted.
* @param threadpool threadpool object.
* @returns number of threads used by threadpool.
*/
size_t pthreadpool_get_num_threads_to_use(void);
size_t pthreadpool_get_threads_count(pthreadpool_t threadpool);

/**
* Process items on a 1D grid.
Expand Down
42 changes: 21 additions & 21 deletions src/fastpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_fastpath(
const pthreadpool_task_1d_t task = (pthreadpool_task_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -77,7 +77,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_with_uarch_fastpath(
}
#endif

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -113,7 +113,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_1d_tile_1d_fastpath(
const pthreadpool_task_1d_tile_1d_t task = (pthreadpool_task_1d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -155,7 +155,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_fastpath(
const pthreadpool_task_2d_t task = (pthreadpool_task_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -201,7 +201,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_1d_fastpath(
const pthreadpool_task_2d_tile_1d_t task = (pthreadpool_task_2d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -251,7 +251,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_fastpath(
const pthreadpool_task_2d_tile_2d_t task = (pthreadpool_task_2d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -313,7 +313,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_2d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -366,7 +366,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_fastpath(
const pthreadpool_task_3d_t task = (pthreadpool_task_3d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -419,7 +419,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_1d_fastpath(
const pthreadpool_task_3d_tile_1d_t task = (pthreadpool_task_3d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -476,7 +476,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_fastpath(
const pthreadpool_task_3d_tile_2d_t task = (pthreadpool_task_3d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -546,7 +546,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_3d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -607,7 +607,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_fastpath(
const pthreadpool_task_4d_t task = (pthreadpool_task_4d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -668,7 +668,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_1d_fastpath(
const pthreadpool_task_4d_tile_1d_t task = (pthreadpool_task_4d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -733,7 +733,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_fastpath(
const pthreadpool_task_4d_tile_2d_t task = (pthreadpool_task_4d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -810,7 +810,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_4d_tile_2d_with_uarch_f
}
#endif

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -878,7 +878,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_fastpath(
const pthreadpool_task_5d_t task = (pthreadpool_task_5d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -946,7 +946,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_1d_fastpath(
const pthreadpool_task_5d_tile_1d_t task = (pthreadpool_task_5d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1019,7 +1019,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_5d_tile_2d_fastpath(
const pthreadpool_task_5d_tile_2d_t task = (pthreadpool_task_5d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1095,7 +1095,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_fastpath(
const pthreadpool_task_6d_t task = (pthreadpool_task_6d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1171,7 +1171,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_1d_fastpath(
const pthreadpool_task_6d_tile_1d_t task = (pthreadpool_task_6d_tile_1d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down Expand Up @@ -1252,7 +1252,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_thread_parallelize_6d_tile_2d_fastpath(
const pthreadpool_task_6d_tile_2d_t task = (pthreadpool_task_6d_tile_2d_t) pthreadpool_load_relaxed_void_p(&threadpool->task);
void *const argument = pthreadpool_load_relaxed_void_p(&threadpool->argument);

const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use);
const size_t threads_count = pthreadpool_load_relaxed_size_t(&threadpool->threads_count);
const size_t range_threshold = -threads_count;

/* Process thread's own range of items */
Expand Down
26 changes: 12 additions & 14 deletions src/gcd.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
#include "threadpool-object.h"
#include "threadpool-utils.h"

thread_local size_t max_num_threads = UINT_MAX;

static void thread_main(void* arg, size_t thread_index) {
struct pthreadpool* threadpool = (struct pthreadpool*) arg;
struct thread_info* thread = &threadpool->threads[thread_index];
Expand Down Expand Up @@ -64,7 +62,7 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return NULL;
}
threadpool->threads_count = fxdiv_init_size_t(threads_count);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use_);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, threads_count);
for (size_t tid = 0; tid < threads_count; tid++) {
threadpool->threads[tid].thread_number = tid;
}
Expand All @@ -76,12 +74,12 @@ struct pthreadpool* pthreadpool_create(size_t threads_count) {
return threadpool;
}

void pthreadpool_set_num_threads_to_use(size_t num_threads) {
max_num_threads = num_threads;
}

size_t pthreadpool_get_num_threads_to_use() {
return max_num_threads;
void pthreadpool_set_threads_count(struct pthreadpool* threadpool, size_t num_threads) {
dispatch_semaphore_wait(threadpool->execution_semaphore, DISPATCH_TIME_FOREVER);
const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
const size_t num_threads_to_use_ = min(threads_count.value, num_threads);
pthreadpool_store_relaxed_size_t(&threadpool->num_threads_to_use, num_threads_to_use_);
dispatch_semaphore_signal(threadpool->execution_semaphore);
}

PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
Expand Down Expand Up @@ -109,17 +107,17 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
const struct fxdiv_result_size_tsize_t threads_count =
fxdiv_init_size_t(min(threadpool->threads_count.value, pthreadpool_get_num_threads_to_use()));
const struct fxdiv_divisor_size_t num_threads_to_use =
fxdiv_init_size_t(pthreadpool_load_relaxed_size_t(&threadpool->num_threads_to_use));

if (params_size != 0) {
memcpy(&threadpool->params, params, params_size);
}

/* Spread the work between threads */
const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, num_threads_to_use);
size_t range_start = 0;
for (size_t tid = 0; tid < threads_count.value; tid++) {
for (size_t tid = 0; tid < num_threads_to_use.value; tid++) {
struct thread_info* thread = &threadpool->threads[tid];
const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
const size_t range_end = range_start + range_length;
Expand All @@ -131,7 +129,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
range_start = range_end;
}

dispatch_apply_f(threads_count.value, DISPATCH_APPLY_AUTO, threadpool, thread_main);
dispatch_apply_f(num_threads_to_use.value, DISPATCH_APPLY_AUTO, threadpool, thread_main);

/* Unprotect the global threadpool structures */
dispatch_semaphore_signal(threadpool->execution_semaphore);
Expand Down
8 changes: 4 additions & 4 deletions src/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@


PTHREADPOOL_INTERNAL struct pthreadpool* pthreadpool_allocate(
size_t threads_count)
size_t max_threads_count)
{
assert(threads_count >= 1);
assert(max_threads_count >= 1);

const size_t threadpool_size = sizeof(struct pthreadpool) + threads_count * sizeof(struct thread_info);
const size_t threadpool_size = sizeof(struct pthreadpool) + max_threads_count * sizeof(struct thread_info);
struct pthreadpool* threadpool = NULL;
#if defined(__ANDROID__)
/*
Expand Down Expand Up @@ -55,7 +55,7 @@ PTHREADPOOL_INTERNAL void pthreadpool_deallocate(
{
assert(threadpool != NULL);

const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->threads_count.value * sizeof(struct thread_info);
const size_t threadpool_size = sizeof(struct pthreadpool) + threadpool->max_threads_count.value * sizeof(struct thread_info);
memset(threadpool, 0, threadpool_size);

#ifdef _WIN32
Expand Down
Loading

0 comments on commit 2301913

Please sign in to comment.