Skip to content

Commit

Permalink
guard with HAVE_PTHREADS
Browse files Browse the repository at this point in the history
  • Loading branch information
adammoody committed Nov 21, 2023
1 parent a04288c commit fbea6c4
Showing 1 changed file with 57 additions and 29 deletions.
86 changes: 57 additions & 29 deletions src/redset_xor.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <sys/stat.h>
#include <fcntl.h>

#ifdef HAVE_PTHREADS
#include <pthread.h>

/* get_nprocs() */
Expand All @@ -14,6 +15,7 @@
#else
#include <sys/sysinfo.h>
#endif
#endif /* HAVE_PTHREADS */

#include "mpi.h"

Expand All @@ -34,6 +36,16 @@ Distribute and file rebuild functions
=========================================
*/

/* Bytewise XOR-reduce buffer b into buffer a.
 *
 * a     - input/output buffer; its first count bytes are overwritten
 *         with a[i] ^ b[i]
 * b     - input buffer; only read, never written
 * count - number of bytes to combine
 *
 * Bytes are processed in increasing address order. */
static void reduce_xor(unsigned char* a, const unsigned char* b, size_t count)
{
  unsigned char* dst = a;
  const unsigned char* src = b;
  size_t remaining;

  for (remaining = count; remaining > 0; remaining--) {
    *dst = (unsigned char) (*dst ^ *src);
    dst++;
    src++;
  }
}

#ifdef HAVE_PTHREADS
/* Linux and OSX compatible 'get number of hardware threads' */
unsigned int redset_get_nprocs(void)
{
Expand All @@ -54,36 +66,30 @@ unsigned int redset_get_nprocs(void)
return cpu_threads;
}

/* defines work for each thread along with data structures
 * to coordinate with main thread
 *
 * The main thread fills in a, b, and n to describe the slice a worker
 * should reduce; the worker XORs b into a and then sets done while
 * holding mutex.  Each member is declared exactly once (duplicate
 * member declarations are a compile error in C). */
typedef struct {
  int rank;                   /* rank of thread (used for debugging) */
  unsigned char* a;           /* input/output buffer */
  unsigned char* b;           /* other input buffer */
  size_t n;                   /* size of buffer in bytes */

  pthread_mutex_t mutex;      /* mutex for condition vars below */

  int done;                   /* worker sets to 1 when done */
  int main_waiting;           /* indicates main thread is waiting on cond_main */
  int thread_waiting;         /* indicates worker thread is waiting on cond_thread */
  pthread_cond_t cond_main;   /* condition variable to wake main thread */
  pthread_cond_t cond_thread; /* condition variable to wake worker thread */
} thread_xor_t;

/* data structure used on main process to track each thread it launches
 *
 * Each member is declared exactly once (duplicate member declarations
 * are a compile error in C). */
typedef struct {
  int num;            /* number of threads */
  pthread_t* tids;    /* pthread_t for each thread */
  thread_xor_t* data; /* data structure for each thread */
} threadset_t;

/* Combine two byte buffers with XOR, leaving the result in a.
 *
 * For each index below count, a[index] becomes a[index] ^ b[index].
 * b is read-only.  Bytes are visited from lowest index to highest. */
static void reduce_xor(unsigned char* a, const unsigned char* b, size_t count)
{
  size_t pos = 0;

  while (pos < count) {
    a[pos] = (unsigned char) (a[pos] ^ b[pos]);
    pos++;
  }
}

