From 1f79f7406d5e77ca213d0e76ec040025699c1f04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20K=2E=20Guti=C3=A9rrez?= Date: Wed, 10 Jul 2024 11:53:57 -0600 Subject: [PATCH] Re-enable testing of subgroup operations in OpenMP regions. (#206) Signed-off-by: Samuel K. Gutierrez --- src/qvi-thread.cc | 339 +++++++++++++++---------------------------- src/qvi-thread.h | 21 ++- tests/test-threads.c | 31 +++- 3 files changed, 157 insertions(+), 234 deletions(-) diff --git a/src/qvi-thread.cc b/src/qvi-thread.cc index c45cc58..ab40ee0 100644 --- a/src/qvi-thread.cc +++ b/src/qvi-thread.cc @@ -44,15 +44,16 @@ qvi_thread_omp_get_thread_num(void) #endif } +/** Used to indicate that the rank shall be calculated automatically. */ +static constexpr int automatic_rank = -1; + /** * We need to have one structure for fields shared by all threads included in * another with fields specific to each thread. */ struct qvi_thread_group_shared_s { /** Atomic reference count used for resource freeing. */ - std::atomic in_use{0}; - /** Group size. */ - int size = 0; + std::atomic_int refc; /** Barrier object (used in scope). */ pthread_barrier_t barrier; /** Constructor. */ @@ -60,8 +61,7 @@ struct qvi_thread_group_shared_s { /** Constructor. */ qvi_thread_group_shared_s( int group_size - ) : in_use(group_size) - , size(group_size) + ) : refc(group_size) { const int rc = pthread_barrier_init(&barrier, NULL, group_size); if (rc != 0) throw qvi_runtime_error(); @@ -71,7 +71,7 @@ struct qvi_thread_group_shared_s { destruct( qvi_thread_group_shared_s *sgroup ) { - if (--sgroup->in_use == 0) { + if (!sgroup->refc--) { pthread_barrier_destroy(&sgroup->barrier); delete sgroup; } @@ -83,13 +83,16 @@ struct qvi_thread_group_s { * Shared data between threads in the group: * ALL threads point to the same region. */ - qvi_thread_group_shared_s *sdata = nullptr; + qvi_thread_group_shared_s *shdata = nullptr; + /** Group size. */ + int size = 0; /** ID (rank) in group: this ID is unique to each thread. */ - int id = 0; + int rank = 0; /** Performs object construction, called by the real constructors. */ void construct( - int group_size + int group_size, + int group_rank ) { qvi_thread_group_shared_s *isdata = nullptr; #pragma omp single copyprivate(isdata) @@ -97,24 +100,29 @@ struct qvi_thread_group_s { const int rc = qvi_new(&isdata, group_size); if (rc != QV_SUCCESS) throw qvi_runtime_error(); } - sdata = isdata; - id = qvi_thread_omp_get_thread_num(); + shdata = isdata; + size = group_size; + rank = group_rank; + if (group_rank == automatic_rank) { + rank = qvi_thread_omp_get_thread_num(); + } } /** Constructor. */ qvi_thread_group_s(void) { - construct(qvi_thread_omp_get_num_threads()); + construct(qvi_thread_omp_get_num_threads(), automatic_rank); } /** Constructor. */ qvi_thread_group_s( - int group_size + int group_size, + int group_rank ) { - construct(group_size); + construct(group_size, group_rank); } /** Destructor. */ ~qvi_thread_group_s(void) { - qvi_thread_group_shared_s::destruct(sdata); + qvi_thread_group_shared_s::destruct(shdata); } }; @@ -137,7 +145,7 @@ qvi_thread_group_create_size( qvi_thread_group_t **group, int size ) { - return qvi_new(group, size); + return qvi_new(group, size, automatic_rank); } int @@ -160,21 +168,21 @@ int qvi_thread_group_id( const qvi_thread_group_t *group ) { - return group->id; + return group->rank; } int qvi_thread_group_size( const qvi_thread_group_t *group ) { - return group->sdata->size; + return group->size; } int qvi_thread_group_barrier( qvi_thread_group_t *group ) { - const int rc = pthread_barrier_wait(&(group->sdata->barrier)); + const int rc = pthread_barrier_wait(&(group->shdata->barrier)); if ((rc != 0) && (rc != PTHREAD_BARRIER_SERIAL_THREAD)) { return QV_ERR_INTERNAL; } @@ -185,68 +193,36 @@ qvi_thread_group_barrier( /** * Internal data structure for rank and size computing. */ -typedef struct color_key_id_s { - int color; - int key; - int rank; -} color_key_id_t; - -static void -swap_elts( - color_key_id_t *el1, - color_key_id_t *el2 -) { - const color_key_id_t tmp = *el1; - *el1 = *el2; - *el2 = tmp; -} +struct color_key_rank_s { + int color = 0; + int key = 0; + int rank = 0; +}; -static void -bubble_sort_by_color( - color_key_id_t tab[], - int size_tab +static bool +ckr_compare_by_color( + const color_key_rank_s &a, + const color_key_rank_s &b ) { - if (size_tab > 1) { - for(int i = 0 ; i < size_tab - 1 ; i++) { - if (tab[i].color > tab[i+1].color) { - swap_elts(&tab[i],&tab[i+1]); - } - } - bubble_sort_by_color(tab, size_tab - 1); - } + return a.color < b.color; } -static void -bubble_sort_by_key( - color_key_id_t tab[], - int size_tab +static bool +ckr_compare_by_key( + const color_key_rank_s &a, + const color_key_rank_s &b ) { - if (size_tab > 1) { - for(int i = 0 ; i < size_tab - 1 ; i++) { - if ((tab[i].color == tab[i+1].color) && - (tab[i].key > tab[i+1].key)) { - swap_elts(&tab[i],&tab[i+1]); - } - } - bubble_sort_by_key(tab,size_tab-1); - } + // If colors are the same, sort by key. + return a.color == b.color && a.key < b.key; } -static void -bubble_sort_by_rank( - color_key_id_t tab[], - int size_tab +static bool +ckr_compare_by_rank( + const color_key_rank_s &a, + const color_key_rank_s &b ) { - if (size_tab > 1) { - for(int i = 0 ; i < size_tab - 1 ; i++) { - if ((tab[i].color == tab[i+1].color) && - (tab[i].key == tab[i+1].key) && - (tab[i].rank > tab[i+1].rank)) { - swap_elts(&tab[i],&tab[i+1]); - } - } - bubble_sort_by_key(tab, size_tab - 1); - } + // If colors and keys are the same, sort by rank. + return a.color == b.color && a.key == b.key && a.rank < b.rank; } /** @@ -255,7 +231,7 @@ bubble_sort_by_rank( /* static void display_tab( pid_t tid, - color_key_id_t lptr[], + color_key_id_s lptr[], int size ) { fprintf(stdout,"=============================================================================\n"); @@ -283,84 +259,64 @@ qvi_get_subgroup_info( const qvi_thread_group_t *parent, int color, int key, - int *new_id, int *sgrp_size, int *sgrp_rank, int *num_of_sgrp ) { - int color_val= 0; - int num_colors = 0; - int idx = 0; - int idx2 = 0; - int size = parent->sdata->size; - int id = parent->id; - color_key_id_t *lptr = NULL; - -#pragma omp single copyprivate(lptr) - lptr = new color_key_id_t[size](); - - /* Gather colors and keys from ALL threads */ - lptr[id].color = color; - lptr[id].key = key; - lptr[id].rank = id; -#pragma omp barrier // to be sure that all threads have contributed - - /* Sort the color/key/rank array, according to color first, */ - /* then to key, but in the same color realm */ - /* if color and key are identical, sort by rank in old group */ -#pragma omp single /* since this data is shared, only one thread has to sort it */ + const int size = parent->size; + const int rank = parent->rank; + color_key_rank_s *ckrs = nullptr; + + #pragma omp single copyprivate(ckrs) + ckrs = new color_key_rank_s[size]; + // Gather colors and keys from ALL threads. + ckrs[rank].color = color; + ckrs[rank].key = key; + ckrs[rank].rank = rank; + // Barrier to be sure that all threads have contributed their values. + #pragma omp barrier + // Since these data are shared, only one thread has to sort them. The same + // goes for calculating the number of distinct colors provided. + int ncolors = 0; + #pragma omp single copyprivate(ncolors) { - bubble_sort_by_color(lptr, size); - bubble_sort_by_key(lptr, size); - bubble_sort_by_rank(lptr, size); - } - - /* compute number of subgroups */ - num_colors = 1; - color_val = lptr[0].color; - for(int idx = 0 ; idx < size ; idx++) { - if(lptr[idx].color != color_val) { - num_colors++; - color_val = lptr[idx].color; - } + // Sort the color/key/rank array. First according to color, then by key, + // but in the same color realm. If color and key are identical, sort by + // the rank from given group. + std::sort(ckrs, ckrs + size, ckr_compare_by_color); + std::sort(ckrs, ckrs + size, ckr_compare_by_key); + std::sort(ckrs, ckrs + size, ckr_compare_by_rank); + // Calculate the number of distinct colors provided. + std::set color_set; + for (int i = 0; i < size; ++i) { + color_set.insert(ckrs[i].color); + } + ncolors = color_set.size(); } - *num_of_sgrp = num_colors; - - /* compute number and ranks of subgroups */ - num_colors = 0; - color_val = lptr[0].color; - for(int idx = 0 ; idx < size ; idx++) { - if(lptr[idx].color != color_val) { - num_colors++; - color_val = lptr[idx].color; - } - if(lptr[idx].color == color) { - *sgrp_rank = num_colors; - break; - } + // The number of distinct colors is the number of subgroups. + *num_of_sgrp = ncolors; + // Compute my sub-group size and sub-group rank. + int group_rank = 0; + int group_size = 0; + for (int i = 0; i < size; ++i) { + if (color != ckrs[i].color) continue; + // Else we found the start of my color group. + const int current_color = ckrs[i].color; + for (int j = i; j < size && current_color == ckrs[j].color; ++j) { + if (ckrs[j].rank == rank) { + *sgrp_rank = group_rank; + } + group_size++; + group_rank++; + } + *sgrp_size = group_size; + break; } - - /* Compute subgroup size and thread id in subgroup */ - /* 1- Move to the right part in the array */ - idx = 0; - while((idx < size) && (lptr[idx].color != color)) - idx++; - - /* 2- Compute the subgroup size */ - idx2 = idx; - while((idx2 < size) && (lptr[idx2].color == color)) - idx2++; - *sgrp_size = (idx2 - idx); - - /* 3- Compute id in the subgroup */ - idx2 = idx; - while((idx2 < size) && (lptr[idx2].rank != id)) - idx2++; - *new_id = (idx2 - idx); - -#pragma omp barrier // to prevent the quickest thread to remove data before all others have used it -#pragma omp single - delete [] lptr; + // Barrier to sync for array deletion. + #pragma omp barrier + // Only one task deletes. + #pragma omp single + delete[] ckrs; return QV_SUCCESS; } @@ -379,85 +335,30 @@ qvi_thread_group_create_from_split( } #else { - int rc = QV_SUCCESS; - int newid = -1; /* thread new id in subgroup */ int sgrp_size = -1; /* size of this thread subgroup */ int sgrp_rank = -1; /* subgroup rank */ int num_sgrp = -1; /* number of subgroups */ - qvi_thread_group_t *new_group = nullptr; - - /* this used to internally allocate things */ - /* in a copyprivate fashion */ - //qvi_thread_group_t **new_group_ptr_array = nullptr; - //qvi_thread_group_t *tmp_group = nullptr; - qvi_thread_group_shared_s **sdata_ptr_array = nullptr; - qvi_thread_group_shared_s *tmp_sdata = nullptr; - omp_lock_t *lock_ptr = nullptr; // use pthread_mutex instead ??*/ - - if (!parent) { - *child = nullptr; - return QV_ERR_SPLIT; - } - - rc = qvi_get_subgroup_info( - parent, - color, - key, - &newid, - &sgrp_size, - &sgrp_rank, - &num_sgrp ); + int rc = qvi_get_subgroup_info( + parent, + color, + key, + &sgrp_size, + &sgrp_rank, + &num_sgrp + ); if (rc != QV_SUCCESS) { - *child = nullptr; - return QV_ERR_SPLIT; + *child = nullptr; + return QV_ERR_SPLIT; } - /* create and init each subgroup : qvi_thread_group_new canNOT be used */ - /* since it already contains a "single" clause. */ - /* Calling the function directly will deadlock. */ - /* The same goes for qvi_thread_group_create. */ - /* Do everything manually */ - - /* Equivalent to (sub)group "new" (cf qvi_thread_group_new) */ - new_group = new qvi_thread_group_t(); - - /* sdata pointer allocation */ -#pragma omp single copyprivate(sdata_ptr_array) - sdata_ptr_array = new qvi_thread_group_shared_s *[num_sgrp](); - - for(int i = 0 ; i < num_sgrp ; i++) { -#pragma omp single copyprivate(tmp_sdata) - //tmp_sdata = new qvi_thread_group_shared_s(); - /* Since it's shared, only the root(s) need(s) to set this */ - if ((newid == 0) && (i == sgrp_rank)) { - sdata_ptr_array[i] = tmp_sdata; - } - } - -#pragma omp single copyprivate(lock_ptr) - { - lock_ptr = new omp_lock_t(); - omp_init_lock(lock_ptr); - } - - /* Only the root(s) need(s) to initialize this */ - if (newid == 0) { - sdata_ptr_array[sgrp_rank]->size = sgrp_size; - //sdata_ptr_array[sgrp_rank]->in_use = sgrp_size; - pthread_barrier_init(&(sdata_ptr_array[sgrp_rank]->barrier),NULL,sgrp_size); + qvi_thread_group_t *ichild = nullptr; + rc = qvi_new(&ichild, sgrp_size, sgrp_rank); + if (rc != QV_SUCCESS) { + *child = nullptr; + return rc; } -#pragma omp barrier - /* All threads set their own (sub)group sdata pointer */ - new_group->sdata = sdata_ptr_array[sgrp_rank]; - new_group->id = newid; - *child = new_group; -#pragma omp barrier // to prevent the quickest thread to remove data before all others have used it -#pragma omp single - { - delete lock_ptr; - delete [] sdata_ptr_array; - } + *child = ichild; return rc; } #endif @@ -472,8 +373,8 @@ qvi_thread_group_gather_bbuffs( ) { QVI_UNUSED(root); const int send_count = (int)qvi_bbuff_size(txbuff); - const int group_size = group->sdata->size; - const int group_id = group->id; + const int group_size = group->size; + const int group_id = group->rank; int rc = QV_SUCCESS; qvi_bbuff_t **bbuffs = nullptr; // Zero initialize array of pointers to nullptr. @@ -516,7 +417,7 @@ qvi_thread_group_scatter_bbuffs( qvi_bbuff_t **rxbuff ) { QVI_UNUSED(root); - const int group_id = group->id; + const int group_id = group->rank; qvi_bbuff_t ***tmp = nullptr; /* GM: Oh man, that is UGLY */ diff --git a/src/qvi-thread.h b/src/qvi-thread.h index bc6d8c1..0a586f4 100644 --- a/src/qvi-thread.h +++ b/src/qvi-thread.h @@ -22,10 +22,10 @@ struct qvi_thread_group_s; typedef struct qvi_thread_group_s qvi_thread_group_t; +#if 0 /** * Mapping policies types. */ -/* typedef enum qv_policy_s { QV_POLICY_PACKED = 1, QV_POLICY_COMPACT = 1, @@ -37,23 +37,17 @@ typedef enum qv_policy_s { QV_POLICY_SCATTER = 4, QV_POLICY_CHOOSE = 5, } qv_policy_t; -*/ + /** * Layout for fine-grain binding * with default behaviour */ -/* typedef struct qv_layout_s { qv_policy_t policy; qv_hw_obj_type_t obj_type; int stride; } qv_layout_t; -*/ - -int -qvi_thread_group_size( - const qvi_thread_group_t *group -); +#endif int qvi_thread_group_new( @@ -66,8 +60,8 @@ qvi_thread_group_free( ); int -qvi_thread_group_barrier( - qvi_thread_group_t *group +qvi_thread_group_size( + const qvi_thread_group_t *group ); int @@ -75,6 +69,11 @@ qvi_thread_group_id( const qvi_thread_group_t *group ); +int +qvi_thread_group_barrier( + qvi_thread_group_t *group +); + int qvi_thread_group_create( qvi_thread_group_t **group diff --git a/tests/test-threads.c b/tests/test-threads.c index 322e9c2..6500e67 100644 --- a/tests/test-threads.c +++ b/tests/test-threads.c @@ -11,15 +11,14 @@ int main(void) { - char *ers = NULL; - int rc = QV_SUCCESS; - #pragma omp parallel #pragma omp single printf("# Starting OpenMP Test (nthreads=%d)\n", omp_get_num_threads()); -#pragma omp parallel private(ers, rc) +#pragma omp parallel { + char *ers = NULL; + int rc = QV_SUCCESS; qv_scope_t *base_scope = NULL; rc = qv_thread_scope_get( QV_SCOPE_PROCESS, @@ -30,8 +29,32 @@ main(void) qvi_test_panic("%s (rc=%s)", ers, qv_strerr(rc)); } + int taskid = 0; + rc = qv_scope_taskid(base_scope, &taskid); + if (rc != QV_SUCCESS) { + ers = "qv_scope_taskid() failed"; + qvi_test_panic("%s (rc=%s)", ers, qv_strerr(rc)); + } + qvi_test_scope_report(base_scope, "base_scope"); + qv_scope_t *sub_scope; + rc = qv_scope_split( + base_scope, 2, taskid, &sub_scope + ); + if (rc != QV_SUCCESS) { + ers = "qv_scope_split_at() failed"; + qvi_test_panic("%s (rc=%s)", ers, qv_strerr(rc)); + } + + qvi_test_scope_report(sub_scope, "sub_scope"); + + rc = qv_scope_free(sub_scope); + if (rc != QV_SUCCESS) { + ers = "qv_scope_free() failed"; + qvi_test_panic("%s (rc=%s)", ers, qv_strerr(rc)); + } + rc = qv_scope_free(base_scope); if (rc != QV_SUCCESS) { ers = "qv_scope_free() failed";