Skip to content

Commit

Permalink
Implement H5Dread_multi/H5Dwrite_multi (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjala authored Sep 6, 2023
1 parent bb0d06a commit b61a048
Show file tree
Hide file tree
Showing 3 changed files with 1,059 additions and 430 deletions.
298 changes: 293 additions & 5 deletions src/rest_vol.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@
/* Default size for buffer used when transforming an HDF5 dataspace into JSON. */
#define DATASPACE_SHAPE_BUFFER_DEFAULT_SIZE 256

/* Default initial size for the response buffer allocated which cURL writes
* its responses into
*/
#define CURL_RESPONSE_BUFFER_DEFAULT_SIZE 1024
/* Defines for multi-curl settings */
#define BACKOFF_INITIAL_DURATION 10000000 /* 10,000,000 ns -> 0.01 sec */
#define BACKOFF_SCALE_FACTOR 1.5
#define BACKOFF_MAX_BEFORE_FAIL 3000000000 /* 3,000,000,000 ns -> 3 sec */

/*
* The VOL connector identification number.
*/
Expand Down Expand Up @@ -169,6 +170,10 @@ herr_t RV_parse_type(char *HTTP_response, void *callback_data_in, void *callback
/* Helper function to parse an object's creation properties from server response */
herr_t RV_parse_creation_properties_callback(yajl_val parse_tree, char **GCPL_buf);

/* Return the index of the curl handle into the array of handles */
herr_t RV_get_index_of_matching_handle(dataset_transfer_info *transfer_info, size_t count, CURL *handle,
size_t *handle_index);

/* The REST VOL connector's class structure. */
static const H5VL_class_t H5VL_rest_g = {
HDF5_VOL_REST_VERSION, /* Connector struct version number */
Expand Down Expand Up @@ -1414,6 +1419,67 @@ H5_rest_curl_write_data_callback(char *buffer, size_t size, size_t nmemb, void *
return ret_value;
} /* end H5_rest_curl_write_data_callback() */

/*-------------------------------------------------------------------------
 * Function:    H5_rest_curl_write_data_callback_no_global
 *
 * Purpose:     A callback for cURL which allows cURL to write its
 *              responses from the server into a growing string buffer
 *              which is processed by this VOL connector after each server
 *              interaction.
 *
 *              This callback uses userp to find the buffer to write to,
 *              instead of using the global response buffer. userp must
 *              point to a struct response_buffer whose 'buffer' is
 *              already allocated and whose 'curr_buf_ptr' points inside
 *              it. Because no global state is touched, each cURL handle
 *              can be given its own response buffer.
 *
 *              The received bytes are appended at 'curr_buf_ptr', the
 *              buffer is kept NUL-terminated, and 'curr_buf_ptr' is
 *              advanced past the appended data. When the data does not
 *              fit, the buffer size is doubled until it does and the
 *              buffer is reallocated once at that final size.
 *
 * Parameters:  buffer - data delivered by cURL (not NUL-terminated)
 *              size   - size of one data element
 *              nmemb  - number of data elements
 *              userp  - struct response_buffer * to append to
 *
 * Return:      Amount of bytes equal to the amount given to this callback
 *              by cURL on success/differing amount of bytes (0) on failure
 *
 * Programmer:  Matthew Larson
 *              June, 2023
 */
size_t
H5_rest_curl_write_data_callback_no_global(char *buffer, size_t size, size_t nmemb, void *userp)
{
    struct response_buffer *local_response_buffer = (struct response_buffer *)userp;
    ptrdiff_t               buf_ptrdiff;
    size_t                  data_size = size * nmemb;
    size_t                  bytes_used;
    size_t                  bytes_needed;
    size_t                  new_size;
    size_t                  ret_value = 0;

    /* Without a destination there is nowhere to put the response */
    if (!local_response_buffer || !local_response_buffer->buffer || !local_response_buffer->curr_buf_ptr)
        FUNC_GOTO_ERROR(H5E_INTERNAL, H5E_BADVALUE, 0,
                        "no response buffer was provided to the cURL write callback");

    buf_ptrdiff = local_response_buffer->curr_buf_ptr - local_response_buffer->buffer;
    if (buf_ptrdiff < 0)
        FUNC_GOTO_ERROR(
            H5E_INTERNAL, H5E_BADVALUE, 0,
            "unsafe cast: response buffer pointer difference was negative - this should not happen!");

    bytes_used = (size_t)buf_ptrdiff;

    /* Space required: what is already stored, the new data and the NUL terminator */
    if (data_size >= (size_t)-1 - bytes_used)
        FUNC_GOTO_ERROR(H5E_INTERNAL, H5E_BADVALUE, 0,
                        "server response is too large for the response buffer size to represent");
    bytes_needed = bytes_used + data_size + 1;

    /* If the server response is larger than the currently allocated amount for the
     * response buffer, grow the response buffer by a factor of 2 until it fits.
     *
     * Avoid using the 'CHECKED_REALLOC' macro here because we don't necessarily
     * want to free the connector's response buffer if the reallocation fails.
     */
    new_size = local_response_buffer->buffer_size;
    if (new_size < bytes_needed) {
        char *tmp_realloc;

        /* Doubling a size of 0 would never make progress */
        if (new_size == 0)
            new_size = 1;

        while (new_size < bytes_needed) {
            /* Doubling would overflow; fall back to the exact size required */
            if (new_size > ((size_t)-1) / 2) {
                new_size = bytes_needed;
                break;
            }

            new_size *= 2;
        } /* end while */

        if (NULL == (tmp_realloc = (char *)RV_realloc(local_response_buffer->buffer, new_size)))
            FUNC_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, 0, "can't reallocate space for response buffer");

        /* The block may have moved; re-derive the write position from its offset */
        local_response_buffer->curr_buf_ptr = tmp_realloc + bytes_used;
        local_response_buffer->buffer       = tmp_realloc;
        local_response_buffer->buffer_size  = new_size;
    } /* end if */

    if (data_size > 0)
        memcpy(local_response_buffer->curr_buf_ptr, buffer, data_size);
    local_response_buffer->curr_buf_ptr += data_size;
    *local_response_buffer->curr_buf_ptr = '\0';

    ret_value = data_size;

done:
    return ret_value;
} /* end H5_rest_curl_write_data_callback_no_global() */

/*-------------------------------------------------------------------------
* Function: H5_rest_basename
*
Expand Down Expand Up @@ -3248,7 +3314,7 @@ RV_base64_decode(const char *in, size_t in_size, char **out, size_t *out_size)
four_byte_set = (((uint32_t)c0) << 18) | (((uint32_t)c1) << 12) | (((uint32_t)c2) << 6) |
(((uint32_t)c3) << 0);

c0 = (uint8_t)(four_byte_set >> 24); // 0
c0 = (uint8_t)(four_byte_set >> 24);
c1 = (uint8_t)(four_byte_set >> 16);
c2 = (uint8_t)(four_byte_set >> 8);
c3 = (uint8_t)(four_byte_set >> 0);
Expand Down Expand Up @@ -3387,6 +3453,228 @@ RV_set_object_type_header(H5I_type_t parent_obj_type, const char **parent_obj_ty
return (ret_value);
} /* end RV_set_object_type_header */

/*-------------------------------------------------------------------------
 * Function:    RV_get_index_of_matching_handle
 *
 * Purpose:     Finds which entry of an array of dataset transfers owns a
 *              given cURL easy handle, by comparing the handle against
 *              each entry's 'curl_easy_handle'.
 *
 * Parameters:  transfer_info - array of 'count' transfers to search
 *              count         - number of entries in transfer_info
 *              handle        - cURL easy handle to look for (non-NULL)
 *              handle_index  - OUT: index of the matching entry. Set to
 *                              count + 1 (an out-of-range value) whenever
 *                              no entry matches.
 *
 * Return:      SUCCEED when a matching entry was found/FAIL when an
 *              argument is invalid or no entry uses the handle. Failing
 *              on "not found" keeps callers that only test the return
 *              value from indexing the array with the out-of-range
 *              sentinel.
 */
herr_t
RV_get_index_of_matching_handle(dataset_transfer_info *transfer_info, size_t count, CURL *handle,
                                size_t *handle_index)
{
    herr_t ret_value = SUCCEED;

    if (!handle_index)
        FUNC_GOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "no location provided to return matching index");

    /* Start from the "no match" sentinel so a stale index is never left behind */
    *handle_index = count + 1;

    if (!handle)
        FUNC_GOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "cURL handle provided for index match is NULL");

    if (!transfer_info && count > 0)
        FUNC_GOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL, "no transfer information provided for index match");

    /* Entries whose handle was already cleaned up hold NULL and therefore can
     * never compare equal to the (non-NULL) handle being searched for.
     */
    for (size_t i = 0; i < count; i++) {
        if (transfer_info[i].curl_easy_handle == handle) {
            *handle_index = i;
            break;
        }
    }

    if (*handle_index >= count)
        FUNC_GOTO_ERROR(H5E_DATASET, H5E_BADVALUE, FAIL,
                        "cURL handle does not belong to any of the provided transfers");