/* The actual pthread function */
void* reduce_xor_pthread_fn(void* arg)
{
Expand Down Expand Up @@ -114,7 +120,7 @@ void* reduce_xor_pthread_fn2(void* arg)
/* do work */
reduce_xor(d->a, d->b, d->n);

/* signal to main thread that we're done */
/* signal main thread that we're done */
pthread_mutex_lock(&d->mutex);
d->done = 1;
if (d->main_waiting) {
Expand Down Expand Up @@ -191,6 +197,7 @@ static int reduce_xor_pthread(unsigned char* a, unsigned char* b, size_t count)
return ret;
}

/* spawn a set of threads and fill in threadset structure to track them */
static int reduce_xor_pthread_setup(threadset_t* tset)
{
int i;
Expand All @@ -199,6 +206,7 @@ static int reduce_xor_pthread_setup(threadset_t* tset)

/* TODO: launch threads and attach to descriptor, activate via condition variable */

/* compute number of threads to start up */
int max_threads = 16;
int nthreads = redset_get_nprocs();
if (nthreads > max_threads) {
Expand Down Expand Up @@ -236,6 +244,7 @@ static int reduce_xor_pthread_setup(threadset_t* tset)
return ret;
}

/* given data, assign work to threads and perform XOR reduction */
static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsigned char* b, size_t count)
{
int i;
Expand All @@ -246,29 +255,30 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig

int nthreads = tset->num;

/* compute number of bytes for each thread to reduce */
size_t size = count / nthreads;
if (size * nthreads < count) {
size += 1;
}

/* TODO: enforce a minimum number of bytes per thread */

/* define work for each thread */
/* define work descriptor for each thread */
size_t offset = 0;
for (i = 0; i < nthreads; i++) {
size_t amt = count - offset;
if (amt > size) {
amt = size;
}

//printf("thread %d %lu\n", i, (unsigned long) amt);
tset->data[i].a = a + offset;
tset->data[i].b = b + offset;
tset->data[i].n = amt;

offset += amt;
}

/* signal each worker that has something to do */
for (i = 0; i < nthreads; i++) {
if (tset->data[i].n > 0) {
pthread_mutex_lock(&tset->data[i].mutex);
Expand All @@ -280,6 +290,7 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig
}
}

/* wait for each worker to signal that it is done */
for (i = 0; i < nthreads; i++) {
if (tset->data[i].n > 0) {
pthread_mutex_lock(&tset->data[i].mutex);
Expand All @@ -295,6 +306,7 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig
return ret;
}

/* signal threads to shutdown, wait for them to exit, and free resources in thread set */
static int reduce_xor_pthread_teardown(threadset_t* tset)
{
int i;
Expand Down Expand Up @@ -333,9 +345,11 @@ static int reduce_xor_pthread_teardown(threadset_t* tset)
/* free memory in data structure */
redset_free(&tset->data);
redset_free(&tset->tids);
tset->num = 0;

return ret;
}
#endif /* HAVE_PTHREADS */

/* set chunk filenames of form: xor.<group_id>_<xor_rank+1>_of_<xor_ranks>.redset */
static void redset_build_xor_filename(
Expand Down Expand Up @@ -624,8 +638,10 @@ int redset_apply_xor(
kvtree_write_fd(my_chunk_file, fd_xor, header);
kvtree_delete(&header);

#ifdef HAVE_PTHREADS
threadset_t threads;
reduce_xor_pthread_setup(&threads);
#endif /* HAVE_PTHREADS */

MPI_Request request[2];
MPI_Status status[2];
Expand Down Expand Up @@ -658,9 +674,12 @@ int redset_apply_xor(
/* TODO: XORing with unsigned long would be faster here (if chunk size is multiple of this size) */
/* merge the blocks via xor operation */
if (chunk_id < d->ranks-1) {
//reduce_xor(send_buf, recv_buf, count);
#ifdef HAVE_PTHREADS
//reduce_xor_pthread(send_buf, recv_buf, count);
reduce_xor_pthread_execute(&threads, send_buf, recv_buf, count);
#else
reduce_xor(send_buf, recv_buf, count);
#endif /* HAVE_PTHREADS */
}

if (chunk_id > 0) {
Expand Down Expand Up @@ -691,7 +710,9 @@ int redset_apply_xor(
redset_buffers_free(1, &send_bufs);
redset_buffers_free(1, &recv_bufs);

#ifdef HAVE_PTHREADS
reduce_xor_pthread_teardown(&threads);
#endif /* HAVE_PTHREADS */

#if 0
/* if crc_on_copy is set, compute and store CRC32 value for chunk file */
Expand Down Expand Up @@ -822,8 +843,10 @@ int redset_recover_xor_rebuild(
);
}

#ifdef HAVE_PTHREADS
threadset_t threads;
reduce_xor_pthread_setup(&threads);
#endif /* HAVE_PTHREADS */

/* allocate buffer to read a piece of my file */
unsigned char** send_bufs = (unsigned char**) redset_buffers_alloc(1, redset_mpi_buf_size);
Expand Down Expand Up @@ -865,9 +888,12 @@ int redset_recover_xor_rebuild(
/* if not start of pipeline, receive data from left and xor with my own */
if (root != state->lhs_rank) {
MPI_Recv(recv_buf, count, MPI_BYTE, state->lhs_rank, 0, d->comm, &status[0]);
//reduce_xor(send_buf, recv_buf, count);
#ifdef HAVE_PTHREADS
//reduce_xor_pthread(send_buf, recv_buf, count);
reduce_xor_pthread_execute(&threads, send_buf, recv_buf, count);
#else
reduce_xor(send_buf, recv_buf, count);
#endif /* HAVE_PTHREADS */
}

/* send data to right-side partner */
Expand Down Expand Up @@ -946,7 +972,9 @@ int redset_recover_xor_rebuild(
* we do this on every file instead of just the rebuilt files so that we preserve atime on all files */
redset_lofi_apply_meta(current_hash);

#ifdef HAVE_PTHREADS
reduce_xor_pthread_teardown(&threads);
#endif /* HAVE_PTHREADS */

/* free the buffers */
redset_buffers_free(1, &recv_bufs);
Expand Down

0 comments on commit fbea6c4

Please sign in to comment.