Skip to content

Commit

Permalink
Merge branch 'multi_write' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjala committed Jul 31, 2023
2 parents b47ab4a + 06564fd commit b42de8a
Show file tree
Hide file tree
Showing 3 changed files with 1,254 additions and 417 deletions.
65 changes: 61 additions & 4 deletions src/rest_vol.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@
/* Default size for buffer used when transforming an HDF5 dataspace into JSON. */
#define DATASPACE_SHAPE_BUFFER_DEFAULT_SIZE 256

/* Default initial size for the response buffer allocated which cURL writes
* its responses into
*/
#define CURL_RESPONSE_BUFFER_DEFAULT_SIZE 1024
/*
* The VOL connector identification number.
*/
Expand Down Expand Up @@ -1414,6 +1410,67 @@ H5_rest_curl_write_data_callback(char *buffer, size_t size, size_t nmemb, void *
return ret_value;
} /* end H5_rest_curl_write_data_callback() */

/*-------------------------------------------------------------------------
* Function: H5_rest_curl_write_data_callback_no_global
*
* Purpose: A callback for cURL which allows cURL to write its
* responses from the server into a growing string buffer
* which is processed by this VOL connector after each server
* interaction.
*
* This callback use userp to find a buffer to write to, instead
* of using the global buffer. This allows it to safely be used by
* multiple curl handles at the same time.
*
* Return: Amount of bytes equal to the amount given to this callback
* by cURL on success/differing amount of bytes on failure
*
* Programmer: Matthew Larson
* June, 2023
*/
size_t
H5_rest_curl_write_data_callback_no_global(char *buffer, size_t size, size_t nmemb, void *userp)
{
ptrdiff_t buf_ptrdiff;
size_t data_size = size * nmemb;
size_t ret_value = 0;
struct response_buffer *local_response_buffer = (struct response_buffer *)userp;

/* If the server response is larger than the currently allocated amount for the
* response buffer, grow the response buffer by a factor of 2
*/
buf_ptrdiff = (local_response_buffer->curr_buf_ptr + data_size) - local_response_buffer->buffer;
if (buf_ptrdiff < 0)
FUNC_GOTO_ERROR(
H5E_INTERNAL, H5E_BADVALUE, 0,
"unsafe cast: response buffer pointer difference was negative - this should not happen!");

/* Avoid using the 'CHECKED_REALLOC' macro here because we don't necessarily
* want to free the connector's response buffer if the reallocation fails.
*/
while ((size_t)(buf_ptrdiff + 1) > local_response_buffer->buffer_size) {
char *tmp_realloc;

if (NULL == (tmp_realloc = (char *)RV_realloc(local_response_buffer->buffer,
2 * local_response_buffer->buffer_size)))
FUNC_GOTO_ERROR(H5E_RESOURCE, H5E_CANTALLOC, 0, "can't reallocate space for response buffer");

local_response_buffer->curr_buf_ptr =
tmp_realloc + (local_response_buffer->curr_buf_ptr - local_response_buffer->buffer);
local_response_buffer->buffer = tmp_realloc;
local_response_buffer->buffer_size *= 2;
} /* end while */

memcpy(local_response_buffer->curr_buf_ptr, buffer, data_size);
local_response_buffer->curr_buf_ptr += data_size;
*local_response_buffer->curr_buf_ptr = '\0';

ret_value = data_size;

done:
return ret_value;
} /* end H5_rest_curl_write_data_callback_no_global() */

/*-------------------------------------------------------------------------
* Function: H5_rest_basename
*
Expand Down
98 changes: 95 additions & 3 deletions src/rest_vol.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,79 @@
#define CURL_PERFORM_NO_ERR(curl_ptr, ret_value) \
CURL_PERFORM_INTERNAL(curl_ptr, FALSE, H5E_NONE_MAJOR, H5E_NONE_MINOR, ret_value)

/* Helper macro to find the matching JSON '}' symbol for a given '{' symbol. This macro is
* used to extract out all of the JSON within a JSON object so that processing can be done
* on it.
/* Counterpart of CURL_PERFORM that takes a response_buffer argument,
* instead of using the global response buffer.
* Currently not used. */
#define CURL_PERFORM_NO_GLOBAL(curl_ptr, local_response_buffer, ERR_MAJOR, ERR_MINOR, ret_value) \
CURL_PERFORM_INTERNAL_NO_GLOBAL(curl_ptr, response_buffer, TRUE, ERR_MAJOR, ERR_MINOR, ret_value)

