Skip to content

Commit

Permalink
HTTP/2 message stream (#344)
Browse files Browse the repository at this point in the history
Use HTTP/2 message instead of always translating HTTP/1 message to HTTP/2
  • Loading branch information
TingDaoK authored Nov 3, 2021
1 parent 236ce08 commit 0426a06
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 225 deletions.
20 changes: 15 additions & 5 deletions bin/elasticurl/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -390,23 +390,33 @@ static void s_on_stream_complete_fn(struct aws_http_stream *stream, int error_co
}

static struct aws_http_message *s_build_http_request(struct elasticurl_ctx *app_ctx) {
struct aws_http_message *request = aws_http_message_new_request(app_ctx->allocator);

struct aws_http_message *request = app_ctx->required_http_version == AWS_HTTP_VERSION_2
? aws_http2_message_new_request(app_ctx->allocator)
: aws_http_message_new_request(app_ctx->allocator);
if (request == NULL) {
fprintf(stderr, "failed to allocate request\n");
exit(1);
}

aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str(app_ctx->verb));
aws_http_message_set_request_path(request, app_ctx->uri.path_and_query);
if (app_ctx->required_http_version == AWS_HTTP_VERSION_2) {
struct aws_http_headers *h2_headers = aws_http_message_get_headers(request);
aws_http2_headers_set_request_scheme(h2_headers, app_ctx->uri.scheme);
aws_http2_headers_set_request_authority(h2_headers, app_ctx->uri.host_name);
} else {
struct aws_http_header host_header = {
.name = aws_byte_cursor_from_c_str("host"),
.value = app_ctx->uri.host_name,
};
aws_http_message_add_header(request, host_header);
}
struct aws_http_header accept_header = {
.name = aws_byte_cursor_from_c_str("accept"),
.value = aws_byte_cursor_from_c_str("*/*"),
};
aws_http_message_add_header(request, accept_header);

struct aws_http_header host_header = {.name = aws_byte_cursor_from_c_str("host"), .value = app_ctx->uri.host_name};
aws_http_message_add_header(request, host_header);

struct aws_http_header user_agent_header = {
.name = aws_byte_cursor_from_c_str("user-agent"),
.value = aws_byte_cursor_from_c_str("elasticurl 1.0, Powered by the AWS Common Runtime.")};
Expand Down
6 changes: 0 additions & 6 deletions include/aws/http/private/h2_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,6 @@ struct aws_http_connection *aws_http_connection_new_http2_client(
bool manual_window_management,
const struct aws_http2_connection_options *http2_options);

/* Transform the request to h2 style headers */
AWS_HTTP_API
struct aws_http_headers *aws_h2_create_headers_from_request(
struct aws_http_message *request,
struct aws_allocator *alloc);

AWS_EXTERN_C_END

/* Private functions called from multiple .c files... */
Expand Down
6 changes: 6 additions & 0 deletions include/aws/http/private/h2_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ struct aws_h2_stream {

/* Store the received reset HTTP/2 error code, set to -1, if none has received so far */
int64_t received_reset_error_code;

/**
* Back up the message if we create a new message from it to keep the underlying input stream alive.
* TODO: remove this once we have input stream refcounted
*/
struct aws_http_message *backup_outgoing_message;
};

const char *aws_h2_stream_state_to_str(enum aws_h2_stream_state state);
Expand Down
16 changes: 16 additions & 0 deletions include/aws/http/private/request_response_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,20 @@ struct aws_http_stream {
struct aws_http_stream_server_data *server_data;
};

AWS_EXTERN_C_BEGIN

/**
* Create an HTTP/2 message from HTTP/1.1 message.
* pseudo headers will be created from the context and added to the headers of new message.
* Normal headers will be copied to the headers of new message.
* TODO: (Maybe more, connection-specific header will be removed, etc...)
* TODO: REFCOUNT INPUT_STREAMS!!! And make it public.
*/
AWS_HTTP_API
struct aws_http_message *aws_http2_message_new_from_http1(
struct aws_http_message *http1_msg,
struct aws_allocator *alloc);

AWS_EXTERN_C_END

#endif /* AWS_HTTP_REQUEST_RESPONSE_IMPL_H */
104 changes: 0 additions & 104 deletions source/h2_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1721,110 +1721,6 @@ static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h
aws_http_stream_release(&stream->base);
}

