diff --git a/src/h5_async_vol.c b/src/h5_async_vol.c index b6198f7..d01e766 100644 --- a/src/h5_async_vol.c +++ b/src/h5_async_vol.c @@ -5166,6 +5166,51 @@ free_native_object_optional_args(async_object_optional_args_t *args) } } +static herr_t +block_and_wait_task(async_task_t *async_task, const char *call_func) +{ + hbool_t acquired = false; + unsigned int mutex_count = 0; + herr_t ret_val = 0; + + func_log(call_func, "task is set to be blocking"); + + if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, call_func) == 0) + push_task_to_abt_pool(&async_instance_g->qhead, async_instance_g->pool, call_func); + + func_log(call_func, "releasing global lock"); + + if (H5TSmutex_release(&mutex_count) < 0) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", call_func); + } + + func_log(call_func, "lock released, wait to finish previous task, SYNC MODE now!"); + + if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", call_func); + ret_val = -1; + } + + func_log(call_func, "finished all previous tasks"); + + while (acquired == false && mutex_count > 0) { + if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", call_func); + ret_val = -1; + } + } + + func_log_int1(call_func, "re-acquired global lock, count", mutex_count); + + /* Failed background thread execution */ + if (async_task->err_stack != 0) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s HDF5 op failed\n", call_func); + ret_val = -1; + } + + return ret_val; +} + static void async_attr_create_fn(void *foo) { @@ -5218,15 +5263,15 @@ async_attr_create_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -5504,39 +5549,8 @@ async_attr_create(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL_lo /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) { - async_obj = NULL; + if (block_and_wait_task(async_task, __func__) < 0) goto error; - } } // Restore async operation state @@ -5544,7 +5558,6 @@ async_attr_create(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL_lo func_leave(__func__); -done: return async_obj; error: // Restore async operation state @@ -5614,15 +5627,15 @@ async_attr_open_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -5888,35 +5901,7 @@ async_attr_open(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL_loc_ /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -5995,15 +5980,15 @@ async_attr_read_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -6216,35 +6201,7 @@ async_attr_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_type_ /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -6323,15 +6280,15 @@ async_attr_write_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -6569,35 +6526,7 @@ async_attr_write(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_type /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -6676,15 +6605,15 @@ async_attr_get_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -6893,35 +6822,7 @@ async_attr_get(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent_ob /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -7000,15 +6901,15 @@ async_attr_specific_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -7235,35 +7136,7 @@ async_attr_specific(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pare /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -7342,15 +7215,15 @@ async_attr_optional_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -7556,35 +7429,7 @@ async_attr_optional(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pare /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -7663,15 +7508,15 @@ async_attr_close_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -7885,35 +7730,7 @@ async_attr_close(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent_ /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -7992,15 +7809,15 @@ async_dataset_create_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -8277,35 +8094,7 @@ async_dataset_create(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -8314,7 +8103,6 @@ async_dataset_create(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL func_leave(__func__); -done: return async_obj; error: // Restore async operation state @@ -8384,15 +8172,15 @@ async_dataset_open_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -8636,35 +8424,7 @@ async_dataset_open(task_type_t qtype, async_instance_t *aid, H5VL_async_t *paren /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -8745,15 +8505,15 @@ async_dataset_read_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } } @@ -9039,35 +8799,7 @@ async_dataset_read(async_instance_t *aid, size_t count, H5VL_async_t **parent_ob } if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -9285,15 +9017,15 @@ async_dataset_read_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -9519,35 +9251,7 @@ async_dataset_read(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_ty } if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -9675,15 +9379,15 @@ async_dataset_write_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } } @@ -10061,35 +9765,7 @@ async_dataset_write(async_instance_t *aid, size_t count, H5VL_async_t **parent_o } if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -10369,15 +10045,15 @@ async_dataset_write_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -10681,35 +10357,7 @@ async_dataset_write(async_instance_t *aid, H5VL_async_t *parent_obj, hid_t mem_t } if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -10789,15 +10437,15 @@ async_dataset_get_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -11013,35 +10661,7 @@ async_dataset_get(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -11120,15 +10740,15 @@ async_dataset_specific_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -11340,35 +10960,7 @@ async_dataset_specific(task_type_t qtype, async_instance_t *aid, H5VL_async_t *p /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -11447,15 +11039,15 @@ async_dataset_optional_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -11662,35 +11254,7 @@ async_dataset_optional(task_type_t qtype, async_instance_t *aid, H5VL_async_t *p /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -11769,15 +11333,15 @@ async_dataset_close_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -11998,35 +11562,7 @@ async_dataset_close(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pare /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -12105,15 +11641,15 @@ async_datatype_commit_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -12367,35 +11903,7 @@ async_datatype_commit(async_instance_t *aid, H5VL_async_t *parent_obj, const H5V /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -12474,15 +11982,15 @@ async_datatype_open_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -12723,40 +12231,12 @@ async_datatype_open(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL_ /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); + if (block_and_wait_task(async_task, __func__) < 0) + goto error; + } - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) - goto error; - } - - // Restore async operation state - async_instance_g->start_abt_push = async_instance_g->prev_push_state; + // Restore async operation state + async_instance_g->start_abt_push = async_instance_g->prev_push_state; func_leave(__func__); @@ -12830,15 +12310,15 @@ async_datatype_get_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -13048,35 +12528,7 @@ async_datatype_get(task_type_t qtype, async_instance_t *aid, H5VL_async_t *paren /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -13155,15 +12607,15 @@ async_datatype_specific_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -13371,35 +12823,7 @@ async_datatype_specific(task_type_t qtype, async_instance_t *aid, H5VL_async_t * /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -13478,15 +12902,15 @@ async_datatype_optional_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -13694,35 +13118,7 @@ async_datatype_optional(task_type_t qtype, async_instance_t *aid, H5VL_async_t * /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -13801,15 +13197,15 @@ async_datatype_close_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -14020,35 +13416,7 @@ async_datatype_close(task_type_t qtype, async_instance_t *aid, H5VL_async_t *par aid->start_abt_push = true; /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -14254,8 +13622,6 @@ async_file_create(async_instance_t *aid, const char *name, unsigned flags, hid_t async_file_create_args_t *args = NULL; bool lock_self = false; bool is_blocking = false; - hbool_t acquired = false; - unsigned int mutex_count = 1; func_enter(__func__, name); @@ -14370,35 +13736,7 @@ async_file_create(async_instance_t *aid, const char *name, unsigned flags, hid_t /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -14407,7 +13745,6 @@ async_file_create(async_instance_t *aid, const char *name, unsigned flags, hid_t func_leave(__func__); -done: return async_obj; error: // Restore async operation state @@ -14604,8 +13941,6 @@ async_file_open(task_type_t qtype, async_instance_t *aid, const char *name, unsi async_file_open_args_t *args = NULL; bool lock_self = false; bool is_blocking = false; - hbool_t acquired = false; - unsigned int mutex_count = 1; func_enter(__func__, name); @@ -14711,35 +14046,7 @@ async_file_open(task_type_t qtype, async_instance_t *aid, const char *name, unsi /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -14748,7 +14055,6 @@ async_file_open(task_type_t qtype, async_instance_t *aid, const char *name, unsi func_leave(__func__); -done: return async_obj; error: // Restore async operation state @@ -14818,15 +14124,15 @@ async_file_get_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -15036,35 +14342,7 @@ async_file_get(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent_ob /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -15143,15 +14421,15 @@ async_file_specific_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -15370,35 +14648,7 @@ async_file_specific(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pare /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -15477,15 +14727,15 @@ async_file_optional_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -15701,35 +14951,7 @@ async_file_optional(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pare /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -15808,15 +15030,15 @@ async_file_close_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -16075,35 +15297,7 @@ async_file_close(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent_ aid->start_abt_push = true; /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -16179,15 +15373,15 @@ async_group_create_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -16444,35 +15638,7 @@ async_group_create(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL_l /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -16481,7 +15647,6 @@ async_group_create(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL_l func_leave(__func__); -done: return async_obj; error: // Restore async operation state @@ -16554,15 +15719,15 @@ async_group_open_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -16803,35 +15968,7 @@ async_group_open(async_instance_t *aid, H5VL_async_t *parent_obj, const H5VL_loc /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -16910,15 +16047,15 @@ async_group_get_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -17133,35 +16270,7 @@ async_group_get(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent_o /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -17240,15 +16349,15 @@ async_group_specific_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -17455,35 +16564,7 @@ async_group_specific(task_type_t qtype, async_instance_t *aid, H5VL_async_t *par /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -17562,15 +16643,15 @@ async_group_optional_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -17777,35 +16858,7 @@ async_group_optional(task_type_t qtype, async_instance_t *aid, H5VL_async_t *par /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -17884,15 +16937,15 @@ async_group_close_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -18118,36 +17171,7 @@ async_group_close(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -18229,15 +17253,15 @@ async_link_create_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -18473,49 +17497,21 @@ async_link_create(task_type_t qtype, async_instance_t *aid, H5VL_link_create_arg add_task_to_queue(&aid->qhead, async_task, COLLECTIVE); else add_task_to_queue(&aid->qhead, async_task, REGULAR); - } - - if (ABT_mutex_unlock(parent_obj->obj_mutex) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); - goto error; - } - lock_parent = false; - if (aid->ex_delay == false && !async_instance_g->pause) { - if (get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - } - - /* Wait if blocking is needed */ - if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); + } + + if (ABT_mutex_unlock(parent_obj->obj_mutex) != ABT_SUCCESS) { + fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_mutex_unlock\n", __func__); + goto error; + } + lock_parent = false; + if (aid->ex_delay == false && !async_instance_g->pause) { + if (get_n_running_task_in_queue(async_task, __func__) == 0) + push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); + } - /* Failed background thread execution */ - if (async_task->err_stack != 0) + /* Wait if blocking is needed */ + if (is_blocking) { + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -18845,35 +17841,7 @@ async_link_copy(async_instance_t *aid, H5VL_async_t *parent_obj1, const H5VL_loc /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -19204,35 +18172,7 @@ async_link_move(async_instance_t *aid, H5VL_async_t *parent_obj1, const H5VL_loc /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -19311,15 +18251,15 @@ async_link_get_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -19538,35 +18478,7 @@ async_link_get(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent_ob /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -19645,15 +18557,15 @@ async_link_specific_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -19876,35 +18788,7 @@ async_link_specific(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pare /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -19983,15 +18867,15 @@ async_link_optional_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -20211,35 +19095,7 @@ async_link_optional(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pare /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -20318,15 +19174,15 @@ async_object_open_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -20565,35 +19421,7 @@ async_object_open(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -20925,35 +19753,7 @@ async_object_copy(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -21032,15 +19832,15 @@ async_object_get_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -21263,35 +20063,7 @@ async_object_get(task_type_t qtype, async_instance_t *aid, H5VL_async_t *parent_ /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -21370,15 +20142,15 @@ async_object_specific_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -21606,35 +20378,7 @@ async_object_specific(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pa /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -21713,15 +20457,15 @@ async_object_optional_fn(void *foo) goto done; } - func_log(__func__, "parent object is NULL, re-insert to pool"); + /* func_log(__func__, "parent object is NULL, re-insert to pool"); */ - if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, - &task->abt_thread) != ABT_SUCCESS) { - fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, - task->func); - } + /* if (ABT_thread_create(*task->async_obj->pool_ptr, task->func, task, ABT_THREAD_ATTR_NULL, */ + /* &task->abt_thread) != ABT_SUCCESS) { */ + /* fprintf(fout_g, " [ ABT ERROR] %s ABT_thread_create failed for %p\n", __func__, */ + /* task->func); */ + /* } */ - goto done; + /* goto done; */ } } @@ -21941,37 +20685,7 @@ async_object_optional(task_type_t qtype, async_instance_t *aid, H5VL_async_t *pa /* Wait if blocking is needed */ if (is_blocking) { - func_log(__func__, "task is set to be blocking"); - - if (async_instance_g->start_abt_push || get_n_running_task_in_queue(async_task, __func__) == 0) - push_task_to_abt_pool(&aid->qhead, aid->pool, __func__); - - func_log(__func__, "release global lock"); - - func_log(__func__, "release global lock"); - if (H5TSmutex_release(&mutex_count) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_release failed\n", __func__); - } - - func_log(__func__, "waiting to finish all previous tasks, SYNC MODE now!"); - - if (ABT_eventual_wait(async_task->eventual, NULL) != ABT_SUCCESS) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s with ABT_eventual_wait\n", __func__); - goto error; - } - - func_log(__func__, "finished all previous tasks"); - - while (acquired == false && mutex_count > 0) { - if (H5TSmutex_acquire(mutex_count, &acquired) < 0) { - fprintf(fout_g, " [ASYNC VOL ERROR] %s H5TSmutex_acquire failed\n", __func__); - goto done; - } - } - func_log_int1(__func__, "acquired global lock, count", mutex_count); - - /* Failed background thread execution */ - if (async_task->err_stack != 0) + if (block_and_wait_task(async_task, __func__) < 0) goto error; } @@ -23988,7 +22702,7 @@ H5VL_async_file_specific(void *file, H5VL_file_specific_args_t *args, hid_t dxpl H5Pclose(new_fapl_id); /* Release copy of our VOL info */ - if (info) + if (ret_value == 0) H5VL_async_info_free(info); } /* end if */ else {