#define CURL_PERFORM_INTERNAL_NO_GLOBAL(curl_ptr, local_response_buffer, handle_HTTP_response, ERR_MAJOR, \
ERR_MINOR, ret_value) \
do { \
CURLcode result = curl_easy_perform(curl_ptr); \
\
/* Reset the cURL response buffer write position pointer */ \
local_response_buffer.curr_buf_ptr = local_response_buffer.buffer; \
\
if (CURLE_OK != result) \
FUNC_GOTO_ERROR(ERR_MAJOR, ERR_MINOR, ret_value, "%s", curl_easy_strerror(result)); \
\
if (handle_HTTP_response) { \
long response_code; \
\
if (CURLE_OK != curl_easy_getinfo(curl_ptr, CURLINFO_RESPONSE_CODE, &response_code)) \
FUNC_GOTO_ERROR(ERR_MAJOR, ERR_MINOR, ret_value, "can't get HTTP response code"); \
\
HANDLE_RESPONSE(response_code, ERR_MAJOR, ERR_MINOR, ret_value); \
} /* end if */ \
} while (0)

/* Counterpart of CURL_PERFORM that takes a curl multi handle,
* and waits until all requests on it have finished before returning. */

#define CURL_MULTI_PERFORM(curl_multi_ptr, ERROR_MAJOR, ERR_MINOR, ret_value) \
int still_running = 0; \
CURLMsg *curl_mult_msg = NULL; \
\
do { \
fprintf(stderr, "%zu transfers still running\n", still_running); \
fprintf(stderr, "Performing..."); \
if (CURLM_OK != curl_multi_perform(curl_multi_ptr, &still_running)) \
FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, ret_value, "cURL multi perform error"); \
\
if (still_running) \
/* wait for activity, timeout or "nothing" */ \
fprintf(stderr, "Polling..."); \
if (CURLM_OK != curl_multi_poll(curl_multi_ptr, NULL, 0, 1000, NULL)) \
FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, ret_value, "cURL multi poll error"); \
\
do { \
long response_code; \
int msgq = 0; \
fprintf(stderr, "Reading info..."); \
curl_mult_msg = curl_multi_info_read(curl_multi_ptr, &msgq); \
if (curl_mult_msg && (curl_mult_msg->msg == CURLMSG_DONE)) { \
if (CURLE_OK != \
curl_easy_getinfo(curl_mult_msg->easy_handle, CURLINFO_RESPONSE_CODE, &response_code)) \
FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, ret_value, "can't get HTTP response code"); \
\
/* TODO: Gracefully handle 503 Error, which can result from sending too many simultaneous \
* requests */ \
if (response_code == 503) { \
fprintf(stderr, "-> 503 Received from macro \n"); \
} \
else { \
fprintf(stderr, "Response code = %ld\n", response_code); \
HANDLE_RESPONSE(response_code, H5E_DATASET, H5E_WRITEERROR, ret_value); \
} \
} \
} while (curl_mult_msg); \
\
} while (still_running);

/* Helper macro to find the matching JSON '}' symbol for a given '{' symbol. This macro is \
* used to extract out all of the JSON within a JSON object so that processing can be done \
* on it. \
*/
#define FIND_JSON_SECTION_END(start_ptr, end_ptr, ERR_MAJOR, ret_value) \
do { \
Expand Down Expand Up @@ -339,6 +409,11 @@ extern char curl_err_buf[];
*/
extern struct curl_slist *curl_headers;

/* Default initial size for the response buffer allocated which cURL writes
* its responses into
*/
#define CURL_RESPONSE_BUFFER_DEFAULT_SIZE 1024

/*
* Saved copy of the base URL for operating on
*/
Expand Down Expand Up @@ -379,6 +454,21 @@ extern const char *link_creation_time_keys[];
*/
extern const char *link_collection_keys2[];

/* JSON keys to retrieve a list of attributes */
extern const char *attributes_keys[];

/* JSON keys to retrieve a list of links */
extern const char *links_keys[];

/* JSON keys to retrieve all of the information from a link when doing link iteration */
extern const char *link_title_keys[];
extern const char *link_creation_time_keys[];

/* JSON keys to retrieve the collection that a hard link belongs to
* (the type of object it points to), "groups", "datasets" or "datatypes"
*/
extern const char *link_collection_keys2[];

/* A global struct containing the buffer which cURL will write its
* responses out to after making a call to the server. The buffer
* in this struct is allocated upon connector initialization and is
Expand Down Expand Up @@ -620,6 +710,8 @@ herr_t RV_convert_dataspace_shape_to_JSON(hid_t space_id, char **shape_body, cha
herr_t RV_base64_encode(const void *in, size_t in_size, char **out, size_t *out_size);
herr_t RV_base64_decode(const char *in, size_t in_size, char **out, size_t *out_size);

size_t H5_rest_curl_write_data_callback_no_global(char *buffer, size_t size, size_t nmemb, void *userp);

/* Comparison function to compare two keys in an rv_hash_table_t */
int H5_rest_compare_string_keys(void *value1, void *value2);

Expand Down
Loading

0 comments on commit b42de8a

Please sign in to comment.