struct aws_http_headers *aws_h2_create_headers_from_request(
struct aws_http_message *request,
struct aws_allocator *alloc) {

struct aws_http_headers *old_headers = aws_http_message_get_headers(request);
bool is_pseudoheader = false;
struct aws_http_headers *result = aws_http_headers_new(alloc);
struct aws_http_header header_iter;
struct aws_byte_buf lower_name_buf;
AWS_ZERO_STRUCT(lower_name_buf);

/* Check whether the old_headers have pseudo header or not */
if (aws_http_headers_count(old_headers)) {
if (aws_http_headers_get_index(old_headers, 0, &header_iter)) {
goto error;
}
is_pseudoheader = header_iter.name.ptr[0] == ':';
}
if (!is_pseudoheader) {
/* TODO: Set pseudo headers all from message, which will lead an API change to aws_http_message */
/* No pseudoheader detected, we set them from the request */
/* Set pseudo headers */
struct aws_byte_cursor method;
if (aws_http_message_get_request_method(request, &method)) {
/* error will happen when the request is invalid */
aws_raise_error(AWS_ERROR_HTTP_INVALID_METHOD);
goto error;
}
if (aws_http_headers_add(result, aws_http_header_method, method)) {
goto error;
}
/* we set a default value, "https", for now */
struct aws_byte_cursor scheme_cursor = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("https");
if (aws_http_headers_add(result, aws_http_header_scheme, scheme_cursor)) {
goto error;
}
/* Set an empty authority for now, if host header field is found, we set it as the value of host */
struct aws_byte_cursor authority_cursor;
struct aws_byte_cursor host_cursor = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("host");
if (!aws_http_headers_get(old_headers, host_cursor, &authority_cursor)) {
if (aws_http_headers_add(result, aws_http_header_authority, authority_cursor)) {
goto error;
}
}
struct aws_byte_cursor path_cursor;
if (aws_http_message_get_request_path(request, &path_cursor)) {
aws_raise_error(AWS_ERROR_HTTP_INVALID_PATH);
goto error;
}
if (aws_http_headers_add(result, aws_http_header_path, path_cursor)) {
goto error;
}
}
/* if pseudoheader is included in message, we just convert all the headers from old_headers to result */
if (aws_byte_buf_init(&lower_name_buf, alloc, 256)) {
goto error;
}
for (size_t iter = 0; iter < aws_http_headers_count(old_headers); iter++) {
/* name should be converted to lower case */
if (aws_http_headers_get_index(old_headers, iter, &header_iter)) {
goto error;
}
/* append lower case name to the buffer */
aws_byte_buf_append_with_lookup(&lower_name_buf, &header_iter.name, aws_lookup_table_to_lower_get());
struct aws_byte_cursor lower_name_cursor = aws_byte_cursor_from_buf(&lower_name_buf);
enum aws_http_header_name name_enum = aws_http_lowercase_str_to_header_name(lower_name_cursor);
switch (name_enum) {
case AWS_HTTP_HEADER_COOKIE:
/* split cookie if USE CACHE */
if (header_iter.compression == AWS_HTTP_HEADER_COMPRESSION_USE_CACHE) {
struct aws_byte_cursor cookie_chunk;
AWS_ZERO_STRUCT(cookie_chunk);
while (aws_byte_cursor_next_split(&header_iter.value, ';', &cookie_chunk)) {
if (aws_http_headers_add(
result, lower_name_cursor, aws_strutil_trim_http_whitespace(cookie_chunk))) {
goto error;
}
}
} else {
if (aws_http_headers_add(result, lower_name_cursor, header_iter.value)) {
goto error;
}
}
break;
case AWS_HTTP_HEADER_HOST:
/* host header has been converted to :authority, do nothing here */
break;
/* TODO: handle connection-specific header field (RFC7540 8.1.2.2) */
default:
if (aws_http_headers_add(result, lower_name_cursor, header_iter.value)) {
goto error;
}
break;
}
aws_byte_buf_reset(&lower_name_buf, false);
}
aws_byte_buf_clean_up(&lower_name_buf);
return result;
error:
aws_http_headers_release(result);
aws_byte_buf_clean_up(&lower_name_buf);
return NULL;
}

