From 8c373a5b2905259849c8c8449013376791bd7c5b Mon Sep 17 00:00:00 2001 From: Matthew Larson Date: Tue, 25 Jul 2023 16:14:03 -0500 Subject: [PATCH] Remove delay, remove special case single r/w The delay drastically impacted time taken for local/low dset ops, and using regular curl for one dataset read/writes didn't save any time. --- src/rest_vol_dataset.c | 546 +++++++++++++++++++---------------------- 1 file changed, 251 insertions(+), 295 deletions(-) diff --git a/src/rest_vol_dataset.c b/src/rest_vol_dataset.c index 4c13ff5e..8605fc39 100644 --- a/src/rest_vol_dataset.c +++ b/src/rest_vol_dataset.c @@ -80,7 +80,7 @@ const char *external_storage_keys[] = {"externalStorage", (const char *)0} #define BACKOFF_INITIAL_DURATION 10000000 /* 10,000,000 ns -> 0.01 sec */ #define BACKOFF_SCALE_FACTOR 1.5 #define BACKOFF_MAX_BEFORE_FAIL 30000000000 /* 30,000,000,000 ns -> 30 sec */ -#define DELAY_BETWEEN_HANDLE_CHECKS 10000000 /* 10,000,000 ns -> 0.01 sec */ +#define DELAY_BETWEEN_HANDLE_CHECKS 500000 /* 10,000,000 ns -> 0.01 sec */ /* Default sizes for strings formed when dealing with turning a * representation of an HDF5 dataspace and a selection within one into JSON @@ -763,7 +763,7 @@ RV_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spac FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't set cURL request URL: %s", curl_err_bufs[i]); - if ((count > 1) && (CURLM_OK != curl_multi_add_handle(curl_multi_handle, curl_easy_handles[i]))) + if (CURLM_OK != curl_multi_add_handle(curl_multi_handle, curl_easy_handles[i])) FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't add cURL handle: %s", curl_err_buf); } @@ -775,89 +775,52 @@ RV_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spac printf(" \\***************************************/\n\n"); #endif - /* Avoid the multi curl overhead in the single dataset case */ - if (count == 1) { - CURL_PERFORM_NO_GLOBAL(curl_easy_handles[0], response_buffers[0], H5E_DATASET, H5E_READERROR, FAIL); - - size_t handle_index = 0; - - /* Post-processing */ - if ((H5T_REFERENCE != dtype_class) && (H5T_VLEN != dtype_class) && !is_variable_str) { - size_t dtype_size; - - if (0 == (dtype_size = H5Tget_size(mem_type_id[handle_index]))) - FUNC_GOTO_ERROR(H5E_DATATYPE, H5E_BADVALUE, FAIL, "memory datatype is invalid"); - - /* Scatter the read data out to the supplied read buffer according to the - * mem_type_id and mem_space_id given */ - read_data_size = (size_t)file_select_npoints[handle_index] * dtype_size; - struct response_read_info resp_info; - resp_info.response_buf = &response_buffers[handle_index]; - resp_info.read_size = &read_data_size; - -#ifdef RV_CONNECTOR_DEBUG - printf(" Attempting to scatter data from read #%zu\n", handle_index); -#endif - - if (H5Dscatter(dataset_read_scatter_op, &resp_info, mem_type_id[handle_index], - mem_space_id[handle_index], buf[handle_index]) < 0) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "can't scatter data to read buffer"); - } - else { - if (H5T_STD_REF_OBJ == mem_type_id[handle_index]) { - /* Convert the received binary buffer into a buffer of rest_obj_ref_t's */ - if (RV_convert_buffer_to_obj_refs(response_buffers[handle_index].buffer, - (size_t)file_select_npoints[handle_index], - (rv_obj_ref_t **)&obj_ref_buf, &read_data_size) < 0) - FUNC_GOTO_ERROR(H5E_DATATYPE, H5E_CANTCONVERT, FAIL, - "can't convert ref string/s to object ref array"); - - memcpy(buf[handle_index], obj_ref_buf, read_data_size); - } - } - } - else { /* Count > 1 */ - if (CURLM_OK != - curl_multi_setopt(curl_multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, NUM_MAX_HOST_CONNS)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "failed to set max concurrent streams: %s", - curl_err_buf); - - int still_running = 1; - int num_curlm_msgs = 0; - CURLMsg *curl_multi_msg = NULL; - size_t fail_count = 0; - size_t succeed_count = 0; - int events_occurred = 0; - size_t num_finished = 0; - size_t handle_index = 0; - - struct timespec delay; - delay.tv_sec = 0; - delay.tv_nsec = DELAY_BETWEEN_HANDLE_CHECKS; - - while (num_finished < count) { - fail_count = 0; - succeed_count = 0; - if (CURLM_OK != curl_multi_perform(curl_multi_handle, &still_running)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "cURL multi perform error"); + if (CURLM_OK != + curl_multi_setopt(curl_multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, NUM_MAX_HOST_CONNS)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "failed to set max concurrent streams: %s", + curl_err_buf); - if (CURLM_OK != curl_multi_poll(curl_multi_handle, NULL, 0, 0, &events_occurred)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "cURL multi poll error"); + int still_running = 1; + int num_curlm_msgs = 0; + CURLMsg *curl_multi_msg = NULL; + size_t fail_count = 0; + size_t succeed_count = 0; + int events_occurred = 0; + size_t num_finished = 0; + size_t handle_index = 0; + + /* + Lowers CPU usage dramatically, but also vastly increases time taken for requests to local storage + when the number of datasets is small. + */ + + struct timespec delay; + delay.tv_sec = 0; + delay.tv_nsec = DELAY_BETWEEN_HANDLE_CHECKS; + + while (num_finished < count) { + fail_count = 0; + succeed_count = 0; + if (CURLM_OK != curl_multi_perform(curl_multi_handle, &still_running)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "cURL multi perform error"); + + if (CURLM_OK != curl_multi_poll(curl_multi_handle, NULL, 0, 0, &events_occurred)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "cURL multi poll error"); #ifdef RV_CONNECTOR_DEBUG - printf("%zu events occurred\n", events_occurred); + printf("%zu events occurred\n", events_occurred); #endif - while (curl_multi_msg = curl_multi_info_read(curl_multi_handle, &num_curlm_msgs)) { - long response_code; + while (curl_multi_msg = curl_multi_info_read(curl_multi_handle, &num_curlm_msgs)) { + long response_code; - if (curl_multi_msg->msg == CURLMSG_DONE) { - if (CURLE_OK != curl_easy_getinfo(curl_multi_msg->easy_handle, CURLINFO_RESPONSE_CODE, - &response_code)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "can't get HTTP response code"); + if (curl_multi_msg->msg == CURLMSG_DONE) { + if (CURLE_OK != curl_easy_getinfo(curl_multi_msg->easy_handle, CURLINFO_RESPONSE_CODE, + &response_code)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, "can't get HTTP response code"); #ifdef RV_CONNECTOR_DEBUG - printf(" RV_dataset_read to %zu elements received %zu response\n", count, response_code); + printf(" RV_dataset_read to %zu elements received %zu response\n", count, response_code); #endif /* Gracefully handle 503 Error, which can result from sending too many simultaneous * requests */ @@ -865,147 +828,146 @@ RV_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spac wait_count++; /* Identify the handle by its original index in the easy handle array*/ - if (RV_get_index_of_matching_handle(curl_easy_handles, count, - curl_multi_msg->easy_handle, &handle_index) < 0) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, - "can't get handle information for retry"); + if (RV_get_index_of_matching_handle(curl_easy_handles, count, + curl_multi_msg->easy_handle, &handle_index) < 0) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, + "can't get handle information for retry"); #ifdef RV_CONNECTOR_DEBUG - printf("Read handle %zu failed - waiting %zuns to retry\n", handle_index, - current_backoff_duration[handle_index]); + printf("Read handle %zu failed - waiting %zuns to retry\n", handle_index, + current_backoff_duration[handle_index]); #endif - if (CURLM_OK != - curl_multi_remove_handle(curl_multi_handle, curl_multi_msg->easy_handle)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, - "failed to remove denied cURL handle"); + if (CURLM_OK != + curl_multi_remove_handle(curl_multi_handle, curl_multi_msg->easy_handle)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, + "failed to remove denied cURL handle"); - response_buffers[handle_index].curr_buf_ptr = response_buffers[handle_index].buffer; + response_buffers[handle_index].curr_buf_ptr = response_buffers[handle_index].buffer; - failed_handles_to_retry[handle_index] = curl_multi_msg->easy_handle; + failed_handles_to_retry[handle_index] = curl_multi_msg->easy_handle; - struct timespec tms; + struct timespec tms; - clock_gettime(CLOCK_MONOTONIC, &tms); + clock_gettime(CLOCK_MONOTONIC, &tms); - time_of_fail[handle_index] = - (size_t)tms.tv_sec * 1000 * 1000 * 1000 + (size_t)tms.tv_nsec; + time_of_fail[handle_index] = + (size_t)tms.tv_sec * 1000 * 1000 * 1000 + (size_t)tms.tv_nsec; - current_backoff_duration[handle_index] = - (current_backoff_duration[handle_index] == 0) - ? BACKOFF_INITIAL_DURATION - : (size_t)((double)current_backoff_duration[handle_index] * - BACKOFF_SCALE_FACTOR); + current_backoff_duration[handle_index] = + (current_backoff_duration[handle_index] == 0) + ? BACKOFF_INITIAL_DURATION + : (size_t)((double)current_backoff_duration[handle_index] * + BACKOFF_SCALE_FACTOR); - /* Randomize time to avoid doing all retry attempts at once */ - int random_factor = rand(); - current_backoff_duration[handle_index] = - (size_t)((double)current_backoff_duration[handle_index] * - (1.0 + ((double)random_factor / (double)RAND_MAX))); + /* Randomize time to avoid doing all retry attempts at once */ + int random_factor = rand(); + current_backoff_duration[handle_index] = + (size_t)((double)current_backoff_duration[handle_index] * + (1.0 + ((double)random_factor / (double)RAND_MAX))); - if (current_backoff_duration[handle_index] >= BACKOFF_MAX_BEFORE_FAIL) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, - "Unable to reach server for write: 503 service unavailable"); + if (current_backoff_duration[handle_index] >= BACKOFF_MAX_BEFORE_FAIL) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, + "Unable to reach server for write: 503 service unavailable"); - fail_count++; - } - else if (response_code == 200) { - /* Do post-processing now so we can clean up early to improve performance */ - num_finished++; - succeed_count++; + fail_count++; + } + else if (response_code == 200) { + /* Do post-processing now so we can clean up early to improve performance */ + num_finished++; + succeed_count++; - if (RV_get_index_of_matching_handle(curl_easy_handles, count, - curl_multi_msg->easy_handle, &handle_index) < 0) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, - "can't get handle information for retry"); + if (RV_get_index_of_matching_handle(curl_easy_handles, count, + curl_multi_msg->easy_handle, &handle_index) < 0) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, + "can't get handle information for retry"); - /* Post-processing */ - if ((H5T_REFERENCE != dtype_class) && (H5T_VLEN != dtype_class) && !is_variable_str) { - size_t dtype_size; + /* Post-processing */ + if ((H5T_REFERENCE != dtype_class) && (H5T_VLEN != dtype_class) && !is_variable_str) { + size_t dtype_size; - if (0 == (dtype_size = H5Tget_size(mem_type_id[handle_index]))) - FUNC_GOTO_ERROR(H5E_DATATYPE, H5E_BADVALUE, FAIL, - "memory datatype is invalid"); + if (0 == (dtype_size = H5Tget_size(mem_type_id[handle_index]))) + FUNC_GOTO_ERROR(H5E_DATATYPE, H5E_BADVALUE, FAIL, + "memory datatype is invalid"); - /* Scatter the read data out to the supplied read buffer according to the - * mem_type_id and mem_space_id given */ - read_data_size = (size_t)file_select_npoints[handle_index] * dtype_size; - struct response_read_info resp_info; - resp_info.response_buf = &response_buffers[handle_index]; - resp_info.read_size = &read_data_size; + /* Scatter the read data out to the supplied read buffer according to the + * mem_type_id and mem_space_id given */ + read_data_size = (size_t)file_select_npoints[handle_index] * dtype_size; + struct response_read_info resp_info; + resp_info.response_buf = &response_buffers[handle_index]; + resp_info.read_size = &read_data_size; #ifdef RV_CONNECTOR_DEBUG - printf(" Attempting to scatter data from read #%zu\n", handle_index); + printf(" Attempting to scatter data from read #%zu\n", handle_index); #endif - if (H5Dscatter(dataset_read_scatter_op, &resp_info, mem_type_id[handle_index], - mem_space_id[handle_index], buf[handle_index]) < 0) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, - "can't scatter data to read buffer"); - } - else { - if (H5T_STD_REF_OBJ == mem_type_id[handle_index]) { - /* Convert the received binary buffer into a buffer of rest_obj_ref_t's */ - if (RV_convert_buffer_to_obj_refs(response_buffers[handle_index].buffer, - (size_t)file_select_npoints[handle_index], - (rv_obj_ref_t **)&obj_ref_buf, - &read_data_size) < 0) - FUNC_GOTO_ERROR(H5E_DATATYPE, H5E_CANTCONVERT, FAIL, - "can't convert ref string/s to object ref array"); - - memcpy(buf[handle_index], obj_ref_buf, read_data_size); - } + if (H5Dscatter(dataset_read_scatter_op, &resp_info, mem_type_id[handle_index], + mem_space_id[handle_index], buf[handle_index]) < 0) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, + "can't scatter data to read buffer"); + } + else { + if (H5T_STD_REF_OBJ == mem_type_id[handle_index]) { + /* Convert the received binary buffer into a buffer of rest_obj_ref_t's */ + if (RV_convert_buffer_to_obj_refs(response_buffers[handle_index].buffer, + (size_t)file_select_npoints[handle_index], + (rv_obj_ref_t **)&obj_ref_buf, + &read_data_size) < 0) + FUNC_GOTO_ERROR(H5E_DATATYPE, H5E_CANTCONVERT, FAIL, + "can't convert ref string/s to object ref array"); + + memcpy(buf[handle_index], obj_ref_buf, read_data_size); } + } - /* Clean up */ - if (CURLM_OK != - curl_multi_remove_handle(curl_multi_handle, curl_multi_msg->easy_handle)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, - "failed to remove finished cURL handle"); + /* Clean up */ + if (CURLM_OK != + curl_multi_remove_handle(curl_multi_handle, curl_multi_msg->easy_handle)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, + "failed to remove finished cURL handle"); - curl_easy_cleanup(curl_multi_msg->easy_handle); + curl_easy_cleanup(curl_multi_msg->easy_handle); - curl_easy_handles[handle_index] = NULL; + curl_easy_handles[handle_index] = NULL; - RV_free(request_urls[handle_index]); - request_urls[handle_index] = NULL; + RV_free(request_urls[handle_index]); + request_urls[handle_index] = NULL; - RV_free(response_buffers[handle_index].buffer); - response_buffers[handle_index].buffer = NULL; + RV_free(response_buffers[handle_index].buffer); + response_buffers[handle_index].buffer = NULL; - RV_free(curl_err_bufs[handle_index]); - curl_err_bufs[handle_index] = NULL; - } - else { - HANDLE_RESPONSE(response_code, H5E_DATASET, H5E_READERROR, FAIL); - } + RV_free(curl_err_bufs[handle_index]); + curl_err_bufs[handle_index] = NULL; } - } /* while (curl_multi_msg);*/ - - /* TODO: Replace with an epoll-like structure of some kind, manually iterating this will probably - * be slow */ - struct timespec curr_time; - clock_gettime(CLOCK_MONOTONIC, &curr_time); - size_t curr_time_ns = (size_t)curr_time.tv_sec * 1000 * 1000 * 1000 + (size_t)curr_time.tv_nsec; - - for (size_t i = 0; i < count; i++) { - if (failed_handles_to_retry[i] && - ((curr_time_ns - time_of_fail[i]) >= current_backoff_duration[i])) { - if (CURLM_OK != curl_multi_add_handle(curl_multi_handle, failed_handles_to_retry[i])) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, - "failed to re-add denied cURL handle"); - - failed_handles_to_retry[i] = NULL; + else { + HANDLE_RESPONSE(response_code, H5E_DATASET, H5E_READERROR, FAIL); } } - - nanosleep(&delay, NULL); + } /* while (curl_multi_msg);*/ + + /* TODO: Replace with an epoll-like structure of some kind, manually iterating this will probably + * be slow */ + struct timespec curr_time; + clock_gettime(CLOCK_MONOTONIC, &curr_time); + size_t curr_time_ns = (size_t)curr_time.tv_sec * 1000 * 1000 * 1000 + (size_t)curr_time.tv_nsec; + + for (size_t i = 0; i < count; i++) { + if (failed_handles_to_retry[i] && + ((curr_time_ns - time_of_fail[i]) >= current_backoff_duration[i])) { + if (CURLM_OK != curl_multi_add_handle(curl_multi_handle, failed_handles_to_retry[i])) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL, + "failed to re-add denied cURL handle"); + + failed_handles_to_retry[i] = NULL; + } } + + nanosleep(&delay, NULL); } done: - fprintf(stderr, "\nWait count during read: %zu\n", wait_count); + //fprintf(stderr, "\nWait count during read: %zu\n", wait_count); if (obj_ref_buf) RV_free(obj_ref_buf); @@ -1437,7 +1399,7 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa if (write_len[i] > 0) { expected_num_writes++; - if ((count > 1) && CURLM_OK != curl_multi_add_handle(curl_multi_handle, curl_easy_handles[i])) + if (CURLM_OK != curl_multi_add_handle(curl_multi_handle, curl_easy_handles[i])) FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "can't add cURL handle: %s", curl_err_buf); } @@ -1457,66 +1419,61 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa size_t wait_count = 0; - /* Avoid the multi curl overhead in the single dataset case */ - if (count == 1 && expected_num_writes == 1) { - CURL_PERFORM_NO_GLOBAL(curl_easy_handles[0], response_buffers[0], H5E_DATASET, H5E_WRITEERROR, FAIL); - } - else { - - if (CURLM_OK != - curl_multi_setopt(curl_multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, NUM_MAX_HOST_CONNS)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "failed to set max concurrent streams: %s", - curl_err_buf); + if (CURLM_OK != + curl_multi_setopt(curl_multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, NUM_MAX_HOST_CONNS)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTSET, FAIL, "failed to set max concurrent streams: %s", + curl_err_buf); - int still_running = 0; - int num_curlm_msgs = 0; - int events_occurred = 0; - CURLMsg *curl_multi_msg = NULL; - size_t fail_count = 0; - size_t succeed_count = 0; - size_t num_finished = 0; - size_t handle_index = 0; - struct timespec delay; - delay.tv_sec = 0; - delay.tv_nsec = DELAY_BETWEEN_HANDLE_CHECKS; - - if ((failed_handles_to_retry = calloc(count, sizeof(CURL *))) == NULL) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, - "can't allocate space for cURL headers to be retried"); + int still_running = 0; + int num_curlm_msgs = 0; + int events_occurred = 0; + CURLMsg *curl_multi_msg = NULL; + size_t fail_count = 0; + size_t succeed_count = 0; + size_t num_finished = 0; + size_t handle_index = 0; + + struct timespec delay; + delay.tv_sec = 0; + delay.tv_nsec = DELAY_BETWEEN_HANDLE_CHECKS; + + if ((failed_handles_to_retry = calloc(count, sizeof(CURL *))) == NULL) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, + "can't allocate space for cURL headers to be retried"); - if ((current_backoff_duration = calloc(count, sizeof(size_t))) == NULL) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, - "can't allocate space for backoff durations of handles"); + if ((current_backoff_duration = calloc(count, sizeof(size_t))) == NULL) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, + "can't allocate space for backoff durations of handles"); - if ((time_of_fail = calloc(count, sizeof(size_t))) == NULL) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, - "can't allocate space for time of rejected cURL connections"); + if ((time_of_fail = calloc(count, sizeof(size_t))) == NULL) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL, + "can't allocate space for time of rejected cURL connections"); - memset(failed_handles_to_retry, 0, sizeof(CURL *) * count); + memset(failed_handles_to_retry, 0, sizeof(CURL *) * count); - while (num_finished < expected_num_writes) { - fail_count = 0; - succeed_count = 0; + while (num_finished < expected_num_writes) { + fail_count = 0; + succeed_count = 0; - if (CURLM_OK != curl_multi_perform(curl_multi_handle, &still_running)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "cURL multi perform error"); + if (CURLM_OK != curl_multi_perform(curl_multi_handle, &still_running)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "cURL multi perform error"); - if (CURLM_OK != curl_multi_poll(curl_multi_handle, NULL, 0, 0, &events_occurred)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "cURL multi poll error"); + if (CURLM_OK != curl_multi_poll(curl_multi_handle, NULL, 0, 0, &events_occurred)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "cURL multi poll error"); #ifdef RV_CONNECTOR_DEBUG - printf("%zu events occurred\n", events_occurred); + printf("%zu events occurred\n", events_occurred); #endif - while (curl_multi_msg = curl_multi_info_read(curl_multi_handle, &num_curlm_msgs)) { - long response_code; + while (curl_multi_msg = curl_multi_info_read(curl_multi_handle, &num_curlm_msgs)) { + long response_code; - if (curl_multi_msg && (curl_multi_msg->msg == CURLMSG_DONE)) { - if (CURLE_OK != curl_easy_getinfo(curl_multi_msg->easy_handle, CURLINFO_RESPONSE_CODE, - &response_code)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't get HTTP response code"); + if (curl_multi_msg && (curl_multi_msg->msg == CURLMSG_DONE)) { + if (CURLE_OK != curl_easy_getinfo(curl_multi_msg->easy_handle, CURLINFO_RESPONSE_CODE, + &response_code)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't get HTTP response code"); #ifdef RV_CONNECTOR_DEBUG - printf(" RV_dataset_write to %zu elements received %zu response\n", count, response_code); + printf(" RV_dataset_write to %zu elements received %zu response\n", count, response_code); #endif /* Gracefully handle 503 Error, which can result from sending too many simultaneous @@ -1525,28 +1482,28 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa wait_count++; - if (RV_get_index_of_matching_handle(curl_easy_handles, count, - curl_multi_msg->easy_handle, &handle_index) < 0) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, - "can't get handle information for retry"); + if (RV_get_index_of_matching_handle(curl_easy_handles, count, + curl_multi_msg->easy_handle, &handle_index) < 0) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "can't get handle information for retry"); - /* Restart request next time */ - uinfo_arr[handle_index].bytes_sent = 0; + /* Restart request next time */ + uinfo_arr[handle_index].bytes_sent = 0; - if (CURLM_OK != - curl_multi_remove_handle(curl_multi_handle, curl_multi_msg->easy_handle)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, - "failed to remove denied cURL handle"); + if (CURLM_OK != + curl_multi_remove_handle(curl_multi_handle, curl_multi_msg->easy_handle)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "failed to remove denied cURL handle"); - /* Identify the handle by its original index */ - failed_handles_to_retry[handle_index] = curl_multi_msg->easy_handle; + /* Identify the handle by its original index */ + failed_handles_to_retry[handle_index] = curl_multi_msg->easy_handle; - struct timespec tms; + struct timespec tms; - clock_gettime(CLOCK_MONOTONIC, &tms); + clock_gettime(CLOCK_MONOTONIC, &tms); - time_of_fail[handle_index] = - (size_t)tms.tv_sec * 1000 * 1000 * 1000 + (size_t)tms.tv_nsec; + time_of_fail[handle_index] = + (size_t)tms.tv_sec * 1000 * 1000 * 1000 + (size_t)tms.tv_nsec; if (current_backoff_duration[handle_index] >= BACKOFF_MAX_BEFORE_FAIL) FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, @@ -1558,48 +1515,48 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa : (size_t)((double)current_backoff_duration[handle_index] * BACKOFF_SCALE_FACTOR); - /* Randomize time to avoid doing all retry attempts at once */ - int random_factor = rand(); - current_backoff_duration[handle_index] = - (size_t)((double)current_backoff_duration[handle_index] * - (1.0 + ((double)random_factor / (double)RAND_MAX))); + /* Randomize time to avoid doing all retry attempts at once */ + int random_factor = rand(); + current_backoff_duration[handle_index] = + (size_t)((double)current_backoff_duration[handle_index] * + (1.0 + ((double)random_factor / (double)RAND_MAX))); #ifdef RV_CONNECTOR_DEBUG - printf("Write handle %zu failed - waiting %zuns to retry\n", handle_index, - current_backoff_duration[handle_index]); + printf("Write handle %zu failed - waiting %zuns to retry\n", handle_index, + current_backoff_duration[handle_index]); #endif - fail_count++; - } - else if (response_code == 200) { - num_finished++; - succeed_count++; - /* Clean up resources used for the completed handle */ + fail_count++; + } + else if (response_code == 200) { + num_finished++; + succeed_count++; + /* Clean up resources used for the completed handle */ - if (RV_get_index_of_matching_handle(curl_easy_handles, count, - curl_multi_msg->easy_handle, &handle_index) < 0) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, - "can't get handle information for retry"); + if (RV_get_index_of_matching_handle(curl_easy_handles, count, + curl_multi_msg->easy_handle, &handle_index) < 0) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "can't get handle information for retry"); - if (CURLM_OK != - curl_multi_remove_handle(curl_multi_handle, curl_multi_msg->easy_handle)) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, - "failed to remove finished cURL handle"); + if (CURLM_OK != + curl_multi_remove_handle(curl_multi_handle, curl_multi_msg->easy_handle)) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "failed to remove finished cURL handle"); - curl_easy_cleanup(curl_multi_msg->easy_handle); + curl_easy_cleanup(curl_multi_msg->easy_handle); - curl_easy_handles[handle_index] = NULL; + curl_easy_handles[handle_index] = NULL; - RV_free(write_bodies[handle_index]); - write_bodies[handle_index] = NULL; + RV_free(write_bodies[handle_index]); + write_bodies[handle_index] = NULL; - RV_free(request_urls[handle_index]); - request_urls[handle_index] = NULL; + RV_free(request_urls[handle_index]); + request_urls[handle_index] = NULL; - RV_free(base64_encoded_values[handle_index]); - base64_encoded_values[handle_index] = NULL; + RV_free(base64_encoded_values[handle_index]); + base64_encoded_values[handle_index] = NULL; - RV_free(response_buffers[handle_index].buffer); - response_buffers[handle_index].buffer = NULL; + RV_free(response_buffers[handle_index].buffer); + response_buffers[handle_index].buffer = NULL; RV_free(curl_err_bufs[handle_index]); curl_err_bufs[handle_index] = NULL; @@ -1612,29 +1569,28 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa } /* end while (curl_multi_msg); */ #ifdef RV_CONNECTOR_DEBUG - printf("%zu total complete, %zu handles succeeded this iteration, %zu handles failed%s\n", - num_finished, succeed_count, fail_count, (fail_count > 0) ? ", retrying..." : ""); + printf("%zu total complete, %zu handles succeeded this iteration, %zu handles failed%s\n", + num_finished, succeed_count, fail_count, (fail_count > 0) ? ", retrying..." : ""); #endif - /* TODO: Replace with an epoll-like structure of some kind, manually iterating this will probably - * be slow */ - struct timespec curr_time; - clock_gettime(CLOCK_MONOTONIC, &curr_time); - size_t curr_time_ns = (size_t)curr_time.tv_sec * 1000 * 1000 * 1000 + (size_t)curr_time.tv_nsec; + /* TODO: Replace with an epoll-like structure of some kind, manually iterating this will probably + * be slow */ + struct timespec curr_time; + clock_gettime(CLOCK_MONOTONIC, &curr_time); + size_t curr_time_ns = (size_t)curr_time.tv_sec * 1000 * 1000 * 1000 + (size_t)curr_time.tv_nsec; - for (size_t i = 0; i < count; i++) { - if (failed_handles_to_retry[i] && - ((curr_time_ns - time_of_fail[i]) >= current_backoff_duration[i])) { - if (CURLM_OK != curl_multi_add_handle(curl_multi_handle, failed_handles_to_retry[i])) - FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, - "failed to re-add denied cURL handle"); + for (size_t i = 0; i < count; i++) { + if (failed_handles_to_retry[i] && + ((curr_time_ns - time_of_fail[i]) >= current_backoff_duration[i])) { + if (CURLM_OK != curl_multi_add_handle(curl_multi_handle, failed_handles_to_retry[i])) + FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, + "failed to re-add denied cURL handle"); - failed_handles_to_retry[i] = NULL; - } + failed_handles_to_retry[i] = NULL; } - - nanosleep(&delay, NULL); } + + nanosleep(&delay, NULL); } done: @@ -1642,7 +1598,7 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa printf("-> Dataset write response buffer:\n%s\n\n", response_buffer.buffer); #endif - fprintf(stderr, "\nWait count during write: %zu\n", wait_count); + //fprintf(stderr, "\nWait count during write: %zu\n", wait_count); for (size_t i = 0; i < count; i++) { if (curl_headers_arr[i]) {