From b0d57b39be12bdafef8b26218533a0d31eb19bf3 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Tue, 4 Sep 2018 14:08:48 -0600 Subject: [PATCH 1/2] bw/bibw: add support for multiple process pairs This commit updates the bandwidth and bi-directional bandwidth tests to allow multiple process pairs. This allows us to compare the performance of MPI-only with MPI+threads within the same benchmark framework. Signed-off-by: Nathan Hjelm --- src/rmamt_bibw.c | 481 ++++++++++++++++++++------------------------ src/rmamt_bw.c | 460 ++++++++++++++++++++---------------------- src/rmamt_common.c | 15 +- src/rmamt_common.h | 16 +- src/rmamt_options.c | 31 +-- src/rmamt_options.h | 6 +- 6 files changed, 484 insertions(+), 525 deletions(-) diff --git a/src/rmamt_bibw.c b/src/rmamt_bibw.c index d6a3ca2..1dbd1c7 100644 --- a/src/rmamt_bibw.c +++ b/src/rmamt_bibw.c @@ -15,115 +15,127 @@ #include #include #include +#include +#include +#include +#include +#include uint64_t find_max(); -/* target-side functions for single window */ -static void *runfunc_put (ArgStruct* a); -static void *runfunc_get (ArgStruct* a); - /* origin-side window per thread functions */ -static void *bibw_put_lock_all_winperthread (ArgStruct* a); -static void *bibw_get_lock_all_winperthread (ArgStruct* a); -static void *bibw_put_fence_winperthread (ArgStruct* a); -static void *bibw_get_fence_winperthread (ArgStruct* a); -static void *bibw_put_lock_per_rank_winperthread (ArgStruct* a); -static void *bibw_get_lock_per_rank_winperthread (ArgStruct* a); -static void *bibw_put_flush_winperthread (ArgStruct* a); -static void *bibw_get_flush_winperthread (ArgStruct* a); -static void *bibw_put_pscw_winperthread (ArgStruct* a); -static void *bibw_get_pscw_winperthread (ArgStruct* a); - -static rmamt_fn_t rmamt_winperthread_fns[RMAMT_OPERATIONS_MAX][RMAMT_SYNC_MAX] = { +static void *bibw_put_lock_all_new (ArgStruct* a); +static void *bibw_get_lock_all_new (ArgStruct* a); +static void *bibw_put_fence_new (ArgStruct* a); +static void *bibw_get_fence_new (ArgStruct* a); +static void *bibw_put_lock_per_rank_new (ArgStruct* a); +static void *bibw_get_lock_per_rank_new (ArgStruct* a); +static void *bibw_put_flush_new (ArgStruct* a); +static void *bibw_get_flush_new (ArgStruct* a); +static void *bibw_put_pscw_new (ArgStruct* a); +static void *bibw_get_pscw_new (ArgStruct* a); +static void *bibw_put_flush_thread_new (ArgStruct* a); +static void *bibw_get_flush_thread_new (ArgStruct* a); +static void *bibw_put_flush_all_new (ArgStruct* a); +static void *bibw_get_flush_all_new (ArgStruct* a); +static void *bibw_put_flush_local_new (ArgStruct* a); +static void *bibw_get_flush_local_new (ArgStruct* a); +static void *bibw_put_flush_local_all_new (ArgStruct* a); +static void *bibw_get_flush_local_all_new (ArgStruct* a); + +static rmamt_fn_t rmamt_new_fns[RMAMT_OPERATIONS_MAX][RMAMT_SYNC_MAX] = { [RMAMT_PUT] = { - [RMAMT_LOCK_ALL] = bibw_put_lock_all_winperthread, - [RMAMT_FENCE] = bibw_put_fence_winperthread, - [RMAMT_LOCK] = bibw_put_lock_per_rank_winperthread, - [RMAMT_FLUSH] = bibw_put_flush_winperthread, - [RMAMT_PSCW] = bibw_put_pscw_winperthread, + [RMAMT_LOCK_ALL] = bibw_put_lock_all_new, + [RMAMT_FENCE] = bibw_put_fence_new, + [RMAMT_LOCK] = bibw_put_lock_per_rank_new, + [RMAMT_FLUSH] = bibw_put_flush_new, + [RMAMT_PSCW] = bibw_put_pscw_new, + [RMAMT_ALL_FLUSH] = bibw_put_flush_new, + [RMAMT_FLUSH_ALL] = bibw_put_flush_all_new, + [RMAMT_FLUSH_LOCAL] = bibw_put_flush_local_new, + [RMAMT_FLUSH_LOCAL_ALL] = bibw_put_flush_local_all_new, }, [RMAMT_GET] = { - [RMAMT_LOCK_ALL] = bibw_get_lock_all_winperthread, - [RMAMT_FENCE] = bibw_get_fence_winperthread, - [RMAMT_LOCK] = bibw_get_lock_per_rank_winperthread, - [RMAMT_FLUSH] = bibw_get_flush_winperthread, - [RMAMT_PSCW] = bibw_get_pscw_winperthread, + [RMAMT_LOCK_ALL] = bibw_get_lock_all_new, + [RMAMT_FENCE] = bibw_get_fence_new, + [RMAMT_LOCK] = bibw_get_lock_per_rank_new, + [RMAMT_FLUSH] = bibw_get_flush_new, + [RMAMT_PSCW] = bibw_get_pscw_new, + [RMAMT_ALL_FLUSH] = bibw_get_flush_new, + [RMAMT_FLUSH_ALL] = bibw_get_flush_all_new, + [RMAMT_FLUSH_LOCAL] = bibw_get_flush_local_new, + [RMAMT_FLUSH_LOCAL_ALL] = bibw_get_flush_local_all_new, }, }; -/* origin-side functions */ -static void *bibw_orig_lock_all (ArgStruct *a); -static void *bibw_orig_lock (ArgStruct *a); -static void *bibw_orig_flush (ArgStruct *a); -static void *bibw_orig_fence (ArgStruct *a); -static void *bibw_orig_pscw (ArgStruct *a); - -static rmamt_fn_t rmamt_origin_fns[RMAMT_SYNC_MAX] = { - [RMAMT_LOCK_ALL] = bibw_orig_lock_all, - [RMAMT_FENCE] = bibw_orig_fence, - [RMAMT_LOCK] = bibw_orig_lock, - [RMAMT_FLUSH] = bibw_orig_flush, - [RMAMT_PSCW] = bibw_orig_pscw, -}; - static ArgStruct args[MAX_THREADS]; static uint64_t thread_etimes[MAX_THREADS]; static char* tbufs[MAX_THREADS]; static char* obuf; static MPI_Win win[MAX_THREADS]; static int64_t times[MAX_THREADS][256]; +static uint64_t barrier_time; int main(int argc,char *argv[]) { + MPI_Comm test_comm, leader_comm; MPI_Group group = MPI_GROUP_NULL, comm_group; int nprocs, provided, rank, rc; pthread_t id[MAX_THREADS]; MPI_Request req; int64_t win_size; - int win_count; + int win_count, pairs; size_t max_size, min_size; - uint64_t stt, ttt = 0; FILE *output_file = NULL; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); if (provided != MPI_THREAD_MULTIPLE) { - printf("Thread multiple needed\n"); + printf("MPI Thread multiple support required to run this benchmark\n"); MPI_Abort(MPI_COMM_WORLD, 1); } - MPI_Comm_size(MPI_COMM_WORLD,&nprocs); - if (nprocs != 2) { - printf("Run with 2 processes\n"); + MPI_T_init_thread (MPI_THREAD_MULTIPLE, &provided); + + MPI_Comm_size(MPI_COMM_WORLD, &nprocs); + if (nprocs & 1) { + printf("Run with a multiple of two MPI processes\n"); MPI_Abort(MPI_COMM_WORLD, 1); } - MPI_Comm_rank(MPI_COMM_WORLD,&rank); - MPI_Comm_group (MPI_COMM_WORLD, &comm_group); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + pairs = nprocs >> 1; + + /* create test communicator */ + MPI_Comm_split (MPI_COMM_WORLD, rank & ~1, 0, &test_comm); + MPI_Comm_split (MPI_COMM_WORLD, (rank & 1) ? MPI_UNDEFINED : 0, 0, &leader_comm); + + MPI_Comm_group (test_comm, &comm_group); rmamt_parse_options ("rmamt_bibw", argc, argv); if (rmamt_output_file && 0 == rank) { - output_file = fopen (rmamt_output_file, "w"); - if (NULL == output_file) { - fprintf (stderr, "Could not open %s for writing\n", rmamt_output_file); - MPI_Abort (MPI_COMM_WORLD, 1); - } - free (rmamt_output_file); + output_file = fopen (rmamt_output_file, "w"); + if (NULL == output_file) { + fprintf (stderr, "Could not open %s for writing\n", rmamt_output_file); + MPI_Abort (MPI_COMM_WORLD, 1); + } + free (rmamt_output_file); } if (rmamt_bind_threads) { - rc = rmamt_bind_init (); - if (-1 == rc) { - printf ("***** WARNING: Thread binding requested but not available *****\n"); - } + rc = rmamt_bind_init (); + if (-1 == rc) { + printf ("***** WARNING: Thread binding requested but not available *****\n"); + } } min_size = rmamt_min_size / rmamt_threads; if (min_size < 1) { - min_size = 1; + min_size = 1; } - max_size = rmamt_max_size / rmamt_threads; + max_size = rmamt_max_size / (rmamt_threads * pairs); win_count = rmamt_win_per_thread ? rmamt_threads : 1; win_size = rmamt_max_size / win_count; @@ -139,7 +151,7 @@ int main(int argc,char *argv[]) /* create windows */ for (int i = 0 ; i < win_count ; ++i) { - MPI_CHECK(MPI_Win_allocate (win_size, 1, MPI_INFO_NULL, MPI_COMM_WORLD, + MPI_CHECK(MPI_Win_allocate (win_size, 1, MPI_INFO_NULL, test_comm, tbufs + i, win + i)); if (win_size) { memset (tbufs[i], '-', win_size); @@ -147,10 +159,9 @@ int main(int argc,char *argv[]) } if (RMAMT_PSCW == rmamt_sync) { - MPI_Group_incl (comm_group, 1, &(int){!rank}, &group); + MPI_Group_incl (comm_group, 1, &(int){!(rank & 1)}, &group); } - if (!rank) { printf ("##########################################\n"); printf ("# RMA-MT Bi-directional Bandwidth\n"); @@ -158,92 +169,90 @@ int main(int argc,char *argv[]) printf ("# Operation: %s\n", rmamt_operation_strings[rmamt_operation]); printf ("# Sync: %s\n", rmamt_sync_strings[rmamt_sync]); printf ("# Thread count: %u\n", (unsigned) rmamt_threads); + printf ("# Number of MPI process pairs: %d\n", pairs); printf ("# Iterations: %u\n", (unsigned) rmamt_iterations); - printf ("# Bind worker threads: %s\n", rmamt_bind_threads ? "yes" : "no"); - printf ("# Number of windows: %u\n", rmamt_win_per_thread ? rmamt_threads : 1); + printf ("# Bind worker threads: %s\n", rmamt_bind_threads ? "yes" : "no"); + printf ("# Number of windows: %u\n", rmamt_win_per_thread ? rmamt_threads : 1); printf ("##########################################\n"); - printf (" BpT(%i)\t BxT(%i)\tBandwidth(MiB/s)\tMessage_Rate(M/s)\n", rmamt_threads, rmamt_threads); - if (output_file) { - fprintf (output_file, "##########################################\n"); - fprintf (output_file, "# RMA-MT Bandwidth\n"); - fprintf (output_file, "#\n"); - fprintf (output_file, "# Operation: %s\n", rmamt_operation_strings[rmamt_operation]); - fprintf (output_file, "# Sync: %s\n", rmamt_sync_strings[rmamt_sync]); - fprintf (output_file, "# Thread count: %u\n", (unsigned) rmamt_threads); - fprintf (output_file, "# Iterations: %u\n", (unsigned) rmamt_iterations); - fprintf (output_file, "# Ibarrier: %s, sleep interval: %uns\n", rmamt_use_ibarrier ? "yes" : "no", - rmamt_sleep_interval); - fprintf (output_file, "# Bind worker threads: %s\n", rmamt_bind_threads ? "yes" : "no"); - fprintf (output_file, "# Number of windows: %u\n", rmamt_win_per_thread ? rmamt_threads : 1); - fprintf (output_file, "##########################################\n"); - fprintf (output_file, "BpT(%i),BxT(%i),Bandwidth(MiB/s),Message_Rate(M/s)\n", rmamt_threads, rmamt_threads); - } + printf (" BpT(%i)\t BxT(%i)\tBandwidth(MiB/s)\tMessage_Rate(M/s)\n", rmamt_threads, rmamt_threads); + if (output_file) { + fprintf (output_file, "##########################################\n"); + fprintf (output_file, "# RMA-MT Bi-directional Bandwidth\n"); + fprintf (output_file, "#\n"); + fprintf (output_file, "# Operation: %s\n", rmamt_operation_strings[rmamt_operation]); + fprintf (output_file, "# Sync: %s\n", rmamt_sync_strings[rmamt_sync]); + fprintf (output_file, "# Thread count: %u\n", (unsigned) rmamt_threads); + fprintf (output_file, "# Number of MPI process pairs: %d\n", pairs); + fprintf (output_file, "# Iterations: %u\n", (unsigned) rmamt_iterations); + fprintf (output_file, "# Bind worker threads: %s\n", rmamt_bind_threads ? "yes" : "no"); + fprintf (output_file, "# Number of windows: %u\n", rmamt_win_per_thread ? rmamt_threads : 1); + fprintf (output_file, "##########################################\n"); + fprintf (output_file, "BpT(%i),BxT(%i),Bandwidth(MiB/s),Message_Rate(M/s)\n", rmamt_threads, rmamt_threads); + } } - thread_barrier_init (rmamt_win_per_thread ? rmamt_threads : rmamt_threads + 1); + thread_barrier_init (rmamt_threads); - stt = time_getns (); - for (int i = 0 ; i < rmamt_threads ; ++i) { - args[i].tid = i; - args[i].max_size = max_size; - args[i].min_size = min_size; - args[i].win = rmamt_win_per_thread ? win[i] : win[0]; - args[i].group = group; - args[i].target = !rank; - - //printf("args[%u].tid = %u\n", i, arggs[i].tid); - if (!rmamt_win_per_thread) { - pthread_create(id+i, NULL, (void *(*)(void *)) (RMAMT_GET == rmamt_operation ? runfunc_get : runfunc_put), args+i); - } else { - pthread_create(id+i, NULL, (void *(*)(void *)) rmamt_winperthread_fns[rmamt_operation][rmamt_sync], args+i); - } + /* attempt to measure barrier overhead to improve the quality of the benchmark as the number + * of MPI process pairs increases */ + MPI_Barrier (MPI_COMM_WORLD); + uint64_t start = time_getns (); + for (int i = 0 ; i < 100 ; ++i) { + MPI_Barrier (MPI_COMM_WORLD); } - /* wait for threads to be ready */ - thread_barrier (0); - - if (ttt < find_max()-stt) ttt = find_max()-stt; + barrier_time = (time_getns () - start) / 100; - if (!rmamt_win_per_thread) { - rmamt_origin_fns[rmamt_sync] (&(ArgStruct){.min_size = min_size, .max_size = max_size, .group = group, .win = win[0], .target = !rank}); + for (int i = 0 ; i < rmamt_threads ; ++i) { + args[i].tid = i; + args[i].max_size = max_size; + args[i].min_size = min_size; + args[i].win = rmamt_win_per_thread ? win[i] : win[0]; + args[i].group = group; + args[i].all_sync = (RMAMT_ALL_FLUSH == rmamt_sync); + args[i].target = !(rank & 1); + + args[i].do_init = (rmamt_win_per_thread || 0 == i); + args[i].do_sync = (args[i].all_sync || 0 == i); + + pthread_create(id+i, NULL, (void *(*)(void *)) rmamt_new_fns[rmamt_operation][rmamt_sync], args+i); } for (int i = 0 ; i < rmamt_threads ; ++i) { - pthread_join(id[i], NULL); + pthread_join(id[i], NULL); } - if (0 == rank) { - for (uint32_t j = min_size, step = 0 ; j <= max_size ; j <<= 1, ++step) { - /* there are messages going in both directions */ - size_t sz = 2 * j * (rmamt_threads / win_count); - float max_time = 0.0; - float speed = 0.0; + for (uint32_t j = min_size, step = 0 ; j <= max_size ; j <<= 1, ++step) { + double speed = 0.0, msg_rate = 0.0; - for (int i = 0 ; i < win_count ; ++i) { - speed += ((float) (sz * rmamt_iterations) * 953.67431640625) / (float) times[i][step]; - max_time = max_time > (float) times[i][step] ? max_time : (float) times[i][step]; - } + for (int i = 0 ; i < rmamt_threads ; ++i) { + speed += ((double) (j * rmamt_iterations) * 953.67431640625) / (double) times[i][step]; + msg_rate += ((double) (rmamt_iterations) * 1000000000.0) / (double) times[i][step]; + } + + MPI_Allreduce (MPI_IN_PLACE, &speed, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); + MPI_Allreduce (MPI_IN_PLACE, &msg_rate, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); - if (output_file) { - fprintf (output_file, "%lu,%lu,%f,%f\n", (unsigned long) j, (unsigned long) j * rmamt_threads, speed, - (rmamt_threads * 2 * rmamt_iterations * 1000000000.0) / max_time); - } + if (0 == rank) { + if (output_file) { + fprintf (output_file, "%lu,%lu,%lf,%lf\n", (unsigned long) j, (unsigned long) j * rmamt_threads, + speed, msg_rate); + } - printf ("%9lu\t%9lu\t%13.3f\t\t%13.3f\n", (unsigned long) j, (unsigned long) j * rmamt_threads, speed, - (rmamt_threads * 2 * rmamt_iterations * 1000000000.0) / max_time); + printf ("%9lu\t%9lu\t%13.3lf\t\t%13.3lf\n", (unsigned long) j, (unsigned long) j * rmamt_threads, + speed, msg_rate); } } if (MPI_GROUP_NULL != group) { MPI_Group_free (&group); } - MPI_Group_free (&comm_group); MPI_Barrier (MPI_COMM_WORLD); if (output_file) { - fclose (output_file); + fclose (output_file); } for (int i = 0 ; i < win_count ; ++i) { @@ -251,11 +260,13 @@ int main(int argc,char *argv[]) } if (rmamt_bind_threads) { - rmamt_bind_finalize (); + rmamt_bind_finalize (); } rmamt_free (obuf, rmamt_max_size); + MPI_Comm_free (&test_comm); + MPI_T_finalize (); MPI_Finalize(); return 0; } @@ -270,175 +281,125 @@ uint64_t find_max(){ } #define DEFINE_ORIGIN_THREAD_FN(sync, type, fn, init_fn, start_sync, end_sync, fini_fn, expose, release) \ - static void *bibw_ ## type ## _ ## sync ## _winperthread (ArgStruct* a) { \ + static void *bibw_ ## type ## _ ## sync ## _new (ArgStruct* a) { \ const int tid = (int) a->tid; \ - uint64_t start, stop, ttime; \ + uint64_t start, stop; \ size_t max_size = a->max_size; \ size_t min_size = a->min_size; \ + int barrier_cycle = 0; \ \ - if (rmamt_bind_threads) { \ - rmamt_bind (tid); \ - } \ - \ - thread_etimes[tid] = time_getns (); \ + if (rmamt_bind_threads) { \ + rmamt_bind (tid); \ + } \ \ - init_fn; \ - /* signal the main thread that we are ready */ \ - thread_barrier (0); \ + thread_etimes[tid] = time_getns (); \ \ - for (uint32_t j = min_size, cycle = 0 ; j <= max_size ; j <<= 1) { \ - expose; \ - start_sync; \ + if (a->do_init) { \ + init_fn; \ + } \ \ - for (int l = 0 ; l < RMAMT_WARMUP_ITERATIONS ; l++) { \ - fn (obuf + tid * j, j, MPI_BYTE, a->target, 0, j, \ - MPI_BYTE, a->win); \ + for (uint32_t j = min_size, cycle = 0 ; j <= max_size ; j <<= 1, ++cycle) { \ + if (0 == tid) { \ + expose; \ } \ \ - end_sync; \ - release; \ - \ - MPI_Barrier (MPI_COMM_WORLD); \ - \ - thread_barrier (cycle * 2 + 1); \ + if (a->do_sync) { \ + start_sync; \ + } \ \ - start = time_getns (); \ - expose; \ - start_sync; \ + thread_barrier (barrier_cycle++); \ \ - for (int l = 0 ; l < rmamt_iterations ; l++) { \ - fn (obuf + tid * j, j, MPI_BYTE, a->target, 0, j, \ - MPI_BYTE, a->win); \ + for (int l = 0 ; l < RMAMT_WARMUP_ITERATIONS ; l++) { \ + fn (obuf + tid * j, j, MPI_BYTE, a->target, 0, j, \ + MPI_BYTE, a->win); \ } \ \ - end_sync; \ - release; \ - MPI_Barrier (MPI_COMM_WORLD); \ - ttime = time_getns () - start; \ + if (!a->all_sync) { \ + thread_barrier (barrier_cycle++); \ + } \ \ - times[tid][cycle] = ttime; \ - thread_barrier (cycle * 2 + 2); \ - ++cycle; \ - } \ + if (a->do_sync) { \ + end_sync; \ + } \ \ - fini_fn; \ + if (0 == tid) { \ + release; \ + MPI_Barrier (MPI_COMM_WORLD); \ + } \ \ - return 0; \ - } - -DEFINE_ORIGIN_THREAD_FN(lock_all, put, MPI_Put, (void) 0, MPI_Win_lock_all (0, a->win), MPI_Win_unlock_all (a->win), - (void) 0, (void) 0, (void) 0) -DEFINE_ORIGIN_THREAD_FN(lock_all, get, MPI_Get, (void) 0, MPI_Win_lock_all (0, a->win), MPI_Win_unlock_all (a->win), - (void) 0, (void) 0, (void) 0) -DEFINE_ORIGIN_THREAD_FN(lock_per_rank, put, MPI_Put, (void) 0, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), - MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0, (void) 0) -DEFINE_ORIGIN_THREAD_FN(lock_per_rank, get, MPI_Get, (void) 0, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), - MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0, (void) 0) -DEFINE_ORIGIN_THREAD_FN(flush, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, - MPI_Win_flush (a->target, a->win), MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) -DEFINE_ORIGIN_THREAD_FN(flush, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, - MPI_Win_flush (a->target, a->win), MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) -DEFINE_ORIGIN_THREAD_FN(pscw, put, MPI_Put, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0, - MPI_Win_post (a->group, 0, a->win), MPI_Win_wait (a->win)) -DEFINE_ORIGIN_THREAD_FN(pscw, get, MPI_Get, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0, - MPI_Win_post (a->group, 0, a->win), MPI_Win_wait (a->win)) -DEFINE_ORIGIN_THREAD_FN(fence, put, MPI_Put, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), (void) 0, MPI_Win_fence (0, a->win), - (void) 0, (void) 0, (void) 0) -DEFINE_ORIGIN_THREAD_FN(fence, get, MPI_Get, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), (void) 0, MPI_Win_fence (0, a->win), - (void) 0, (void) 0, (void) 0) - -/* origin-side loops */ -#define DEFINE_ORIGIN_FN(sync, init_fn, start_sync, end_sync, fini_fn, expose, release) \ - static void *bibw_orig_ ## sync (ArgStruct *a) { \ - size_t max_size = a->max_size; \ - size_t min_size = a->min_size; \ + thread_barrier (barrier_cycle++); \ \ - init_fn; \ + start = time_getns (); \ \ - for (uint64_t j = min_size, cycle = 0, barrier_cycle = 1 ; j <= max_size ; j <<= 1, cycle++) { \ - uint64_t stime, etime, ttime; \ + if (0 == tid) { \ + expose; \ + } \ \ - /* warm up */ \ - MPI_CHECK( expose ); \ - MPI_CHECK( start_sync ); \ + if (a->do_sync) { \ + start_sync; \ + } \ \ thread_barrier (barrier_cycle++); \ - thread_barrier (barrier_cycle++); \ - \ - MPI_CHECK( end_sync); \ - MPI_CHECK( release ); \ \ - MPI_Barrier (MPI_COMM_WORLD); \ - \ - /* timing */ \ - stime = time_getns(); \ - MPI_CHECK( expose ); \ - MPI_CHECK( start_sync ); \ + for (int l = 0 ; l < rmamt_iterations ; l++) { \ + fn (obuf + tid * j, j, MPI_BYTE, a->target, 0, j, \ + MPI_BYTE, a->win); \ + } \ \ thread_barrier (barrier_cycle++); \ - thread_barrier (barrier_cycle++); \ - \ - MPI_CHECK( end_sync ); \ - MPI_CHECK( release ); \ - MPI_Barrier (MPI_COMM_WORLD); \ - etime = time_getns (); \ - ttime = etime - stime; \ - \ - times[0][cycle] = ttime; \ - } \ - \ - fini_fn; \ \ - return NULL; \ - } - -DEFINE_ORIGIN_FN(lock_all, (void) 0, MPI_Win_lock_all(0, a->win), MPI_Win_unlock_all (a->win), (void) 0, - MPI_SUCCESS, MPI_SUCCESS) -DEFINE_ORIGIN_FN(lock, (void) 0, MPI_Win_lock(MPI_LOCK_SHARED, a->target, 0, a->win), MPI_Win_unlock (a->target, a->win), - (void) 0, MPI_SUCCESS, MPI_SUCCESS) -DEFINE_ORIGIN_FN(flush, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), MPI_SUCCESS, MPI_Win_flush (a->target, a->win), - MPI_Win_unlock (a->target, a->win), MPI_SUCCESS, MPI_SUCCESS) -DEFINE_ORIGIN_FN(fence, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), MPI_SUCCESS, MPI_Win_fence (0, a->win), - (void) 0, MPI_SUCCESS, MPI_SUCCESS) -DEFINE_ORIGIN_FN(pscw, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0, - MPI_Win_post (a->group, 0, a->win), MPI_Win_wait (a->win)) - -#define DEFINE_ORIGIN_THREAD_RUNFN(fn, type) \ - static void *runfunc_ ## type (ArgStruct* a) { \ - int tid = (int) a->tid; \ - size_t max_size = a->max_size; \ - size_t min_size = a->min_size; \ - \ - if (rmamt_bind_threads) { \ - rmamt_bind (tid); \ - } \ + if (a->do_sync) { \ + end_sync; \ + } \ \ - thread_etimes[tid] = time_getns (); \ + if (0 == tid) { \ + release; \ + MPI_Barrier (MPI_COMM_WORLD); \ + } \ \ - /* signal the main thread that we are ready */ \ - thread_barrier (0); \ + thread_barrier (barrier_cycle++); \ \ - for (uint32_t j = min_size, cycle = 1 ; j <= max_size ; j <<= 1) { \ - thread_barrier (cycle++); \ + stop = time_getns (); \ \ - for (int l = 0 ; l < RMAMT_WARMUP_ITERATIONS ; l++) { \ - fn (obuf + tid * j, j, MPI_BYTE, a->target, tid * j, \ - j, MPI_BYTE, win[0]); \ - } \ - \ - thread_barrier (cycle++); \ - thread_barrier (cycle++); \ + times[tid][cycle] = stop - start - barrier_time; \ \ - for (int l = 0 ; l < rmamt_iterations ; l++) { \ - fn (obuf + tid * j, j, MPI_BYTE, a->target, tid * j, \ - j, MPI_BYTE, win[0]); \ - } \ + thread_barrier (barrier_cycle++); \ + } \ \ - thread_barrier (cycle++); \ + if (a->do_init) { \ + fini_fn; \ } \ \ return 0; \ } -DEFINE_ORIGIN_THREAD_RUNFN(MPI_Get, get) -DEFINE_ORIGIN_THREAD_RUNFN(MPI_Put, put) +DEFINE_ORIGIN_THREAD_FN(lock_all, put, MPI_Put, (void) 0, MPI_Win_lock_all (0, a->win), MPI_Win_unlock_all (a->win), (void) 0, (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(lock_all, get, MPI_Get, (void) 0, MPI_Win_lock_all (0, a->win), MPI_Win_unlock_all (a->win), (void) 0, (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(lock_per_rank, put, MPI_Put, (void) 0, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), MPI_Win_unlock (a->target, a->win), + (void) 0, (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(lock_per_rank, get, MPI_Get, (void) 0, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), MPI_Win_unlock (a->target, a->win), + (void) 0, (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush (a->target, a->win), + MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush (a->target, a->win), + MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(pscw, put, MPI_Put, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0, + MPI_Win_post (a->group, 0, a->win), MPI_Win_wait (a->win)) +DEFINE_ORIGIN_THREAD_FN(pscw, get, MPI_Get, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0, + MPI_Win_post (a->group, 0, a->win), MPI_Win_wait (a->win)) +DEFINE_ORIGIN_THREAD_FN(fence, put, MPI_Put, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), (void) 0, MPI_Win_fence (0, a->win), (void) 0, + MPI_Win_fence (0, a->win), (void) 0) +DEFINE_ORIGIN_THREAD_FN(fence, get, MPI_Get, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), (void) 0, MPI_Win_fence (0, a->win), (void) 0, + MPI_Win_fence (0, a->win), (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush_all, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_all (a->win), + MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush_all, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_all (a->win), + MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush_local, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_local (a->target, a->win), + MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush_local, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_local (a->target, a->win), + MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush_local_all, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_local_all (a->win), + MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush_local_all, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_local_all (a->win), + MPI_Win_unlock (a->target, a->win), (void) 0, (void) 0) diff --git a/src/rmamt_bw.c b/src/rmamt_bw.c index 8592dbd..064ba97 100644 --- a/src/rmamt_bw.c +++ b/src/rmamt_bw.c @@ -18,119 +18,128 @@ #include #include #include - -uint64_t find_max(); +#include +#include /* target-side functions for single window */ -static void *runfunc_put (ArgStruct* a); -static void *runfunc_get (ArgStruct* a); static void *runfunc_pscw (ArgStruct* a); static void *runfunc_fence (ArgStruct* a); /* origin-side window per thread functions */ -static void *bw_put_lock_all_winperthread (ArgStruct* a); -static void *bw_get_lock_all_winperthread (ArgStruct* a); -static void *bw_put_fence_winperthread (ArgStruct* a); -static void *bw_get_fence_winperthread (ArgStruct* a); -static void *bw_put_lock_per_rank_winperthread (ArgStruct* a); -static void *bw_get_lock_per_rank_winperthread (ArgStruct* a); -static void *bw_put_flush_winperthread (ArgStruct* a); -static void *bw_get_flush_winperthread (ArgStruct* a); -static void *bw_put_pscw_winperthread (ArgStruct* a); -static void *bw_get_pscw_winperthread (ArgStruct* a); - -static rmamt_fn_t rmamt_winperthread_fns[RMAMT_OPERATIONS_MAX][RMAMT_SYNC_MAX] = { +static void *bw_put_lock_all_new (ArgStruct* a); +static void *bw_get_lock_all_new (ArgStruct* a); +static void *bw_put_fence_new (ArgStruct* a); +static void *bw_get_fence_new (ArgStruct* a); +static void *bw_put_lock_per_rank_new (ArgStruct* a); +static void *bw_get_lock_per_rank_new (ArgStruct* a); +static void *bw_put_flush_new (ArgStruct* a); +static void *bw_get_flush_new (ArgStruct* a); +static void *bw_put_pscw_new (ArgStruct* a); +static void *bw_get_pscw_new (ArgStruct* a); +static void *bw_put_flush_thread_new (ArgStruct* a); +static void *bw_get_flush_thread_new (ArgStruct* a); +static void *bw_put_flush_all_new (ArgStruct* a); +static void *bw_get_flush_all_new (ArgStruct* a); +static void *bw_put_flush_local_new (ArgStruct* a); +static void *bw_get_flush_local_new (ArgStruct* a); +static void *bw_put_flush_local_all_new (ArgStruct* a); +static void *bw_get_flush_local_all_new (ArgStruct* a); + +static rmamt_fn_t rmamt_new_fns[RMAMT_OPERATIONS_MAX][RMAMT_SYNC_MAX] = { [RMAMT_PUT] = { - [RMAMT_LOCK_ALL] = bw_put_lock_all_winperthread, - [RMAMT_FENCE] = bw_put_fence_winperthread, - [RMAMT_LOCK] = bw_put_lock_per_rank_winperthread, - [RMAMT_FLUSH] = bw_put_flush_winperthread, - [RMAMT_PSCW] = bw_put_pscw_winperthread, + [RMAMT_LOCK_ALL] = bw_put_lock_all_new, + [RMAMT_FENCE] = bw_put_fence_new, + [RMAMT_LOCK] = bw_put_lock_per_rank_new, + [RMAMT_FLUSH] = bw_put_flush_new, + [RMAMT_PSCW] = bw_put_pscw_new, + [RMAMT_ALL_FLUSH] = bw_put_flush_new, + [RMAMT_FLUSH_ALL] = bw_put_flush_all_new, + [RMAMT_FLUSH_LOCAL] = bw_put_flush_local_new, + [RMAMT_FLUSH_LOCAL_ALL] = bw_put_flush_local_all_new, }, [RMAMT_GET] = { - [RMAMT_LOCK_ALL] = bw_get_lock_all_winperthread, - [RMAMT_FENCE] = bw_get_fence_winperthread, - [RMAMT_LOCK] = bw_get_lock_per_rank_winperthread, - [RMAMT_FLUSH] = bw_get_flush_winperthread, - [RMAMT_PSCW] = bw_get_pscw_winperthread, + [RMAMT_LOCK_ALL] = bw_get_lock_all_new, + [RMAMT_FENCE] = bw_get_fence_new, + [RMAMT_LOCK] = bw_get_lock_per_rank_new, + [RMAMT_FLUSH] = bw_get_flush_new, + [RMAMT_PSCW] = bw_get_pscw_new, + [RMAMT_ALL_FLUSH] = bw_get_flush_new, + [RMAMT_FLUSH_ALL] = bw_get_flush_all_new, + [RMAMT_FLUSH_LOCAL] = bw_get_flush_local_new, + [RMAMT_FLUSH_LOCAL_ALL] = bw_get_flush_local_all_new, }, }; -/* origin-side functions */ -static void *bw_orig_lock_all (ArgStruct *a); -static void *bw_orig_lock (ArgStruct *a); -static void *bw_orig_flush (ArgStruct *a); -static void *bw_orig_fence (ArgStruct *a); -static void *bw_orig_pscw (ArgStruct *a); - -static rmamt_fn_t rmamt_origin_fns[RMAMT_SYNC_MAX] = { - [RMAMT_LOCK_ALL] = bw_orig_lock_all, - [RMAMT_FENCE] = bw_orig_fence, - [RMAMT_LOCK] = bw_orig_lock, - [RMAMT_FLUSH] = bw_orig_flush, - [RMAMT_PSCW] = bw_orig_pscw, -}; - static ArgStruct args[MAX_THREADS]; static uint64_t thread_etimes[MAX_THREADS]; static char* tbufs[MAX_THREADS]; static char* obuf; static MPI_Win win[MAX_THREADS]; static int64_t times[MAX_THREADS][256]; +static uint64_t barrier_time; int main(int argc,char *argv[]) { + MPI_Comm test_comm, leader_comm; MPI_Group group = MPI_GROUP_NULL, comm_group; int nprocs, provided, rank, rc; pthread_t id[MAX_THREADS]; MPI_Request req; int64_t win_size; - int win_count; + int win_count, pairs; size_t max_size, min_size; - uint64_t stt, ttt = 0; FILE *output_file = NULL; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); if (provided != MPI_THREAD_MULTIPLE) { - printf("Thread multiple needed\n"); + printf("MPI Thread multiple support required to run this benchmark\n"); MPI_Abort(MPI_COMM_WORLD, 1); } - MPI_Comm_size(MPI_COMM_WORLD,&nprocs); - if (nprocs != 2) { - printf("Run with 2 processes\n"); + MPI_T_init_thread (MPI_THREAD_MULTIPLE, &provided); + + MPI_Comm_size(MPI_COMM_WORLD, &nprocs); + if (nprocs & 1) { + printf("Run with a multiple of two MPI processes\n"); MPI_Abort(MPI_COMM_WORLD, 1); } - MPI_Comm_rank(MPI_COMM_WORLD,&rank); - MPI_Comm_group (MPI_COMM_WORLD, &comm_group); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + + pairs = nprocs >> 1; + + /* create test communicator */ + MPI_Comm_split (MPI_COMM_WORLD, rank & ~1, 0, &test_comm); + MPI_Comm_split (MPI_COMM_WORLD, (rank & 1) ? MPI_UNDEFINED : 0, 0, &leader_comm); + + MPI_Comm_group (test_comm, &comm_group); rmamt_parse_options ("rmamt_bw", argc, argv); if (rmamt_output_file && 0 == rank) { - output_file = fopen (rmamt_output_file, "w"); - if (NULL == output_file) { - fprintf (stderr, "Could not open %s for writing\n", rmamt_output_file); - MPI_Abort (MPI_COMM_WORLD, 1); - } - free (rmamt_output_file); + output_file = fopen (rmamt_output_file, "w"); + if (NULL == output_file) { + fprintf (stderr, "Could not open %s for writing\n", rmamt_output_file); + MPI_Abort (MPI_COMM_WORLD, 1); + } + free (rmamt_output_file); } if (rmamt_bind_threads) { - rc = rmamt_bind_init (); - if (-1 == rc) { - printf ("***** WARNING: Thread binding requested but not available *****\n"); - } + rc = rmamt_bind_init (); + if (-1 == rc) { + printf ("***** WARNING: Thread binding requested but not available *****\n"); + } } min_size = rmamt_min_size / rmamt_threads; if (min_size < 1) { - min_size = 1; + min_size = 1; } - max_size = rmamt_max_size / rmamt_threads; + max_size = rmamt_max_size / (rmamt_threads * pairs); win_count = rmamt_win_per_thread ? rmamt_threads : 1; - win_size = (0 != rank) ? rmamt_max_size / win_count : 0; + win_size = rmamt_max_size / win_count; obuf = rmamt_malloc (rmamt_max_size); if (!obuf) { @@ -144,7 +153,7 @@ int main(int argc,char *argv[]) /* create windows */ for (int i = 0 ; i < win_count ; ++i) { - MPI_CHECK(MPI_Win_allocate (win_size, 1, MPI_INFO_NULL, MPI_COMM_WORLD, + MPI_CHECK(MPI_Win_allocate (win_size, 1, MPI_INFO_NULL, test_comm, tbufs + i, win + i)); if (win_size) { memset (tbufs[i], '-', win_size); @@ -152,69 +161,65 @@ int main(int argc,char *argv[]) } if (RMAMT_PSCW == rmamt_sync) { - MPI_Group_incl (comm_group, 1, &(int){!rank}, &group); + MPI_Group_incl (comm_group, 1, &(int){!(rank & 1)}, &group); } + if (!(rank & 1)) { + if (!rank) { + printf ("##########################################\n"); + printf ("# RMA-MT Bandwidth\n"); + printf ("#\n"); + printf ("# Operation: %s\n", rmamt_operation_strings[rmamt_operation]); + printf ("# Sync: %s\n", rmamt_sync_strings[rmamt_sync]); + printf ("# Thread count: %u\n", (unsigned) rmamt_threads); + printf ("# Number of MPI process pairs: %d\n", pairs); + printf ("# Iterations: %u\n", (unsigned) rmamt_iterations); + printf ("# Bind worker threads: %s\n", rmamt_bind_threads ? "yes" : "no"); + printf ("# Number of windows: %u\n", rmamt_win_per_thread ? rmamt_threads : 1); + printf ("##########################################\n"); + printf (" BpT(%i)\t BxT(%i)\tBandwidth(MiB/s)\tMessage_Rate(M/s)\n", rmamt_threads, rmamt_threads); + if (output_file) { + fprintf (output_file, "##########################################\n"); + fprintf (output_file, "# RMA-MT Bandwidth\n"); + fprintf (output_file, "#\n"); + fprintf (output_file, "# Operation: %s\n", rmamt_operation_strings[rmamt_operation]); + fprintf (output_file, "# Sync: %s\n", rmamt_sync_strings[rmamt_sync]); + fprintf (output_file, "# Thread count: %u\n", (unsigned) rmamt_threads); + fprintf (output_file, "# Number of MPI process pairs: %d\n", pairs); + fprintf (output_file, "# Iterations: %u\n", (unsigned) rmamt_iterations); + fprintf (output_file, "# Bind worker threads: %s\n", rmamt_bind_threads ? "yes" : "no"); + fprintf (output_file, "# Number of windows: %u\n", rmamt_win_per_thread ? rmamt_threads : 1); + fprintf (output_file, "##########################################\n"); + fprintf (output_file, "BpT(%i),BxT(%i),Bandwidth(MiB/s),Message_Rate(M/s)\n", rmamt_threads, rmamt_threads); + } + } - if (!rank) { - printf ("##########################################\n"); - printf ("# RMA-MT Bandwidth\n"); - printf ("#\n"); - printf ("# Operation: %s\n", rmamt_operation_strings[rmamt_operation]); - printf ("# Sync: %s\n", rmamt_sync_strings[rmamt_sync]); - printf ("# Thread count: %u\n", (unsigned) rmamt_threads); - printf ("# Iterations: %u\n", (unsigned) rmamt_iterations); - printf ("# Ibarrier: %s, sleep interval: %uns\n", rmamt_use_ibarrier ? "yes" : "no", - rmamt_sleep_interval); - printf ("# Bind worker threads: %s\n", rmamt_bind_threads ? "yes" : "no"); - printf ("# Number of windows: %u\n", rmamt_win_per_thread ? rmamt_threads : 1); - printf ("##########################################\n"); - printf (" BpT(%i)\t BxT(%i)\tBandwidth(MiB/s)\tMessage_Rate(M/s)\n", rmamt_threads, rmamt_threads); - if (output_file) { - fprintf (output_file, "##########################################\n"); - fprintf (output_file, "# RMA-MT Bandwidth\n"); - fprintf (output_file, "#\n"); - fprintf (output_file, "# Operation: %s\n", rmamt_operation_strings[rmamt_operation]); - fprintf (output_file, "# Sync: %s\n", rmamt_sync_strings[rmamt_sync]); - fprintf (output_file, "# Thread count: %u\n", (unsigned) rmamt_threads); - fprintf (output_file, "# Iterations: %u\n", (unsigned) rmamt_iterations); - fprintf (output_file, "# Ibarrier: %s, sleep interval: %uns\n", rmamt_use_ibarrier ? "yes" : "no", - rmamt_sleep_interval); - fprintf (output_file, "# Bind worker threads: %s\n", rmamt_bind_threads ? "yes" : "no"); - fprintf (output_file, "# Number of windows: %u\n", rmamt_win_per_thread ? rmamt_threads : 1); - fprintf (output_file, "##########################################\n"); - fprintf (output_file, "BpT(%i),BxT(%i),Bandwidth(MiB/s),Message_Rate(M/s)\n", rmamt_threads, rmamt_threads); - } - } + thread_barrier_init (rmamt_threads); - if (0 == rank) { - thread_barrier_init (rmamt_win_per_thread ? rmamt_threads : rmamt_threads + 1); + /* attempt to measure barrier overhead to improve the quality of the benchmark as the number + * of MPI process pairs increases */ + MPI_Barrier (leader_comm); + uint64_t start = time_getns (); + for (int i = 0 ; i < 100 ; ++i) { + MPI_Barrier (leader_comm); + } - stt = time_getns (); + barrier_time = (time_getns () - start) / 100; for (int i = 0 ; i < rmamt_threads ; ++i) { args[i].tid = i; args[i].max_size = max_size; - args[i].min_size = min_size; + args[i].min_size = min_size; args[i].win = rmamt_win_per_thread ? win[i] : win[0]; args[i].group = group; + args[i].all_sync = (RMAMT_ALL_FLUSH == rmamt_sync); + args[i].target = !(rank & 1); + args[i].comm = leader_comm; - //printf("args[%u].tid = %u\n", i, arggs[i].tid); - if (!rmamt_win_per_thread) { - pthread_create(id+i, NULL, (void *(*)(void *)) (RMAMT_GET == rmamt_operation ? runfunc_get : runfunc_put), args+i); - } else { - pthread_create(id+i, NULL, (void *(*)(void *)) rmamt_winperthread_fns[rmamt_operation][rmamt_sync], args+i); - } - } - - - /* wait for threads to be ready */ - thread_barrier (0); + args[i].do_init = (rmamt_win_per_thread || 0 == i); + args[i].do_sync = (args[i].all_sync || 0 == i); - if (ttt < find_max()-stt) ttt = find_max()-stt; - - if (!rmamt_win_per_thread) { - rmamt_origin_fns[rmamt_sync] (&(ArgStruct){.min_size = min_size, .max_size = max_size, .group = group, .win = win[0]}); + pthread_create(id+i, NULL, (void *(*)(void *)) rmamt_new_fns[rmamt_operation][rmamt_sync], args+i); } for (int i = 0 ; i < rmamt_threads ; ++i) { @@ -222,22 +227,25 @@ int main(int argc,char *argv[]) } for (uint32_t j = min_size, step = 0 ; j <= max_size ; j <<= 1, ++step) { - size_t sz = j * (rmamt_threads / win_count); - float max_time = 0.0; - float speed = 0.0; + double speed = 0.0, msg_rate = 0.0; - for (int i = 0 ; i < win_count ; ++i) { - speed += ((float) (sz * rmamt_iterations) * 953.67431640625) / (float) times[i][step]; - max_time = max_time > (float) times[i][step] ? max_time : (float) times[i][step]; + for (int i = 0 ; i < rmamt_threads ; ++i) { + speed += ((double) (j * rmamt_iterations) * 953.67431640625) / (double) times[i][step]; + msg_rate += ((double) (rmamt_iterations) * 1000000000.0) / (double) times[i][step]; } - if (output_file) { - fprintf (output_file, "%lu,%lu,%f,%f\n", (unsigned long) j, (unsigned long) j * rmamt_threads, speed, - (rmamt_threads * rmamt_iterations * 1000000000.0) / max_time); - } + MPI_Allreduce (MPI_IN_PLACE, &speed, 1, MPI_DOUBLE, MPI_SUM, leader_comm); + MPI_Allreduce (MPI_IN_PLACE, &msg_rate, 1, MPI_DOUBLE, MPI_SUM, leader_comm); + + if (0 == rank) { + if (output_file) { + fprintf (output_file, "%lu,%lu,%lf,%lf\n", (unsigned long) j, (unsigned long) j * rmamt_threads, + speed, msg_rate); + } - printf ("%9lu\t%9lu\t%13.3f\t\t%13.3f\n", (unsigned long) j, (unsigned long) j * rmamt_threads, speed, - (rmamt_threads * rmamt_iterations * 1000000000.0) / max_time); + printf ("%9lu\t%9lu\t%13.3lf\t\t%13.3lf\n", (unsigned long) j, (unsigned long) j * rmamt_threads, + speed, msg_rate); + } } if (rmamt_use_ibarrier) { @@ -250,7 +258,7 @@ int main(int argc,char *argv[]) if (RMAMT_PSCW == rmamt_sync || RMAMT_FENCE == rmamt_sync) { args[0].tid = 0; args[0].max_size = max_size; - args[0].min_size = min_size; + args[0].min_size = min_size; args[0].group = group; args[0].win = win[0]; args[0].group = group; @@ -311,7 +319,7 @@ int main(int argc,char *argv[]) MPI_Barrier (MPI_COMM_WORLD); if (output_file) { - fclose (output_file); + fclose (output_file); } for (int i = 0 ; i < win_count ; ++i) { @@ -319,24 +327,21 @@ int main(int argc,char *argv[]) } if (rmamt_bind_threads) { - rmamt_bind_finalize (); + rmamt_bind_finalize (); } rmamt_free (obuf, rmamt_max_size); + MPI_Comm_free (&test_comm); + if (MPI_COMM_NULL != leader_comm) { + MPI_Comm_free (&leader_comm); + } + + MPI_T_finalize (); MPI_Finalize(); return 0; } -uint64_t find_max(){ - uint64_t max = 0; - int tmp; - int sz = sizeof(thread_etimes)/sizeof(thread_etimes[0]); - for (tmp = 0; tmp < sz; tmp++) - if(max < thread_etimes[tmp]) max=thread_etimes[tmp]; - return (double) max; -} - static void *runfunc_pscw (ArgStruct* a) { MPI_Win twin = a->win; size_t max_size = a->max_size; @@ -370,146 +375,111 @@ static void *runfunc_fence (ArgStruct* a) { } #define DEFINE_ORIGIN_THREAD_FN(sync, type, fn, init_fn, start_sync, end_sync, fini_fn) \ - static void *bw_ ## type ## _ ## sync ## _winperthread (ArgStruct* a) { \ + static void *bw_ ## type ## _ ## sync ## _new (ArgStruct* a) { \ const int tid = (int) a->tid; \ uint64_t start, stop; \ size_t max_size = a->max_size; \ size_t min_size = a->min_size; \ + int barrier_cycle = 0; \ \ - if (rmamt_bind_threads) { \ - rmamt_bind (tid); \ - } \ - \ - thread_etimes[tid] = time_getns (); \ + if (rmamt_bind_threads) { \ + rmamt_bind (tid); \ + } \ \ - init_fn; \ - /* signal the main thread that we are ready */ \ - thread_barrier (0); \ + thread_etimes[tid] = time_getns (); \ \ - for (uint32_t j = min_size, cycle = 0 ; j <= max_size ; j <<= 1) { \ - start_sync; \ + if (a->do_init) { \ + init_fn; \ + } \ \ - for (int l = 0 ; l < RMAMT_WARMUP_ITERATIONS ; l++) { \ - fn (obuf + tid * j, j, MPI_BYTE, 1, 0, j, MPI_BYTE, \ - a->win); \ + for (uint32_t j = min_size, cycle = 0 ; j <= max_size ; j <<= 1, ++cycle) { \ + if (a->do_sync) { \ + start_sync; \ } \ \ - end_sync; \ - \ - thread_barrier (cycle * 2 + 1); \ - \ - start = time_getns (); \ - start_sync; \ + thread_barrier (barrier_cycle++); \ \ - for (int l = 0 ; l < rmamt_iterations ; l++) { \ - fn (obuf + tid * j, j, MPI_BYTE, 1, 0, j, MPI_BYTE, \ - a->win); \ + for (int l = 0 ; l < RMAMT_WARMUP_ITERATIONS ; l++) { \ + fn (obuf + tid * j, j, MPI_BYTE, a->target, 0, j, \ + MPI_BYTE, a->win); \ } \ \ - end_sync; \ - stop = time_getns (); \ - \ - times[tid][cycle] = stop - start; \ - thread_barrier (cycle * 2 + 2); \ - ++cycle; \ - } \ - \ - fini_fn; \ - \ - return 0; \ - } - -DEFINE_ORIGIN_THREAD_FN(lock_all, put, MPI_Put, (void) 0, MPI_Win_lock_all (0, a->win), MPI_Win_unlock_all (a->win), (void) 0) -DEFINE_ORIGIN_THREAD_FN(lock_all, get, MPI_Get, (void) 0, MPI_Win_lock_all (0, a->win), MPI_Win_unlock_all (a->win), (void) 0) -DEFINE_ORIGIN_THREAD_FN(lock_per_rank, put, MPI_Put, (void) 0, MPI_Win_lock (MPI_LOCK_SHARED, 1, 0, a->win), MPI_Win_unlock (1, a->win), (void) 0) -DEFINE_ORIGIN_THREAD_FN(lock_per_rank, get, MPI_Get, (void) 0, MPI_Win_lock (MPI_LOCK_SHARED, 1, 0, a->win), MPI_Win_unlock (1, a->win), (void) 0) -DEFINE_ORIGIN_THREAD_FN(flush, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, 1, 0, a->win), (void) 0, MPI_Win_flush (1, a->win), MPI_Win_unlock (1, a->win)) -DEFINE_ORIGIN_THREAD_FN(flush, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, 1, 0, a->win), (void) 0, MPI_Win_flush (1, a->win), MPI_Win_unlock (1, a->win)) -DEFINE_ORIGIN_THREAD_FN(pscw, put, MPI_Put, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0) -DEFINE_ORIGIN_THREAD_FN(pscw, get, MPI_Get, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0) -DEFINE_ORIGIN_THREAD_FN(fence, put, MPI_Put, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), (void) 0, MPI_Win_fence (0, a->win), (void) 0) -DEFINE_ORIGIN_THREAD_FN(fence, get, MPI_Get, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), (void) 0, MPI_Win_fence (0, a->win), (void) 0) - -/* origin-side loops */ -#define DEFINE_ORIGIN_FN(sync, init_fn, start_sync, end_sync, fini_fn) \ - static void *bw_orig_ ## sync (ArgStruct *a) { \ - size_t max_size = a->max_size; \ - size_t min_size = a->min_size; \ - \ - init_fn; \ + if (!a->all_sync) { \ + thread_barrier (barrier_cycle++); \ + } \ \ - for (uint64_t j = min_size, cycle = 0, barrier_cycle = 1 ; j <= max_size ; j <<= 1, cycle++) { \ - uint64_t stime, etime, ttime; \ + if (a->do_sync) { \ + end_sync; \ + } \ \ - /* warm up */ \ - MPI_CHECK( start_sync ); \ + if (0 == tid) { \ + MPI_Barrier (a->comm); \ + } \ \ - thread_barrier (barrier_cycle++); \ thread_barrier (barrier_cycle++); \ \ - MPI_CHECK( end_sync); \ + start = time_getns (); \ \ - /* timing */ \ - stime = time_getns(); \ - MPI_CHECK( start_sync ); \ + if (a->do_sync) { \ + start_sync; \ + } \ \ thread_barrier (barrier_cycle++); \ - thread_barrier (barrier_cycle++); \ - \ - MPI_CHECK( end_sync ); \ - etime = time_getns (); \ - ttime = etime - stime; \ \ - times[0][cycle] = ttime; \ - } \ - \ - fini_fn; \ + for (int l = 0 ; l < rmamt_iterations ; l++) { \ + fn (obuf + tid * j, j, MPI_BYTE, a->target, 0, j, \ + MPI_BYTE, a->win); \ + } \ \ - return NULL; \ - } - -DEFINE_ORIGIN_FN(lock_all, (void) 0, MPI_Win_lock_all(0, a->win), MPI_Win_unlock_all (a->win), (void) 0) -DEFINE_ORIGIN_FN(lock, (void) 0, MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, a->win), MPI_Win_unlock (1, a->win), (void) 0) -DEFINE_ORIGIN_FN(flush, MPI_Win_lock (MPI_LOCK_SHARED, 1, 0, a->win), MPI_SUCCESS, MPI_Win_flush (1, a->win), MPI_Win_unlock (1, a->win)) -DEFINE_ORIGIN_FN(fence, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), MPI_SUCCESS, MPI_Win_fence (0, a->win), (void) 0) -DEFINE_ORIGIN_FN(pscw, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0) - -#define DEFINE_ORIGIN_THREAD_RUNFN(fn, type) \ - static void *runfunc_ ## type (ArgStruct* a) { \ - int tid = (int) a->tid; \ - size_t max_size = a->max_size; \ - size_t min_size = a->min_size; \ - \ - if (rmamt_bind_threads) { \ - rmamt_bind (tid); \ - } \ + thread_barrier (barrier_cycle++); \ \ - thread_etimes[tid] = time_getns (); \ + if (a->do_sync) { \ + end_sync; \ + } \ \ - /* signal the main thread that we are ready */ \ - thread_barrier (0); \ + if (0 == tid) { \ + MPI_Barrier (a->comm); \ + } \ \ - for (uint32_t j = min_size, cycle = 1 ; j <= max_size ; j <<= 1) { \ - thread_barrier (cycle++); \ + thread_barrier (barrier_cycle++); \ \ - for (int l = 0 ; l < RMAMT_WARMUP_ITERATIONS ; l++) { \ - fn (obuf + tid * j, j, MPI_BYTE, 1, tid * j, j, \ - MPI_BYTE, win[0]); \ - } \ + stop = time_getns (); \ \ - thread_barrier (cycle++); \ - thread_barrier (cycle++); \ + times[tid][cycle] = stop - start - barrier_time; \ \ - for (int l = 0 ; l < rmamt_iterations ; l++) { \ - fn (obuf + tid * j, j, MPI_BYTE, 1, tid * j, j, \ - MPI_BYTE, win[0]); \ - } \ + thread_barrier (barrier_cycle++); \ + } \ \ - thread_barrier (cycle++); \ + if (a->do_init) { \ + fini_fn; \ } \ \ return 0; \ } -DEFINE_ORIGIN_THREAD_RUNFN(MPI_Get, get) -DEFINE_ORIGIN_THREAD_RUNFN(MPI_Put, put) +DEFINE_ORIGIN_THREAD_FN(lock_all, put, MPI_Put, (void) 0, MPI_Win_lock_all (0, a->win), MPI_Win_unlock_all (a->win), (void) 0) +DEFINE_ORIGIN_THREAD_FN(lock_all, get, MPI_Get, (void) 0, MPI_Win_lock_all (0, a->win), MPI_Win_unlock_all (a->win), (void) 0) +DEFINE_ORIGIN_THREAD_FN(lock_per_rank, put, MPI_Put, (void) 0, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), MPI_Win_unlock (a->target, a->win), + (void) 0) +DEFINE_ORIGIN_THREAD_FN(lock_per_rank, get, MPI_Get, (void) 0, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), MPI_Win_unlock (a->target, a->win), + (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush (a->target, a->win), + MPI_Win_unlock (a->target, a->win)) +DEFINE_ORIGIN_THREAD_FN(flush, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush (a->target, a->win), + MPI_Win_unlock (a->target, a->win)) +DEFINE_ORIGIN_THREAD_FN(pscw, put, MPI_Put, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0) +DEFINE_ORIGIN_THREAD_FN(pscw, get, MPI_Get, (void) 0, MPI_Win_start (a->group, 0, a->win), MPI_Win_complete (a->win), (void) 0) +DEFINE_ORIGIN_THREAD_FN(fence, put, MPI_Put, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), (void) 0, MPI_Win_fence (0, a->win), (void) 0) +DEFINE_ORIGIN_THREAD_FN(fence, get, MPI_Get, MPI_Win_fence (MPI_MODE_NOPRECEDE, a->win), (void) 0, MPI_Win_fence (0, a->win), (void) 0) +DEFINE_ORIGIN_THREAD_FN(flush_all, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_all (a->win), + MPI_Win_unlock (a->target, a->win)) +DEFINE_ORIGIN_THREAD_FN(flush_all, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_all (a->win), + MPI_Win_unlock (a->target, a->win)) +DEFINE_ORIGIN_THREAD_FN(flush_local, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_local (a->target, a->win), + MPI_Win_unlock (a->target, a->win)) +DEFINE_ORIGIN_THREAD_FN(flush_local, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_local (a->target, a->win), + MPI_Win_unlock (a->target, a->win)) +DEFINE_ORIGIN_THREAD_FN(flush_local_all, put, MPI_Put, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_local_all (a->win), + MPI_Win_unlock (a->target, a->win)) +DEFINE_ORIGIN_THREAD_FN(flush_local_all, get, MPI_Get, MPI_Win_lock (MPI_LOCK_SHARED, a->target, 0, a->win), (void) 0, MPI_Win_flush_local_all (a->win), + MPI_Win_unlock (a->target, a->win)) diff --git a/src/rmamt_common.c b/src/rmamt_common.c index 6076101..a25ce9e 100644 --- a/src/rmamt_common.c +++ b/src/rmamt_common.c @@ -59,16 +59,29 @@ void rmamt_free (void *ptr, size_t size) { hwloc_topology_t topology; +int shared_comm_size, shared_comm_rank; + int rmamt_bind_init (void) { + MPI_Comm shared_comm; + hwloc_topology_init (&topology); hwloc_topology_load (topology); + /* deterimine how many local ranks there are */ + MPI_Comm_split_type (MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shared_comm); + MPI_Comm_size (shared_comm, &shared_comm_size); + MPI_Comm_rank (shared_comm, &shared_comm_rank); + + MPI_Comm_free (&shared_comm); + return 0; } void rmamt_bind (int thread_id) { int ncores = hwloc_get_nbobjs_by_type (topology, HWLOC_OBJ_CORE); - int obj_id = thread_id % ncores; + int cores_per_rank = ncores < shared_comm_size ? 1 : ncores / shared_comm_size; + int core_base = (cores_per_rank * shared_comm_rank) % ncores; + int obj_id = core_base + thread_id % cores_per_rank; hwloc_obj_t obj = hwloc_get_obj_by_type (topology, HWLOC_OBJ_CORE, obj_id); if (NULL != obj) { diff --git a/src/rmamt_common.h b/src/rmamt_common.h index a2bfc87..66cdb24 100644 --- a/src/rmamt_common.h +++ b/src/rmamt_common.h @@ -50,17 +50,22 @@ typedef volatile long atomic_long; typedef struct arg_struct { MPI_Win win; + MPI_Comm comm; int tid; size_t max_size; size_t min_size; MPI_Group group; int target; + bool do_sync; + bool do_init; + bool all_sync; } ArgStruct; typedef void *(*rmamt_fn_t) (ArgStruct *); static atomic_long current_value; static long barrier_value; +static volatile _Atomic long current_cycle = 0; static void thread_barrier_init (int value) { @@ -70,21 +75,22 @@ static void thread_barrier_init (int value) static void thread_barrier (int cycle) { - static atomic_long current_cycle = ATOMIC_VAR_INIT(0); long tmp; /* wait for the expected cycle */ - while (cycle > atomic_load (¤t_cycle)); + while (cycle > current_cycle); /* decrement the counter */ tmp = atomic_fetch_add (¤t_value, -1); if (2 == tmp) { /* this thread was the last one. reset the counter and increment the cycle */ - atomic_init (¤t_value, barrier_value); - atomic_fetch_add (¤t_cycle, 1); + atomic_init (¤t_value, barrier_value); + + /* only one thread will modify the cycle at any time so atomics are not needed*/ + ++current_cycle; } else { /* wait for cycle increment */ - while (cycle == atomic_load (¤t_cycle)); + while (cycle == current_cycle); } } diff --git a/src/rmamt_options.c b/src/rmamt_options.c index c917f79..9c47f39 100644 --- a/src/rmamt_options.c +++ b/src/rmamt_options.c @@ -26,6 +26,11 @@ char *rmamt_sync_strings[] = { [RMAMT_LOCK] = "lock", [RMAMT_FLUSH] = "flush", [RMAMT_PSCW] = "pscw", + [RMAMT_ALL_FLUSH] = "all_flush", + [RMAMT_FLUSH_ALL] = "flush_all", + [RMAMT_FLUSH_LOCAL] = "flush_local", + [RMAMT_FLUSH_LOCAL_ALL] = "flush_local_all", + [RMAMT_SYNC_MAX] = NULL, }; bool rmamt_win_per_thread = false; @@ -48,10 +53,11 @@ static void print_usage (const char *name, bool failure) if (0 == rank) { printf ("RMA-MT Multi-threaded MPI-3 One-sided benchmarks\n\n" - "Usage: %s [-wn] [-t ] [-s ] [-i ] <-o put|get> <-s lock_all|fence|lock|flush|pscw\n\n" + "Usage: %s [-wn] [-t ] [-s ] [-i ] <-o put|get> <-s lock_all|fence|lock|flush|pscw|all_flush\n\n" "Options:\n\n" " -o,--operation=value Operation to benchmark: put, get\n" - " -s,--sync=value Synchronization function to use: lock_all, fence, lock, flush, pscw\n" + " -s,--sync=value Synchronization function to use: lock_all, fence, lock, flush,\n" + " pscw, all_flush, flush_all, flush_local, flush_local_all\n" " -m, --max-size=value Maximum aggregate transfer size\n" " -l, --min-size=value Minimum aggregate transfer size\n" " -w,--win-per-thread Use a different MPI window in each thread\n" @@ -112,17 +118,16 @@ int rmamt_parse_options (const char *name, int argc, char *argv[]) } break; case 's': - if (0 == strcasecmp (optarg, "lock_all")) { - rmamt_sync = RMAMT_LOCK_ALL; - } else if (0 == strcasecmp (optarg, "fence")) { - rmamt_sync = RMAMT_FENCE; - } else if (0 == strcasecmp (optarg, "lock")) { - rmamt_sync = RMAMT_LOCK; - } else if (0 == strcasecmp (optarg, "flush")) { - rmamt_sync = RMAMT_FLUSH; - } else if (0 == strcasecmp (optarg, "pscw")) { - rmamt_sync = RMAMT_PSCW; - } else { + rmamt_sync = RMAMT_SYNC_MAX; + + for (int i = 0 ; rmamt_sync_strings[i] ; ++i) { + if (0 == strcasecmp (optarg, rmamt_sync_strings[i])) { + rmamt_sync = i; + break; + } + } + + if (RMAMT_SYNC_MAX == rmamt_sync) { print_usage (name, true); } diff --git a/src/rmamt_options.h b/src/rmamt_options.h index 2c23b5f..821f443 100644 --- a/src/rmamt_options.h +++ b/src/rmamt_options.h @@ -5,7 +5,7 @@ #include #define MAX_THREADS 512 -#define RMAMT_MAX_SIZE 4194304 +#define RMAMT_MAX_SIZE (1 << 22) enum { RMAMT_PUT, @@ -21,6 +21,10 @@ enum { RMAMT_LOCK, RMAMT_FLUSH, RMAMT_PSCW, + RMAMT_ALL_FLUSH, + RMAMT_FLUSH_ALL, + RMAMT_FLUSH_LOCAL, + RMAMT_FLUSH_LOCAL_ALL, RMAMT_SYNC_MAX, }; From 56d39a7618955e04dcac1de3885ac7b05d7f0c97 Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Thu, 4 Oct 2018 09:38:31 -0600 Subject: [PATCH 2/2] fix overflow issue Signed-off-by: Nathan Hjelm --- src/rmamt_bibw.c | 2 +- src/rmamt_bw.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rmamt_bibw.c b/src/rmamt_bibw.c index 1dbd1c7..da87ba9 100644 --- a/src/rmamt_bibw.c +++ b/src/rmamt_bibw.c @@ -222,7 +222,7 @@ int main(int argc,char *argv[]) pthread_join(id[i], NULL); } - for (uint32_t j = min_size, step = 0 ; j <= max_size ; j <<= 1, ++step) { + for (uint64_t j = min_size, step = 0 ; j <= max_size ; j <<= 1, ++step) { double speed = 0.0, msg_rate = 0.0; for (int i = 0 ; i < rmamt_threads ; ++i) { diff --git a/src/rmamt_bw.c b/src/rmamt_bw.c index 064ba97..0e644bb 100644 --- a/src/rmamt_bw.c +++ b/src/rmamt_bw.c @@ -226,7 +226,7 @@ int main(int argc,char *argv[]) pthread_join(id[i], NULL); } - for (uint32_t j = min_size, step = 0 ; j <= max_size ; j <<= 1, ++step) { + for (uint64_t j = min_size, step = 0 ; j <= max_size ; j <<= 1, ++step) { double speed = 0.0, msg_rate = 0.0; for (int i = 0 ; i < rmamt_threads ; ++i) {