Skip to content

Commit

Permalink
Added aws_http2_stream_write_data, allowing H2 data frames to be writ…
Browse files Browse the repository at this point in the history
…ten at any time (#338)

Adds support for a new H2 API: aws_http2_stream_write_data which allows applications to supply data as it comes in, rather than supplying a stream that is constantly polled and is usually empty. This should allow use cases like event streams and transcribe to be efficient and scalable.

Co-authored-by: Justin Boswell <[email protected]>
Co-authored-by: Dengke Tang <[email protected]>
  • Loading branch information
3 people authored Jun 4, 2022
1 parent 9717eed commit 1a3614f
Show file tree
Hide file tree
Showing 9 changed files with 658 additions and 69 deletions.
13 changes: 11 additions & 2 deletions include/aws/http/private/h2_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ struct aws_h2_connection {
* Waiting for WINDOW_UPDATE to set them free */
struct aws_linked_list stalled_window_streams_list;

/* List using aws_h2_stream.node.
* Contains all streams that are open, but are only sending data when notified, rather than polling
* for it (e.g. event streams)
* Streams are moved to the outgoing_streams_list until they send pending data, then are moved back
* to this list to sleep until more data comes in
*/
struct aws_linked_list waiting_streams_list;

/* List using aws_h2_frame.node.
* Queues all frames (except DATA frames) for connection to send.
* When queue is empty, then we send DATA frames from the outgoing_streams_list */
Expand Down Expand Up @@ -200,8 +208,9 @@ enum aws_h2_stream_closed_when {
/* Status reported through `data_encode_status` when the connection asks a stream to encode a DATA frame
 * (see aws_h2_stream_encode_data_frame). */
enum aws_h2_data_encode_status {
AWS_H2_DATA_ENCODE_COMPLETE, /* finished encoding data for the stream */
AWS_H2_DATA_ENCODE_ONGOING, /* stream has more data to send */
AWS_H2_DATA_ENCODE_ONGOING_BODY_STALLED,
AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED,
AWS_H2_DATA_ENCODE_ONGOING_BODY_STREAM_STALLED, /* stalled reading from body stream */
AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES, /* waiting for next manual write */
AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED, /* stalled due to reduced window size */
};

/* When window size is too small to fit the possible padding into it, we stop sending data and wait for WINDOW_UPDATE */
Expand Down
44 changes: 37 additions & 7 deletions include/aws/http/private/h2_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@ enum aws_h2_stream_api_state {
AWS_H2_STREAM_API_STATE_COMPLETE,
};

/* Indicates the state of the body of the HTTP/2 stream */
enum aws_h2_stream_body_state {
AWS_H2_STREAM_BODY_STATE_NONE, /* Has no body for the HTTP/2 stream */
AWS_H2_STREAM_BODY_STATE_WAITING_WRITES, /* Has no active body right now, but is waiting for more data to be
written; on activation the connection parks the stream in waiting_streams_list */
AWS_H2_STREAM_BODY_STATE_ONGOING, /* Has an active, ongoing body; on activation the connection puts the stream in
outgoing_streams_list */
};

/* represents a write operation, which will be turned into a data frame */
/* One queued write: the data to send plus the completion callback to fire once that data is no longer in use. */
struct aws_h2_stream_data_write {
/* Intrusive list node. NOTE(review): presumably links this write into aws_h2_stream's outgoing_writes /
 * pending_write_list - confirm against h2_stream.c */
struct aws_linked_list_node node;
/* Input stream supplying the bytes for this write */
struct aws_input_stream *data_stream;
/* Optional completion callback; see `aws_http2_stream_write_data_complete_fn` for the error_code contract */
aws_http2_stream_write_data_complete_fn *on_complete;
/* Opaque pointer handed back to on_complete */
void *user_data;
/* NOTE(review): presumably mirrors aws_http2_stream_write_data_options.end_stream (true for the last write of
 * the stream) - confirm */
bool end_stream;
};

struct aws_h2_stream {
struct aws_http_stream base;

Expand All @@ -68,7 +85,16 @@ struct aws_h2_stream {
* We leave it up to the remote peer to detect whether the max window size has been exceeded. */
int64_t window_size_self;
struct aws_http_message *outgoing_message;
/* All queued writes. If the message provides a body stream, it will be first in this list.
* This list can drain, which results in the stream being put to sleep (moved to waiting_streams_list in
* h2_connection). */
struct aws_linked_list outgoing_writes; /* aws_h2_stream_data_write */
bool received_main_headers;

/* Indicates that the stream is currently in the waiting_streams_list and is
* asleep. To wake the stream, move it back to the outgoing_streams_list and set this bool
* to false */
bool waiting_for_writes;
} thread_data;

/* Any thread may touch this data, but the lock must be held (unless it's an atomic) */
Expand All @@ -84,10 +110,15 @@ struct aws_h2_stream {
* code we want to inform user about. */
struct aws_h2err reset_error;
bool reset_called;
bool manual_write_ended;

/* Simplified stream state. */
enum aws_h2_stream_api_state api_state;

/* any data streams sent manually via aws_http2_stream_write_data */
struct aws_linked_list pending_write_list; /* aws_h2_stream_pending_data */
} synced_data;
bool manual_write;

/* Store the sent reset HTTP/2 error code, set to -1, if none has sent so far */
int64_t sent_reset_error_code;
Expand All @@ -113,16 +144,15 @@ enum aws_h2_stream_state aws_h2_stream_get_state(const struct aws_h2_stream *str
struct aws_h2err aws_h2_stream_window_size_change(struct aws_h2_stream *stream, int32_t size_changed, bool self);

/* Connection is ready to send frames from stream now */
int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgoing_data);
int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_body_state *body_state);

/* Completes stream for one reason or another, clean up any pending writes/resources. */
void aws_h2_stream_complete(struct aws_h2_stream *stream, int error_code);

/* Connection is ready to send data from stream now.
* Stream may complete itself during this call.
* data_encode_status:
* AWS_H2_DATA_ENCODE_COMPLETE: Finished encoding data for the stream
* AWS_H2_DATA_ENCODE_ONGOING: Stream has more data to send.
* AWS_H2_DATA_ENCODE_ONGOING_BODY_STALLED: Stream has more data to send, but it's not ready right now
* AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED: Stream has more data to send but its window size is too small, and stream
* will be moved to stalled_window_stream_list */
* data_encode_status: see `aws_h2_data_encode_status`
*/
int aws_h2_stream_encode_data_frame(
struct aws_h2_stream *stream,
struct aws_h2_frame_encoder *encoder,
Expand Down
3 changes: 3 additions & 0 deletions include/aws/http/private/request_response_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ struct aws_http_stream_vtable {
int (*http2_reset_stream)(struct aws_http_stream *http2_stream, uint32_t http2_error);
int (*http2_get_received_error_code)(struct aws_http_stream *http2_stream, uint32_t *http2_error);
int (*http2_get_sent_error_code)(struct aws_http_stream *http2_stream, uint32_t *http2_error);
int (*http2_write_data)(
struct aws_http_stream *http2_stream,
const struct aws_http2_stream_write_data_options *options);
};

/**
Expand Down
98 changes: 95 additions & 3 deletions include/aws/http/request_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ struct aws_http_make_request_options {
* See `aws_http_on_stream_complete_fn`.
*/
aws_http_on_stream_complete_fn *on_complete;

/**
* When using HTTP/2, request body data will be provided over time. The stream will only be polled for writing
* when data has been supplied via `aws_http2_stream_write_data`
*/
bool http2_use_manual_data_writes;
};

struct aws_http_request_handler_options {
Expand Down Expand Up @@ -286,6 +292,21 @@ struct aws_http_request_handler_options {
aws_http_on_stream_complete_fn *on_complete;
};

/**
* Invoked when the data stream of an outgoing HTTP write operation is no longer in use.
* This is always invoked on the HTTP connection's event-loop thread.
*
* @param stream HTTP-stream this write operation was submitted to.
* @param error_code If error_code is AWS_ERROR_SUCCESS (0), the data was successfully sent.
* Any other error_code indicates that the HTTP-stream is in the process of terminating.
* If the error_code is AWS_ERROR_HTTP_STREAM_HAS_COMPLETED,
* the stream's termination has nothing to do with this write operation.
* Any other non-zero error code indicates a problem with this particular write
* operation's data.
* @param user_data User data for this write operation.
*/
typedef void aws_http_stream_write_complete_fn(struct aws_http_stream *stream, int error_code, void *user_data);

/**
* Invoked when the data of an outgoing HTTP/1.1 chunk is no longer in use.
* This is always invoked on the HTTP connection's event-loop thread.
Expand All @@ -298,7 +319,7 @@ struct aws_http_request_handler_options {
* Any other non-zero error code indicates a problem with this particular chunk's data.
* @param user_data User data for this chunk.
*/
typedef void aws_http1_stream_write_chunk_complete_fn(struct aws_http_stream *stream, int error_code, void *user_data);
typedef aws_http_stream_write_complete_fn aws_http1_stream_write_chunk_complete_fn;

/**
* HTTP/1.1 chunk extension for chunked encoding.
Expand Down Expand Up @@ -356,6 +377,50 @@ struct aws_http1_chunk_options {
void *user_data;
};

/**
* Invoked when the data of an outgoing HTTP2 data frame is no longer in use.
* This is always invoked on the HTTP connection's event-loop thread.
*
* @param stream HTTP2-stream this write was submitted to.
* @param error_code If error_code is AWS_ERROR_SUCCESS (0), the data was successfully sent.
* Any other error_code indicates that the HTTP-stream is in the process of terminating.
* If the error_code is AWS_ERROR_HTTP_STREAM_HAS_COMPLETED,
* the stream's termination has nothing to do with this write.
* Any other non-zero error code indicates a problem with this particular write's data.
* @param user_data User data for this write.
*/
typedef aws_http_stream_write_complete_fn aws_http2_stream_write_data_complete_fn;

/**
* Encoding options for manual H2 data frame writes
*/
struct aws_http2_stream_write_data_options {
/**
* The data to be sent.
* Optional.
* If not set, an input stream of length 0 is used.
*/
struct aws_input_stream *data;

/**
* Set to true when this is the last chunk to be sent.
* After a write with end_stream set, no further data writes will be accepted.
*/
bool end_stream;

/**
* Invoked when the data stream is no longer in use, whether or not it was successfully sent.
* Optional.
* See `aws_http2_stream_write_data_complete_fn`.
*/
aws_http2_stream_write_data_complete_fn *on_complete;

/**
* User-provided data passed to the on_complete callback when it is invoked.
*/
void *user_data;
};

#define AWS_HTTP_REQUEST_HANDLER_OPTIONS_INIT \
{ .self_size = sizeof(struct aws_http_request_handler_options), }

Expand Down Expand Up @@ -728,9 +793,37 @@ AWS_HTTP_API int aws_http1_stream_write_chunk(
struct aws_http_stream *http1_stream,
const struct aws_http1_chunk_options *options);

/**
* The stream must have specified `http2_use_manual_data_writes` during request creation.
* For client streams, activate() must be called before any frames are submitted.
* For server streams, the response headers must be submitted before any frames.
* A write whose options set end_stream to true will end the stream and prevent any further writes.
*
* @return AWS_OP_SUCCESS if the write was queued
* AWS_OP_ERROR indicating the attempt raised an error code.
* AWS_ERROR_INVALID_STATE will be raised for invalid usage.
* AWS_ERROR_HTTP_STREAM_HAS_COMPLETED will be raised if the stream ended for reasons behind the scenes.
*
* Typical usage will be something like:
* options.http2_use_manual_data_writes = true;
* stream = aws_http_connection_make_request(connection, &options);
* aws_http_stream_activate(stream);
* ...
* struct aws_http2_stream_write_data_options write;
* aws_http2_stream_write_data(stream, &write);
* ...
* struct aws_http2_stream_write_data_options last_write;
* last_write.end_stream = true;
* aws_http2_stream_write_data(stream, &last_write);
* ...
* aws_http_stream_release(stream);
*/
AWS_HTTP_API int aws_http2_stream_write_data(
struct aws_http_stream *http2_stream,
const struct aws_http2_stream_write_data_options *options);

/**
* Add a list of headers to be added as trailing headers sent after the last chunk is sent.
* The stream must have specified "chunked" in a "transfer-encoding" header. The stream should also have
* a "Trailer" header field which indicates the fields present in the trailer.
*
* Certain headers are forbidden in the trailer (e.g., Transfer-Encoding, Content-Length, Host). See RFC-7541
Expand All @@ -750,7 +843,6 @@ AWS_HTTP_API int aws_http1_stream_add_chunked_trailer(
const struct aws_http_headers *trailing_headers);

/**
* Get the message's aws_http_headers.
*
* This datastructure has more functions for inspecting and modifying headers than
* are available on the aws_http_message datastructure.
Expand Down
32 changes: 21 additions & 11 deletions source/h2_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ static struct aws_h2_connection *s_connection_new(
aws_linked_list_init(&connection->thread_data.pending_settings_queue);
aws_linked_list_init(&connection->thread_data.pending_ping_queue);
aws_linked_list_init(&connection->thread_data.stalled_window_streams_list);
aws_linked_list_init(&connection->thread_data.waiting_streams_list);
aws_linked_list_init(&connection->thread_data.outgoing_frames_queue);

if (aws_mutex_init(&connection->synced_data.lock)) {
Expand Down Expand Up @@ -453,6 +454,7 @@ static void s_handler_destroy(struct aws_channel_handler *handler) {
!aws_hash_table_is_valid(&connection->thread_data.active_streams_map) ||
aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0);

AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.waiting_streams_list));
AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.stalled_window_streams_list));
AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.outgoing_streams_list));
AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_stream_list));
Expand Down Expand Up @@ -794,6 +796,7 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connect
AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
struct aws_linked_list *outgoing_streams_list = &connection->thread_data.outgoing_streams_list;
struct aws_linked_list *stalled_window_streams_list = &connection->thread_data.stalled_window_streams_list;
struct aws_linked_list *waiting_streams_list = &connection->thread_data.waiting_streams_list;

/* If a stream stalls, put it in this list until the function ends so we don't keep trying to read from it.
* We put it back at the end of function. */
Expand Down Expand Up @@ -851,9 +854,13 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connect
case AWS_H2_DATA_ENCODE_ONGOING:
aws_linked_list_push_back(outgoing_streams_list, node);
break;
case AWS_H2_DATA_ENCODE_ONGOING_BODY_STALLED:
case AWS_H2_DATA_ENCODE_ONGOING_BODY_STREAM_STALLED:
aws_linked_list_push_back(&stalled_streams_list, node);
break;
case AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES:
stream->thread_data.waiting_for_writes = true;
aws_linked_list_push_back(waiting_streams_list, node);
break;
case AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED:
aws_linked_list_push_back(stalled_window_streams_list, node);
AWS_H2_STREAM_LOG(
Expand Down Expand Up @@ -1761,10 +1768,7 @@ static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h
aws_linked_list_remove(&stream->node);
}

/* Invoke callback */
if (stream->base.on_complete) {
stream->base.on_complete(&stream->base, error_code, stream->base.user_data);
}
aws_h2_stream_complete(stream, error_code);

/* release connection's hold on stream */
aws_http_stream_release(&stream->base);
Expand Down Expand Up @@ -1859,15 +1863,20 @@ static void s_move_stream_to_thread(
goto error;
}

bool has_outgoing_data = false;
if (aws_h2_stream_on_activated(stream, &has_outgoing_data)) {
enum aws_h2_stream_body_state body_state = AWS_H2_STREAM_BODY_STATE_NONE;
if (aws_h2_stream_on_activated(stream, &body_state)) {
goto error;
}

if (has_outgoing_data) {
aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node);
switch (body_state) {
case AWS_H2_STREAM_BODY_STATE_WAITING_WRITES:
aws_linked_list_push_back(&connection->thread_data.waiting_streams_list, &stream->node);
break;
case AWS_H2_STREAM_BODY_STATE_ONGOING:
aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node);
break;
default:
break;
}

return;
error:
/* If the stream got into any datastructures, s_stream_complete() will remove it */
Expand Down Expand Up @@ -1954,6 +1963,7 @@ static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, e
s_send_goaway(connection, goaway->http2_error, goaway->allow_more_streams, &goaway->debug_data);
aws_mem_release(connection->base.alloc, goaway);
}

/* It's likely that frames were queued while processing cross-thread work.
* If so, try writing them now */
aws_h2_try_write_outgoing_frames(connection);
Expand Down
Loading

0 comments on commit 1a3614f

Please sign in to comment.