done:
    return ret_value;
} /* end RV_get_index_of_matching_handle() */

/*-------------------------------------------------------------------------
 * Function:    RV_curl_multi_perform
 *
 * Purpose:     Drives the transfers attached to a cURL multi handle until
 *              all of them have completed.
 *
 *              Each pass waits (via select) for activity on the multi
 *              handle's sockets, calls curl_multi_perform, and then
 *              drains every pending completion message:
 *
 *              - HTTP 200: success_callback is invoked with the
 *                transfer's memory type, memory space, user buffer and
 *                response buffer; then the easy handle is removed from
 *                the multi handle and cleaned up, and the transfer's
 *                request URL, response buffer and (for writes) request
 *                body buffers are freed.
 *
 *              - HTTP 503: the easy handle is removed from the multi
 *                handle and parked for a retry. Its backoff duration
 *                starts at BACKOFF_INITIAL_DURATION, is multiplied by
 *                BACKOFF_SCALE_FACTOR on every further 503, and is
 *                additionally scaled by a random factor in [1, 2]. Once
 *                the backoff reaches BACKOFF_MAX_BEFORE_FAIL the function
 *                fails. Parked handles are re-added to the multi handle
 *                once their backoff has elapsed.
 *
 *              - Any other response code is passed to HANDLE_RESPONSE.
 *
 *              The loop keeps running while transfers are active OR while
 *              handles are still parked for a retry, so a transfer that
 *              received a 503 is not dropped just because it was the last
 *              one running.
 *
 * Parameters:  curl_multi_handle - multi handle the transfers' easy
 *                                  handles were added to
 *              transfer_info     - array of 'count' transfers; entry i's
 *                                  'curl_easy_handle' identifies it
 *              count             - number of entries in transfer_info
 *              success_callback  - invoked for each transfer that
 *                                  completes with HTTP 200
 *
 * Return:      SUCCEED/FAIL
 */
