Skip to content

Commit

Permalink
Stable
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjala committed Jul 25, 2023
1 parent 091d16c commit b046863
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 15 deletions.
29 changes: 22 additions & 7 deletions src/rest_vol.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ typedef struct H5_rest_ad_info_t {
/* For HUG '23 Demo, to track and display how much of each dataset's read/write is completed over time
during a multi read/write */
curl_off_t *total_content_lengths = NULL;
bool is_read = false;

/* Host header string for specifying the host (Domain) for requests */
const char *const host_string = "X-Hdf-domain: ";
Expand Down Expand Up @@ -1315,7 +1316,6 @@ char *progress_string(size_t current, size_t total) {
memset(prog, '-', BAR_LENGTH);
memset(prog, '#', filled_chars);

fprintf(stderr, "prog = %s", prog);
done:
return prog;
}
Expand Down Expand Up @@ -1358,21 +1358,18 @@ H5_rest_curl_read_data_callback(char *buffer, size_t size, size_t nmemb, void *i
#define CURSOR_UP fprintf(stderr, "\x1b[A");
#define CURSOR_DOWN fprintf(stderr, "\n");

if (total_content_lengths) {
if (total_content_lengths && uinfo->dset_idx < 20) {
for (size_t i = 0; i < uinfo->dset_idx; i++)
CURSOR_DOWN

char *prog = progress_string(uinfo->bytes_sent, total_content_lengths[uinfo->dset_idx]);
size_t complete_percent = (size_t) ((double) uinfo->bytes_sent);
fprintf(stderr, "[%s] %zu%", prog, complete_percent);
size_t complete_percent = (size_t) (100 * (((double) uinfo->bytes_sent) / ((double) total_content_lengths[uinfo->dset_idx])));
fprintf(stderr, "\rDataset #%zu%s Write: [%s] %zu%%", uinfo->dset_idx, (uinfo->dset_idx < 10) ? " " : "", prog, complete_percent);

RV_free(prog);
usleep(50000);

for (size_t i = 0; i < uinfo->dset_idx; i++)
CURSOR_UP

// TODO: Progress bar

}

Expand Down Expand Up @@ -1491,6 +1488,24 @@ H5_rest_curl_write_data_callback_no_global(char *buffer, size_t size, size_t nme
local_response_buffer->curr_buf_ptr += data_size;
*local_response_buffer->curr_buf_ptr = '\0';

/* HUG Demo '23*/
if (total_content_lengths[local_response_buffer->dset_idx] && is_read && local_response_buffer->dset_idx < 20) {
for (size_t i = 0; i < local_response_buffer->dset_idx; i++)
CURSOR_DOWN

size_t bytes_read = ((size_t) buf_ptrdiff) + data_size;
char *prog = progress_string(bytes_read, total_content_lengths[local_response_buffer->dset_idx]);

size_t complete_percent = (size_t) (100 * (((double) bytes_read) / ((double) total_content_lengths[local_response_buffer->dset_idx])));
fprintf(stderr, "\rDataset #%zu%s Read: [%s] %zu%%", local_response_buffer->dset_idx, (local_response_buffer->dset_idx < 10) ? " " : "", prog, complete_percent);

RV_free(prog);

for (size_t i = 0; i < local_response_buffer->dset_idx; i++)
CURSOR_UP

} /* end if */

ret_value = data_size;

done:
Expand Down
7 changes: 5 additions & 2 deletions src/rest_vol.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
FUNC_GOTO_ERROR(ERR_MAJOR, ERR_MINOR, ret_value, "504 - Gateway timeout"); \
break; \
default: \
FUNC_GOTO_ERROR(ERR_MAJOR, ERR_MINOR, ret_value, "Unknown error occurred"); \
FUNC_GOTO_ERROR(ERR_MAJOR, ERR_MINOR, ret_value, "Unknown error occurred - HTTP %d", response_code); \
break; \
} /* end switch */ \
} while (0)
Expand Down Expand Up @@ -448,14 +448,17 @@ struct response_buffer {
char *buffer;
char *curr_buf_ptr;
size_t buffer_size;
/* For HUG '23 Demo */
size_t dset_idx;
};

extern struct response_buffer response_buffer;

/* For HUG '23 Demo, to track and display how much of each dataset's read/write is completed over time
during a multi read/write */
extern curl_off_t *total_content_lengths;
extern curl_off_t *curr_content_transferred;

extern bool is_read;
/**************************
* *
* Typedefs *
Expand Down
29 changes: 23 additions & 6 deletions src/rest_vol_dataset.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ const char *external_storage_keys[] = {"externalStorage", (const char *)0}
#define OBJECT_REF_STRING_LEN 48

/* Defines for multi-CURL related settings */
#define NUM_MAX_HOST_CONNS 10
#define NUM_MAX_HOST_CONNS 40
#define BACKOFF_INITIAL_DURATION 10000000 /* 10,000,000 ns -> 0.01 sec */
#define BACKOFF_SCALE_FACTOR 1.5
#define BACKOFF_MAX_BEFORE_FAIL 3000000000 /* 30,000,000,000 ns -> 30 sec */
#define BACKOFF_MAX_BEFORE_FAIL 30000000000 /* 30,000,000,000 ns -> 30 sec */
#define DELAY_BETWEEN_HANDLE_CHECKS 10000000 /* 10,000,000 ns -> 0.01 sec */

/* Default sizes for strings formed when dealing with turning a
Expand Down Expand Up @@ -462,6 +462,9 @@ RV_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spac
char **curl_err_bufs = NULL;
struct curl_slist **curl_headers_arr = NULL;

is_read = true;
size_t wait_count = 0;

#ifdef RV_CONNECTOR_DEBUG
printf("-> Received dataset read call with following parameters:\n");
for (size_t i = 0; i < count; i++) {
Expand Down Expand Up @@ -535,6 +538,8 @@ RV_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spac

/* Initialize arrays and check arguments */
for (size_t i = 0; i < count; i++) {
response_buffers[i].dset_idx = i;

if (!buf[i])
FUNC_GOTO_ERROR(H5E_ARGS, H5E_BADVALUE, FAIL, "given read buffer was NULL");

Expand Down Expand Up @@ -857,6 +862,7 @@ RV_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spac
/* Gracefully handle 503 Error, which can result from sending too many simultaneous
* requests */
if (response_code == 503) {
wait_count++;
/* Identify the handle by its original index in the easy handle array*/

if (RV_get_index_of_matching_handle(curl_easy_handles, count,
Expand Down Expand Up @@ -999,6 +1005,8 @@ RV_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spac

done:

fprintf(stderr, "\nWait count during read: %zu\n", wait_count);

if (obj_ref_buf)
RV_free(obj_ref_buf);

Expand Down Expand Up @@ -1042,6 +1050,8 @@ RV_dataset_read(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spac
RV_free(time_of_fail);
RV_free(file_select_npoints);
RV_free(mem_select_npoints);

RV_free(total_content_lengths);
PRINT_ERROR_STACK;

return ret_value;
Expand Down Expand Up @@ -1445,6 +1455,8 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa
printf(" \\**********************************/\n\n");
#endif

size_t wait_count = 0;

/* Avoid the multi curl overhead in the single dataset case */
if (count == 1 && expected_num_writes == 1) {
CURL_PERFORM_NO_GLOBAL(curl_easy_handles[0], response_buffers[0], H5E_DATASET, H5E_WRITEERROR, FAIL);
Expand Down Expand Up @@ -1510,6 +1522,8 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa
/* Gracefully handle 503 Error, which can result from sending too many simultaneous
* requests */
if (response_code == 503) {

wait_count++;

if (RV_get_index_of_matching_handle(curl_easy_handles, count,
curl_multi_msg->easy_handle, &handle_index) < 0)
Expand All @@ -1534,6 +1548,10 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa
time_of_fail[handle_index] =
(size_t)tms.tv_sec * 1000 * 1000 * 1000 + (size_t)tms.tv_nsec;

if (current_backoff_duration[handle_index] >= BACKOFF_MAX_BEFORE_FAIL)
FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"Unable to reach server for write: 503 service unavailable");

current_backoff_duration[handle_index] =
(current_backoff_duration[handle_index] == 0)
? BACKOFF_INITIAL_DURATION
Expand All @@ -1546,9 +1564,6 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa
(size_t)((double)current_backoff_duration[handle_index] *
(1.0 + ((double)random_factor / (double)RAND_MAX)));

if (current_backoff_duration[handle_index] >= BACKOFF_MAX_BEFORE_FAIL)
FUNC_GOTO_ERROR(H5E_DATASET, H5E_WRITEERROR, FAIL,
"Unable to reach server for write: 503 service unavailable");
#ifdef RV_CONNECTOR_DEBUG
printf("Write handle %zu failed - waiting %zuns to retry\n", handle_index,
current_backoff_duration[handle_index]);
Expand Down Expand Up @@ -1590,6 +1605,7 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa
curl_err_bufs[handle_index] = NULL;
}
else {
fprintf(stderr, "Unexpected response from server: HTTP %ld", response_code);
HANDLE_RESPONSE(response_code, H5E_DATASET, H5E_WRITEERROR, FAIL);
}
}
Expand Down Expand Up @@ -1626,6 +1642,8 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa
printf("-> Dataset write response buffer:\n%s\n\n", response_buffer.buffer);
#endif

fprintf(stderr, "\nWait count during write: %zu\n", wait_count);

for (size_t i = 0; i < count; i++) {
if (curl_headers_arr[i]) {
curl_slist_free_all(curl_headers_arr[i]);
Expand Down Expand Up @@ -1669,7 +1687,6 @@ RV_dataset_write(size_t count, void *dset[], hid_t mem_type_id[], hid_t _mem_spa
RV_free(mem_select_npoints);
RV_free(file_select_npoints);

RV_free(total_content_lengths);
PRINT_ERROR_STACK;

return ret_value;
Expand Down

0 comments on commit b046863

Please sign in to comment.