From 3e7a91ac878d328725d57c1081886dbcd6f43dbd Mon Sep 17 00:00:00 2001 From: Houjun Tang Date: Mon, 21 Aug 2023 19:06:43 -0400 Subject: [PATCH] More fixes for stacking cache and async vol --- src/h5_async_vol.c | 180 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 162 insertions(+), 18 deletions(-) diff --git a/src/h5_async_vol.c b/src/h5_async_vol.c index e65c49b..6c99092 100644 --- a/src/h5_async_vol.c +++ b/src/h5_async_vol.c @@ -146,6 +146,8 @@ typedef struct async_task_t { struct H5VL_async_t **parent_objs; /* pointer back to the parent async object when multi-dset is used*/ #endif + struct async_task_list_t *task_list_head; + clock_t create_time; clock_t start_time; clock_t end_time; @@ -2489,7 +2491,7 @@ H5VL_async_task_wait(async_task_t *async_task) static herr_t push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func) { - int i, is_dep_done = 1, ntask = 0, locked = 0; + int i, is_dep_done = 1, ntask = 0, locked = 0, ret_val = 0; ABT_thread_state thread_state; async_task_t * task_elt, *task_tmp; async_task_list_t *task_list_tmp, *task_list_elt; @@ -2556,6 +2558,7 @@ push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func task_elt->func, task_elt->dep_tasks[i]->func); #endif } + if (task_elt && task_elt->dep_tasks[i] && NULL != task_elt->dep_tasks[i]->abt_thread) { if (ABT_thread_get_state(task_elt->dep_tasks[i]->abt_thread, &thread_state) != ABT_SUCCESS) { @@ -2570,8 +2573,20 @@ push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func fprintf(fout_g, " [ASYNC VOL DBG] wait dep task [%p], thread_state %d\n", task_elt->dep_tasks[i]->func, thread_state); #endif + + if (locked && ABT_mutex_unlock(qhead->head_mutex) != ABT_SUCCESS) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); + return -1; + } // Dependent task already in abt pool, release lock and wait for it to finish H5VL_async_task_wait(task_elt->dep_tasks[i]); + + if 
(ABT_mutex_lock(qhead->head_mutex) != ABT_SUCCESS) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_lock\n", __func__); + return -1; + } + else + locked = 1; #ifdef ENABLE_DBG_MSG if (async_instance_g && (async_instance_g->mpi_rank == ASYNC_DBG_MSG_RANK || -1 == ASYNC_DBG_MSG_RANK)) @@ -2581,8 +2596,8 @@ push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func is_dep_done = 1; continue; } // End if thread is not terminated - } // End if dependent task is not finished - } // End for dependent parents of current task + } // End if dependent task is not finished + } // End for dependent parents of current task if (is_dep_done == 0) { #ifdef ENABLE_DBG_MSG @@ -2594,10 +2609,11 @@ push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func continue; } - if (task_elt && task_elt->is_done == 0) { + if (task_elt && task_elt->is_done == 0 && task_elt->in_abt_pool == 0) { ntask = get_n_running_task_in_queue_nolock(task_elt, __func__); if (ntask > 0) { func_log_int1(__func__, "skipping create thread, pool already has", ntask); + ret_val = -1; goto done; } @@ -2607,9 +2623,14 @@ push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func &task_elt->abt_thread) != ABT_SUCCESS) { fprintf(fout_g, " [ASYNC VOL ERROR] %s ABT_thread_create failed for %p\n", __func__, task_elt->func); - break; + ret_val = -1; + goto done; + } + else { + task_elt->in_abt_pool = 1; + ret_val = 0; + goto done; } - task_elt->in_abt_pool = 1; } #ifdef ENABLE_DBG_MSG else { @@ -2619,14 +2640,11 @@ push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func task_elt->func); } #endif - // Remove task from current task list - DL_DELETE(task_list_elt->task_list, task_elt); - task_elt->prev = NULL; - task_elt->next = NULL; - goto done; } // End DL_FOREACH_SAFE(task_list_elt, task_elt, task_tmp) } // End DL_FOREACH_SAFE(qhead->queue, task_list_elt, task_list_tmp) + ret_val = 0; + done: // Remove head if all its
tasks have been pushed to Argobots pool if (qhead->queue && qhead->queue->task_list == NULL && qhead->queue->next == qhead->queue) { @@ -2641,9 +2659,37 @@ push_task_to_abt_pool(async_qhead_t *qhead, ABT_pool pool, const char *call_func func_leave(__func__); - return 1; + return ret_val = 0; } // End push_task_to_abt_pool +static void +remove_task_from_list(async_task_t *task, const char *func_name) +{ + if (ABT_mutex_lock(async_instance_g->qhead.head_mutex) != ABT_SUCCESS) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_lock\n", __func__); + return; + } + + // Remove task list if there is only one element (this task) in the list + if (task->task_list_head->task_list == task->task_list_head->task_list->prev) { + DL_DELETE(async_instance_g->qhead.queue, task->task_list_head); + } + else { + // Remove task from current task list + DL_DELETE(task->task_list_head->task_list, task); + func_log(func_name, "removed task from list"); + task->prev = NULL; + task->next = NULL; + } + + if (ABT_mutex_unlock(async_instance_g->qhead.head_mutex) != ABT_SUCCESS) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); + return; + } + + return; +} + /** * \ingroup ASYNC * @@ -2774,6 +2820,7 @@ add_task_to_queue(async_qhead_t *qhead, async_task_t *task, task_list_qtype task now.tv_usec, task->func, qtype_names_g[task_type]); #endif DL_APPEND(qhead->queue->prev->task_list, task); + task->task_list_head = qhead->queue->prev; } else { // Create a new task list in queue and add the current task to it @@ -2790,6 +2837,7 @@ add_task_to_queue(async_qhead_t *qhead, async_task_t *task, task_list_qtype task #endif DL_APPEND(new_list->task_list, task); DL_APPEND(qhead->queue, new_list); + task->task_list_head = new_list; } if (ABT_mutex_unlock(qhead->head_mutex) != ABT_SUCCESS) { @@ -5465,6 +5513,8 @@ async_attr_create_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if 
(acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -5861,6 +5911,8 @@ async_attr_open_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -6263,6 +6315,8 @@ async_attr_read_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -6597,6 +6651,8 @@ async_attr_write_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -6955,6 +7011,8 @@ async_attr_get_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -7290,6 +7348,8 @@ async_attr_specific_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -7635,6 +7695,8 @@ async_attr_optional_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -7962,6 +8024,8 @@ async_attr_close_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -8317,6 +8381,8 @@ async_dataset_create_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && 
H5TSmutex_release(&mutex_count) < 0) { @@ -8713,6 +8779,8 @@ async_dataset_open_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -9101,6 +9169,8 @@ async_dataset_read_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -9631,6 +9701,8 @@ async_dataset_read_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -10058,6 +10130,8 @@ async_dataset_write_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -10754,6 +10828,8 @@ async_dataset_write_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -11171,6 +11247,8 @@ async_dataset_get_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -11508,6 +11586,8 @@ async_dataset_specific_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -11841,6 +11921,8 @@ async_dataset_optional_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && 
H5TSmutex_release(&mutex_count) < 0) { @@ -12168,6 +12250,8 @@ async_dataset_close_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -12298,7 +12382,9 @@ async_dataset_close(task_list_qtype qtype, async_instance_t *aid, H5VL_async_t * } } else { - if (NULL == req || qtype == ISOLATED) + if (qtype == DEPENDENT) + add_task_to_queue(&aid->qhead, async_task, DEPENDENT); + else if (NULL == req || qtype == ISOLATED) add_task_to_queue(&aid->qhead, async_task, ISOLATED); else if (async_task->async_obj->is_col_meta == true) add_task_to_queue(&aid->qhead, async_task, COLLECTIVE); @@ -12523,6 +12609,8 @@ async_datatype_commit_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -12895,6 +12983,8 @@ async_datatype_open_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -13249,6 +13339,8 @@ async_datatype_get_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -13580,6 +13672,8 @@ async_datatype_specific_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -13909,6 +14003,8 @@ async_datatype_optional_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ 
-14237,6 +14333,8 @@ async_datatype_close_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -14592,6 +14690,8 @@ async_file_create_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -15288,6 +15388,8 @@ async_file_get_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -15619,6 +15721,8 @@ async_file_specific_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -15959,9 +16063,13 @@ async_file_optional_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { fprintf(fout_g, " [ ABT ERROR] %s H5TSmutex_release failed\n", __func__); } + #ifdef ENABLE_DBG_MSG if (async_instance_g && (async_instance_g->mpi_rank == ASYNC_DBG_MSG_RANK || -1 == ASYNC_DBG_MSG_RANK)) fprintf(fout_g, " [ ABT DBG] %s released global lock %u\n", __func__, mutex_count); @@ -16318,6 +16426,8 @@ async_file_close_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (async_instance_g && NULL != async_instance_g->qhead.queue && async_instance_g->start_abt_push) @@ -16696,6 +16806,8 @@ async_group_create_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if 
(acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -17073,6 +17185,8 @@ async_group_open_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -17427,6 +17541,8 @@ async_group_get_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -17763,6 +17879,8 @@ async_group_specific_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -18091,6 +18209,8 @@ async_group_optional_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -18422,6 +18542,8 @@ async_group_close_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -18778,6 +18900,8 @@ async_link_create_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -19149,6 +19273,8 @@ async_link_copy_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -19513,6 +19639,8 @@ async_link_move_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && 
H5TSmutex_release(&mutex_count) < 0) { @@ -19873,6 +20001,8 @@ async_link_get_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -20216,6 +20346,8 @@ async_link_specific_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -20557,6 +20689,8 @@ async_link_optional_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -20901,6 +21035,8 @@ async_object_open_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -21274,6 +21410,8 @@ async_object_copy_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -21632,6 +21770,8 @@ async_object_get_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -21975,6 +22115,8 @@ async_object_specific_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && H5TSmutex_release(&mutex_count) < 0) { @@ -22322,6 +22464,8 @@ async_object_optional_fn(void *foo) task->in_abt_pool = 0; task->is_done = 1; + remove_task_from_list(task, __func__); + func_log(__func__, "release global lock"); if (acquired == true && 
H5TSmutex_release(&mutex_count) < 0) { @@ -23027,10 +23171,10 @@ H5VL_async_is_implicit_disabled(int op_type, const char *func_name) } // If implicit is disabled, wait for all existing async task to finish - if (ret_value == 1) { - func_log(func_name, "implicit async disabled, wait for existing async tasks"); - async_waitall(1); - } + /* if (ret_value == 1) { */ + /* func_log(func_name, "implicit async disabled, wait for existing async tasks"); */ + /* async_waitall(1); */ + /* } */ return ret_value; } @@ -23905,7 +24049,7 @@ H5VL_async_dataset_close(void *dset, hid_t dxpl_id, void **req) H5VL_async_t * o = (H5VL_async_t *)dset; herr_t ret_value; hbool_t is_term; - task_list_qtype qtype = REGULAR; + task_list_qtype qtype = DEPENDENT; #ifdef ENABLE_ASYNC_LOGGING printf("------- ASYNC VOL DATASET Close\n");