From fbea6c4e901e62928125c3913021c7ed532a3cae Mon Sep 17 00:00:00 2001 From: Adam Moody Date: Mon, 20 Nov 2023 19:23:33 -0800 Subject: [PATCH] guard with HAVE_PTHREADS --- src/redset_xor.c | 86 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 29 deletions(-) diff --git a/src/redset_xor.c b/src/redset_xor.c index 390540a..faef9f7 100644 --- a/src/redset_xor.c +++ b/src/redset_xor.c @@ -6,6 +6,7 @@ #include #include +#ifdef HAVE_PTHREADS #include /* get_nprocs() */ @@ -14,6 +15,7 @@ #else #include #endif +#endif /* HAVE_PTHREADS */ #include "mpi.h" @@ -34,6 +36,16 @@ Distribute and file rebuild functions ========================================= */ +/* XOR a with b, store result in a */ +static void reduce_xor(unsigned char* a, const unsigned char* b, size_t count) +{ + size_t i; + for (i = 0; i < count; i++) { + a[i] ^= b[i]; + } +} + +#ifdef HAVE_PTHREADS /* Linux and OSX compatible 'get number of hardware threads' */ unsigned int redset_get_nprocs(void) { @@ -54,36 +66,30 @@ unsigned int redset_get_nprocs(void) return cpu_threads; } +/* defines work for each thread along with data structures + * to coordinate with main thread */ typedef struct { - int rank; - unsigned char* a; - unsigned char* b; - size_t n; - - pthread_mutex_t mutex; - - int done; - int main_waiting; - int thread_waiting; - pthread_cond_t cond_main; - pthread_cond_t cond_thread; + int rank; /* rank of thread (used for debugging) */ + unsigned char* a; /* input/output buffer */ + unsigned char* b; /* other input buffer */ + size_t n; /* size of buffer in bytes */ + + pthread_mutex_t mutex; /* mutex for condition vars below */ + + int done; /* worker sets to 1 when done */ + int main_waiting; /* indicates main thread is waiting on cond_main */ + int thread_waiting; /* indicates worker thread is waiting on cond_thread */ + pthread_cond_t cond_main; /* condition variable to wake main thread */ + pthread_cond_t cond_thread; /* condition variable to wake worker thread */ } thread_xor_t; +/* data structure used on main process to track each thread it launches */ typedef struct { - int num; - pthread_t* tids; - thread_xor_t* data; + int num; /* number of threads */ + pthread_t* tids; /* pthread_t for each thread */ + thread_xor_t* data; /* data structure for each thread */ } threadset_t; -/* XOR a with b, store result in a */ -static void reduce_xor(unsigned char* a, const unsigned char* b, size_t count) -{ - size_t i; - for (i = 0; i < count; i++) { - a[i] ^= b[i]; - } -} - /* The actual pthread function */ void* reduce_xor_pthread_fn(void* arg) { @@ -114,7 +120,7 @@ void* reduce_xor_pthread_fn2(void* arg) /* do work */ reduce_xor(d->a, d->b, d->n); - /* signal to main thread that we're done */ + /* signal main thread that we're done */ pthread_mutex_lock(&d->mutex); d->done = 1; if (d->main_waiting) { @@ -191,6 +197,7 @@ static int reduce_xor_pthread(unsigned char* a, unsigned char* b, size_t count) return ret; } +/* spawn a set of threads and fill in threadset structure to track them */ static int reduce_xor_pthread_setup(threadset_t* tset) { int i; @@ -199,6 +206,7 @@ static int reduce_xor_pthread_setup(threadset_t* tset) /* TODO: launch threads and attach to descriptor, activate via condition variable */ + /* compute number of threads to start up */ int max_threads = 16; int nthreads = redset_get_nprocs(); if (nthreads > max_threads) { @@ -236,6 +244,7 @@ static int reduce_xor_pthread_setup(threadset_t* tset) return ret; } +/* given data, assign work to threads and perform XOR reduction */ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsigned char* b, size_t count) { int i; @@ -246,6 +255,7 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig int nthreads = tset->num; + /* compute number of bytes for each thread to reduce */ size_t size = count / nthreads; if (size * nthreads < count) { size += 1; @@ -253,7 +263,7 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig /* TODO: ensure size of some minimum */ - /* define work for each thread */ + /* define work descriptor for each thread */ size_t offset = 0; for (i = 0; i < nthreads; i++) { size_t amt = count - offset; @@ -261,7 +271,6 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig amt = size; } - //printf("thread %d %lu\n", i, (unsigned long) amt); tset->data[i].a = a + offset; tset->data[i].b = b + offset; tset->data[i].n = amt; @@ -269,6 +278,7 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig offset += amt; } + /* signal each worker that has something to do */ for (i = 0; i < nthreads; i++) { if (tset->data[i].n > 0) { pthread_mutex_lock(&tset->data[i].mutex); @@ -280,6 +290,7 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig } } + /* wait for each worker to signal that it is done */ for (i = 0; i < nthreads; i++) { if (tset->data[i].n > 0) { pthread_mutex_lock(&tset->data[i].mutex); @@ -295,6 +306,7 @@ static int reduce_xor_pthread_execute(threadset_t* tset, unsigned char* a, unsig return ret; } +/* signal threads to shutdown, wait for them to exit, and free resources in thread set */ static int reduce_xor_pthread_teardown(threadset_t* tset) { int i; @@ -333,9 +345,11 @@ static int reduce_xor_pthread_teardown(threadset_t* tset) /* free memory in data structure */ redset_free(&tset->data); redset_free(&tset->tids); + tset->num = 0; return ret; } +#endif /* HAVE_PTHREADS */ /* set chunk filenames of form: xor.__of_.redset */ static void redset_build_xor_filename( @@ -624,8 +638,10 @@ int redset_apply_xor( kvtree_write_fd(my_chunk_file, fd_xor, header); kvtree_delete(&header); +#ifdef HAVE_PTHREADS threadset_t threads; reduce_xor_pthread_setup(&threads); +#endif /* HAVE_PTHREADS */ MPI_Request request[2]; MPI_Status status[2]; @@ -658,9 +674,12 @@ int redset_apply_xor( /* TODO: XORing with unsigned long would be faster here (if chunk size is multiple of this size) */ /* merge the blocks via xor operation */ if (chunk_id < d->ranks-1) { - //reduce_xor(send_buf, recv_buf, count); +#ifdef HAVE_PTHREADS //reduce_xor_pthread(send_buf, recv_buf, count); reduce_xor_pthread_execute(&threads, send_buf, recv_buf, count); +#else + reduce_xor(send_buf, recv_buf, count); +#endif /* HAVE_PTHREADS */ } if (chunk_id > 0) { @@ -691,7 +710,9 @@ int redset_apply_xor( redset_buffers_free(1, &send_bufs); redset_buffers_free(1, &recv_bufs); +#ifdef HAVE_PTHREADS reduce_xor_pthread_teardown(&threads); +#endif /* HAVE_PTHREADS */ #if 0 /* if crc_on_copy is set, compute and store CRC32 value for chunk file */ @@ -822,8 +843,10 @@ int redset_recover_xor_rebuild( ); } +#ifdef HAVE_PTHREADS threadset_t threads; reduce_xor_pthread_setup(&threads); +#endif /* HAVE_PTHREADS */ /* allocate buffer to read a piece of my file */ unsigned char** send_bufs = (unsigned char**) redset_buffers_alloc(1, redset_mpi_buf_size); @@ -865,9 +888,12 @@ int redset_recover_xor_rebuild( /* if not start of pipeline, receive data from left and xor with my own */ if (root != state->lhs_rank) { MPI_Recv(recv_buf, count, MPI_BYTE, state->lhs_rank, 0, d->comm, &status[0]); - //reduce_xor(send_buf, recv_buf, count); +#ifdef HAVE_PTHREADS //reduce_xor_pthread(send_buf, recv_buf, count); reduce_xor_pthread_execute(&threads, send_buf, recv_buf, count); +#else + reduce_xor(send_buf, recv_buf, count); +#endif /* HAVE_PTHREADS */ } /* send data to right-side partner */ @@ -946,7 +972,9 @@ int redset_recover_xor_rebuild( * we do this on every file instead of just the rebuilt files so that we preserve atime on all files */ redset_lofi_apply_meta(current_hash); +#ifdef HAVE_PTHREADS reduce_xor_pthread_teardown(&threads); +#endif /* HAVE_PTHREADS */ /* free the buffers */ redset_buffers_free(1, &recv_bufs);