int aws_h2_connection_on_stream_closed(
struct aws_h2_connection *connection,
struct aws_h2_stream *stream,
Expand Down
42 changes: 30 additions & 12 deletions source/h2_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,27 @@ struct aws_h2_stream *aws_h2_stream_new_request(

/* Init H2 specific stuff */
stream->thread_data.state = AWS_H2_STREAM_STATE_IDLE;
stream->thread_data.outgoing_message = options->request;
enum aws_http_version message_version = aws_http_message_get_protocol_version(options->request);
switch (message_version) {
case AWS_HTTP_VERSION_1_1:
stream->thread_data.outgoing_message =
aws_http2_message_new_from_http1(options->request, stream->base.alloc);
if (!stream->thread_data.outgoing_message) {
AWS_H2_STREAM_LOG(ERROR, stream, "Stream failed to create the HTTP/2 message from HTTP/1.1 message");
goto error;
}
stream->backup_outgoing_message = options->request;
aws_http_message_acquire(stream->backup_outgoing_message);
break;
case AWS_HTTP_VERSION_2:
stream->thread_data.outgoing_message = options->request;
aws_http_message_acquire(stream->thread_data.outgoing_message);
break;
default:
/* Not supported */
aws_raise_error(AWS_ERROR_HTTP_UNSUPPORTED_PROTOCOL);
goto error;
}

stream->sent_reset_error_code = -1;
stream->received_reset_error_code = -1;
Expand All @@ -257,13 +277,14 @@ struct aws_h2_stream *aws_h2_stream_new_request(
if (aws_mutex_init(&stream->synced_data.lock)) {
AWS_H2_STREAM_LOGF(
ERROR, stream, "Mutex init error %d (%s).", aws_last_error(), aws_error_name(aws_last_error()));
aws_mem_release(stream->base.alloc, stream);
return NULL;
goto error;
}
aws_http_message_acquire(stream->thread_data.outgoing_message);
aws_channel_task_init(
&stream->cross_thread_work_task, s_stream_cross_thread_work_task, stream, "HTTP/2 stream cross-thread work");
return stream;
error:
s_stream_destroy(&stream->base);
return NULL;
}

static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
Expand Down Expand Up @@ -335,6 +356,7 @@ static void s_stream_destroy(struct aws_http_stream *stream_base) {
AWS_H2_STREAM_LOG(DEBUG, stream, "Destroying stream");
aws_mutex_clean_up(&stream->synced_data.lock);
aws_http_message_release(stream->thread_data.outgoing_message);
aws_http_message_release(stream->backup_outgoing_message);

aws_mem_release(stream->base.alloc, stream);
}
Expand Down Expand Up @@ -548,13 +570,11 @@ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgo

/* Create HEADERS frame */
struct aws_http_message *msg = stream->thread_data.outgoing_message;
/* Should be ensured when the stream is created */
AWS_ASSERT(aws_http_message_get_protocol_version(msg) == AWS_HTTP_VERSION_2);
bool has_body_stream = aws_http_message_get_body_stream(msg) != NULL;
struct aws_http_headers *h2_headers = aws_h2_create_headers_from_request(msg, stream->base.alloc);
if (!h2_headers) {
AWS_H2_STREAM_LOGF(
ERROR, stream, "Failed to create HTTP/2 style headers from request %s", aws_error_name(aws_last_error()));
goto error;
}
struct aws_http_headers *h2_headers = aws_http_message_get_headers(msg);

struct aws_h2_frame *headers_frame = aws_h2_frame_new_headers(
stream->base.alloc,
stream->base.id,
Expand All @@ -563,8 +583,6 @@ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgo
0 /* padding - not currently configurable via public API */,
NULL /* priority - not currently configurable via public API */);

/* Release refcount of h2_headers here, let frame take the full ownership of it */
aws_http_headers_release(h2_headers);
if (!headers_frame) {
AWS_H2_STREAM_LOGF(ERROR, stream, "Failed to create HEADERS frame: %s", aws_error_name(aws_last_error()));
goto error;
Expand Down
Loading

0 comments on commit 0426a06

Please sign in to comment.