From f5d6c0deb3df983b00d81527e8a08220b8f7fb07 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Sun, 3 Nov 2024 15:20:58 +0000 Subject: [PATCH] added margo_set_progress_when_needed (#296) --- include/margo.h | 14 +++++++ src/margo-core.c | 36 +++++++++++++++--- src/margo-instance.h | 63 +++++++++++++++++++++++++++++--- tests/unit-tests/margo-forward.c | 17 ++++++++- 4 files changed, 119 insertions(+), 11 deletions(-) diff --git a/include/margo.h b/include/margo.h index 7356702..fe1f684 100644 --- a/include/margo.h +++ b/include/margo.h @@ -1888,6 +1888,20 @@ int margo_set_progress_timeout_ub_msec(margo_instance_id mid, unsigned timeout); int margo_get_progress_timeout_ub_msec(margo_instance_id mid, unsigned* timeout); +/** + * @brief Set the progress mode. By default, the progress ULT will run + * whenever it is scheduled, even if there aren't any on-going operations. + * For clients, it may be useful to prevent it from being scheduled if + * there is no reason for making progress (i.e. there is no on-going RPC). + * Setting when_needed to true does just that. + * + * @param mid Margo instance. + * @param when_needed Progress only when needed. + * + * @return 0 in case of success, -1 otherwise. + */ +int margo_set_progress_when_needed(margo_instance_id mid, bool when_needed); + /** * @brief Sets configurable parameters/hints. * diff --git a/src/margo-core.c b/src/margo-core.c index 7ae950b..6218f3e 100644 --- a/src/margo-core.c +++ b/src/margo-core.c @@ -264,11 +264,13 @@ void margo_finalize(margo_instance_id mid) /* tell progress thread to wrap things up */ mid->hg_progress_shutdown_flag = 1; + PROGRESS_NEEDED_INCR(mid); /* wait for it to shutdown cleanly */ MARGO_TRACE(mid, "Waiting for progress thread to complete"); ABT_thread_join(mid->hg_progress_tid); ABT_thread_free(&mid->hg_progress_tid); + PROGRESS_NEEDED_DECR(mid); mid->refcount--; ABT_mutex_lock(mid->finalize_mutex); @@ -701,11 +703,13 @@ margo_addr_lookup(margo_instance_id mid, const char* name, hg_addr_t* addr) hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, (void*)eventual, name, HG_OP_ID_IGNORE); + PROGRESS_NEEDED_INCR(mid); if (hret == HG_SUCCESS) { ABT_eventual_wait(eventual, (void**)&evt); *addr = evt->addr; hret = evt->hret; } + PROGRESS_NEEDED_DECR(mid); ABT_eventual_free(&eventual); #endif @@ -902,6 +906,8 @@ static hg_return_t margo_cb(const struct hg_cb_info* info) // handed to the user, hence it has to be freed here. if (req->kind == MARGO_REQ_CALLBACK) free(req); + PROGRESS_NEEDED_DECR(mid); + return HG_SUCCESS; } @@ -1115,6 +1121,7 @@ static hg_return_t margo_provider_iforward_internal( req->timer = NULL; // LCOV_EXCL_END } + PROGRESS_NEEDED_INCR(mid); finish: @@ -1319,6 +1326,8 @@ margo_irespond_internal(hg_handle_t handle, hret = HG_Respond(handle, margo_cb, (void*)req, (void*)&respond_args); + if (hret == HG_SUCCESS) { PROGRESS_NEEDED_INCR(mid); } + finish: /* monitoring */ @@ -1677,6 +1686,7 @@ static hg_return_t margo_bulk_itransfer_internal( hret = HG_Bulk_transfer(mid->hg.hg_context, margo_cb, (void*)req, op, origin_addr, origin_handle, origin_offset, local_handle, local_offset, size, HG_OP_ID_IGNORE); + if (hret == HG_SUCCESS) { PROGRESS_NEEDED_INCR(mid); } finish: @@ -1816,6 +1826,8 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms) ABT_POOL_NULL, &sleep_timer); margo_timer_start(sleep_timer, timeout_ms); + PROGRESS_NEEDED_INCR(mid); + /* yield thread for specified timeout */ ABT_mutex_lock(sleep_cb_dat.mutex); while (sleep_cb_dat.is_asleep) @@ -1828,6 +1840,8 @@ void margo_thread_sleep(margo_instance_id mid, double timeout_ms) margo_timer_destroy(sleep_timer); + PROGRESS_NEEDED_DECR(mid); + /* monitoring */ __MARGO_MONITOR(mid, FN_END, sleep, monitoring_args); } @@ -1956,14 +1970,15 @@ void __margo_hg_progress_fn(void* foo) unsigned int hg_progress_timeout; double next_timer_exp; unsigned int pending; - int spin_flag = 0; + int spin_flag = 0; double spin_start_ts = 0; while (!mid->hg_progress_shutdown_flag) { + + /* Wait for progress to actually be needed */ + WAIT_FOR_PROGRESS_TO_BE_NEEDED(mid); + do { - /* save value of instance diag variable, in case it is modified - * while we are in loop - */ ret = margo_internal_trigger(mid, 0, 1, &actual_count); } while ((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag); @@ -1978,7 +1993,7 @@ void __margo_hg_progress_fn(void* foo) /* We used a zero progress timeout (busy spinning) on the last * iteration. See if spindown time has elapsed yet. */ - if (((ABT_get_wtime() - spin_start_ts)*1000) + if (((ABT_get_wtime() - spin_start_ts) * 1000) < (double)mid->hg_progress_spindown_msec) { /* We are still in the spindown window; continue spinning * regardless of current conditions. @@ -2416,3 +2431,14 @@ hg_return_t _handler_for_NULL(hg_handle_t handle) margo_destroy(handle); return HG_SUCCESS; } + +int margo_set_progress_when_needed(margo_instance_id mid, bool when_needed) +{ + if (mid == MARGO_INSTANCE_NULL) return -1; + mid->progress_when_needed.flag = when_needed; + if (!when_needed) { + ABT_cond_signal( + ABT_COND_MEMORY_GET_HANDLE(&mid->progress_when_needed.cond)); + } + return 0; +} diff --git a/src/margo-instance.h b/src/margo-instance.h index bcbd81d..11486fe 100644 --- a/src/margo-instance.h +++ b/src/margo-instance.h @@ -73,6 +73,14 @@ struct margo_instance { _Atomic unsigned hg_progress_timeout_ub; _Atomic unsigned hg_progress_spindown_msec; + /* "when_needed" progress logic */ + struct { + bool flag; + uint64_t pending; + ABT_mutex_memory mutex; + ABT_cond_memory cond; + } progress_when_needed; + uint16_t num_registered_rpcs; /* number of registered rpc's by all providers on this instance */ /* list of rpcs registered on this instance for debugging and profiling @@ -131,7 +139,8 @@ struct margo_instance { #define MARGO_RPC_POOL(mid) (mid)->abt.pools[mid->rpc_pool_idx].pool -typedef enum margo_request_kind { +typedef enum margo_request_kind +{ MARGO_REQ_EVENTUAL, MARGO_REQ_CALLBACK } margo_request_kind; @@ -159,10 +168,10 @@ struct margo_request_struct { struct margo_rpc_data { margo_instance_id mid; _Atomic(ABT_pool) pool; - char* rpc_name; - hg_proc_cb_t in_proc_cb; /* user-provided input proc */ - hg_proc_cb_t out_proc_cb; /* user-provided output proc */ - void* user_data; + char* rpc_name; + hg_proc_cb_t in_proc_cb; /* user-provided input proc */ + hg_proc_cb_t out_proc_cb; /* user-provided output proc */ + void* user_data; void (*user_free_callback)(void*); }; @@ -196,6 +205,50 @@ typedef struct { char is_asleep; } margo_thread_sleep_cb_dat; +#define PROGRESS_NEEDED_INCR(__mid__) \ + do { \ + bool notify = false; \ + if ((__mid__)->progress_when_needed.flag) { \ + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.mutex)); \ + notify = ++(__mid__)->progress_when_needed.pending == 1; \ + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.mutex)); \ + } \ + if (notify) { \ + ABT_cond_signal(ABT_COND_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.cond)); \ + } \ + } while (0) + +#define PROGRESS_NEEDED_DECR(__mid__) \ + do { \ + if ((__mid__)->progress_when_needed.flag) { \ + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.mutex)); \ + --(__mid__)->progress_when_needed.pending; \ + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.mutex)); \ + } \ + } while (0) + +#define WAIT_FOR_PROGRESS_TO_BE_NEEDED(__mid__) \ + do { \ + if ((__mid__)->progress_when_needed.flag) { \ + ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.mutex)); \ + while (!(__mid__)->progress_when_needed.pending) { \ + ABT_cond_wait(ABT_COND_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.cond), \ + ABT_MUTEX_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.mutex)); \ + if (!(__mid__)->progress_when_needed.flag) break; \ + } \ + ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE( \ + &(__mid__)->progress_when_needed.mutex)); \ + } \ + } while (0) + #define MARGO_TRACE margo_trace #define MARGO_DEBUG margo_debug #define MARGO_INFO margo_info diff --git a/tests/unit-tests/margo-forward.c b/tests/unit-tests/margo-forward.c index e2f49c6..38d557c 100644 --- a/tests/unit-tests/margo-forward.c +++ b/tests/unit-tests/margo-forward.c @@ -107,6 +107,14 @@ static void* test_context_setup(const MunitParameter params[], void* user_data) } munit_assert_not_null(ctx->mid); + const char* progress_when_needed = munit_parameters_get(params, "progress_when_needed"); + if(progress_when_needed && strcmp(progress_when_needed, "true") == 0) + munit_assert_int( + margo_set_progress_when_needed(ctx->mid, true), ==, 0); + else + munit_assert_int( + margo_set_progress_when_needed(ctx->mid, false), ==, 0); + return ctx; } @@ -666,8 +674,15 @@ static MunitResult test_provider_cforward(const MunitParameter params[], static char* protocol_params[] = {"na+sm", NULL}; static char* progress_pool_params[] = {"fifo_wait", "prio_wait", "earliest_first", NULL}; +static char* progress_when_needed_params[] = {"true", "false", NULL}; static MunitParameterEnum test_params[] + = {{"protocol", protocol_params}, + {"progress_pool", progress_pool_params}, + {"progress_when_needed", progress_when_needed_params}, + {NULL, NULL}}; + +static MunitParameterEnum test_params2[] = {{"protocol", protocol_params}, {"progress_pool", progress_pool_params}, {NULL, NULL}}; @@ -678,7 +693,7 @@ static MunitTest test_suite_tests[] = { {(char*)"/forward_with_args", test_forward_with_args, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, {(char*)"/forward_with_shim", test_forward_with_shim, test_context_setup, - test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, + test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params2}, {(char*)"/forward_to_null", test_forward_to_null, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params}, {(char*)"/self_forward_to_null", test_self_forward_to_null, test_context_setup,