herr_t
RV_curl_multi_perform(CURL *curl_multi_handle, dataset_transfer_info *transfer_info, size_t count,
                      herr_t(success_callback)(hid_t mem_type_id, hid_t mem_space_id, void *buf,
                                               struct response_buffer resp_buffer))
{
    herr_t         ret_value               = SUCCEED;
    int            num_still_running       = 0;
    int            num_curlm_msgs          = 0;
    CURLMsg       *curl_multi_msg          = NULL;
    CURL         **failed_handles_to_retry = NULL;
    size_t         num_awaiting_retry      = 0; /* handles parked in failed_handles_to_retry */
    size_t         num_readded             = 0; /* handles re-added during the current pass */
    size_t         handle_index            = 0;
    fd_set         fdread;
    fd_set         fdwrite;
    fd_set         fdexcep;
    int            maxfd      = -1;
    long           timeout_ms = 0;
    struct timeval timeout;

    /* calloc zero-fills the array, so every slot starts out as "nothing to retry".
     * At least one element is requested so a count of 0 cannot yield a NULL
     * return that would be mistaken for an allocation failure.
     */
    if ((failed_handles_to_retry = calloc(count ? count : 1, sizeof(CURL *))) == NULL)
        FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTALLOC, FAIL,
                        "can't allocate space for cURL handles to be retried");

    do {
        maxfd           = -1;
        num_readded     = 0;
        timeout_ms      = 0;
        timeout.tv_sec  = 0;
        timeout.tv_usec = 0;

        if (CURLM_OK != curl_multi_timeout(curl_multi_handle, &timeout_ms))
            FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "unable to get curl timeout");

        /* cURL reports -1 for "no timeout set"; cap the wait in either case */
        timeout_ms = ((timeout_ms < 0) || (timeout_ms > DEFAULT_POLL_TIMEOUT_MS)) ? DEFAULT_POLL_TIMEOUT_MS
                                                                                  : timeout_ms;

        timeout.tv_sec  = timeout_ms / 1000;
        timeout.tv_usec = (timeout_ms % 1000) * 1000;

        FD_ZERO(&fdread);
        FD_ZERO(&fdwrite);
        FD_ZERO(&fdexcep);

        if (CURLM_OK != curl_multi_fdset(curl_multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd))
            FUNC_GOTO_ERROR(H5E_DATASET, H5E_CANTGET, FAIL, "unable to get curl fd set");

        /* maxfd stays -1 when cURL has no file descriptors to wait on yet */
        if (maxfd != -1)
            select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);

        if (CURLM_OK != curl_multi_perform(curl_multi_handle, &num_still_running))
            FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "cURL multi perform error");

        /* Drain every pending message on every pass. Reading only when the
         * running count changed can leave completions unprocessed, e.g. when
         * the very first perform already reports no running transfers.
         */
        while ((curl_multi_msg = curl_multi_info_read(curl_multi_handle, &num_curlm_msgs))) {
            long  response_code;
            CURL *finished_handle;

            if (curl_multi_msg->msg == CURLMSG_DONE) {
                /* The message does not survive curl_multi_remove_handle() or
                 * curl_easy_cleanup(), so keep our own copy of the handle.
                 */
                finished_handle = curl_multi_msg->easy_handle;

                if (CURLE_OK != curl_easy_getinfo(finished_handle, CURLINFO_RESPONSE_CODE, &response_code))
                    FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL, "can't get HTTP response code");

                /* Gracefully handle 503 Error, which can result from sending too many simultaneous
                 * requests */
                if (response_code == 503) {
                    struct timespec tms;
                    int             random_factor;

                    if (RV_get_index_of_matching_handle(transfer_info, count, finished_handle,
                                                        &handle_index) < 0)
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
                                        "can't get handle information for retry");

                    if (handle_index >= count)
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
                                        "denied cURL handle does not belong to any transfer");

                    /* Restart request next time for writes */
                    if (transfer_info[handle_index].transfer_type == WRITE)
                        transfer_info[handle_index].u.write_info.uinfo.bytes_sent = 0;

                    /* Restart request next time for reads */
                    transfer_info[handle_index].resp_buffer.curr_buf_ptr =
                        transfer_info[handle_index].resp_buffer.buffer;

                    if (CURLM_OK != curl_multi_remove_handle(curl_multi_handle, finished_handle))
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
                                        "failed to remove denied cURL handle");

                    /* Identify the handle by its original index */
                    if (!failed_handles_to_retry[handle_index])
                        num_awaiting_retry++;
                    failed_handles_to_retry[handle_index] = finished_handle;

                    clock_gettime(CLOCK_MONOTONIC, &tms);

                    transfer_info[handle_index].time_of_fail =
                        (size_t)tms.tv_sec * 1000 * 1000 * 1000 + (size_t)tms.tv_nsec;

                    transfer_info[handle_index].current_backoff_duration =
                        (transfer_info[handle_index].current_backoff_duration == 0)
                            ? BACKOFF_INITIAL_DURATION
                            : (size_t)((double)transfer_info[handle_index].current_backoff_duration *
                                       BACKOFF_SCALE_FACTOR);

                    /* Randomize time to avoid doing all retry attempts at once */
                    random_factor = rand();
                    transfer_info[handle_index].current_backoff_duration =
                        (size_t)((double)transfer_info[handle_index].current_backoff_duration *
                                 (1.0 + ((double)random_factor / (double)RAND_MAX)));

                    if (transfer_info[handle_index].current_backoff_duration >= BACKOFF_MAX_BEFORE_FAIL)
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
                                        "Unable to reach server for write: 503 service unavailable");
                }
                else if (response_code == 200) {
                    if (RV_get_index_of_matching_handle(transfer_info, count, finished_handle,
                                                        &handle_index) < 0)
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
                                        "can't get handle information for retry");

                    if (handle_index >= count)
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
                                        "finished cURL handle does not belong to any transfer");

                    if (success_callback(
                            transfer_info[handle_index].mem_type_id, transfer_info[handle_index].mem_space_id,
                            transfer_info[handle_index].buf, transfer_info[handle_index].resp_buffer) < 0)
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_READERROR, FAIL,
                                        "failed to post-process data read from dataset");

                    /* Clean up */
                    if (CURLM_OK != curl_multi_remove_handle(curl_multi_handle, finished_handle))
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
                                        "failed to remove finished cURL handle");

                    curl_easy_cleanup(finished_handle);

                    /* Mark the entry as cleaned up so it is not matched or released again */
                    transfer_info[handle_index].curl_easy_handle = NULL;

                    if (transfer_info[handle_index].transfer_type == WRITE) {
                        RV_free(transfer_info[handle_index].u.write_info.write_body);
                        transfer_info[handle_index].u.write_info.write_body = NULL;

                        RV_free(transfer_info[handle_index].u.write_info.base64_encoded_values);
                        transfer_info[handle_index].u.write_info.base64_encoded_values = NULL;
                    }

                    RV_free(transfer_info[handle_index].request_url);
                    transfer_info[handle_index].request_url = NULL;

                    RV_free(transfer_info[handle_index].resp_buffer.buffer);
                    transfer_info[handle_index].resp_buffer.buffer = NULL;
                }
                else {
                    HANDLE_RESPONSE(response_code, H5E_DATASET, H5E_WRITEERROR, FAIL);
                }
            }
        } /* end while (curl_multi_msg); */

        /* TODO: Replace with an epoll-like structure of some kind, manually iterating this will probably
         * be slow */
        if (num_awaiting_retry > 0) {
            struct timespec curr_time;
            size_t          curr_time_ns;
            size_t          min_remaining_ns = 0; /* shortest wait left among parked handles */

            clock_gettime(CLOCK_MONOTONIC, &curr_time);
            curr_time_ns = (size_t)curr_time.tv_sec * 1000 * 1000 * 1000 + (size_t)curr_time.tv_nsec;

            for (size_t i = 0; i < count; i++) {
                size_t elapsed_ns;

                if (!failed_handles_to_retry[i])
                    continue;

                elapsed_ns = curr_time_ns - transfer_info[i].time_of_fail;

                if (elapsed_ns >= transfer_info[i].current_backoff_duration) {
                    if (CURLM_OK != curl_multi_add_handle(curl_multi_handle, failed_handles_to_retry[i]))
                        FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
                                        "failed to re-add denied cURL handle");

                    failed_handles_to_retry[i] = NULL;
                    num_awaiting_retry--;
                    num_readded++;
                }
                else {
                    size_t remaining_ns = transfer_info[i].current_backoff_duration - elapsed_ns;

                    if ((min_remaining_ns == 0) || (remaining_ns < min_remaining_ns))
                        min_remaining_ns = remaining_ns;
                }
            }

            /* Nothing is running and nothing was re-added: the only work left is
             * waiting out a backoff, so sleep until the earliest one is due
             * rather than spinning.
             */
            if ((num_still_running == 0) && (num_readded == 0) && (min_remaining_ns > 0)) {
                struct timeval backoff_wait;
                long           wait_usec = (long)((min_remaining_ns % (1000 * 1000 * 1000)) / 1000);

                backoff_wait.tv_sec = (time_t)(min_remaining_ns / (1000 * 1000 * 1000));

                if ((backoff_wait.tv_sec == 0) && (wait_usec == 0))
                    wait_usec = 1;

                backoff_wait.tv_usec = wait_usec;

                select(0, NULL, NULL, NULL, &backoff_wait);
            }
        }

        /* A re-added handle is only counted in num_still_running by the next
         * curl_multi_perform call, and a parked handle is not counted at all,
         * so both must keep the loop alive.
         */
    } while ((num_still_running > 0) || (num_readded > 0) || (num_awaiting_retry > 0));

done:
    /* Allocated with calloc above, so released with the matching free */
    free(failed_handles_to_retry);

    return ret_value;
} /* end RV_curl_multi_perform() */

/* Helper function to initialize an object's name based on its parent's name.
* Allocates memory that must be closed by caller. */
herr_t
Expand Down
Loading

0 comments on commit b61a048

Please sign in to comment.