From 1a3614f4ba21bbd1db636315fd9f089a8ede9f6e Mon Sep 17 00:00:00 2001 From: Justin Boswell Date: Fri, 3 Jun 2022 17:36:15 -0700 Subject: [PATCH] Added aws_http2_stream_write_data, allowing H2 data frames to be written at any time (#338) Adds support for a new H2 API: aws_http2_stream_write_data which allows applications to supply data as it comes in, rather than supplying a stream that is constantly polled and is usually empty. This should allow use cases like event streams and transcribe to be efficient and scalable. Co-authored-by: Justin Boswell Co-authored-by: Dengke Tang --- include/aws/http/private/h2_connection.h | 13 +- include/aws/http/private/h2_stream.h | 44 ++- .../aws/http/private/request_response_impl.h | 3 + include/aws/http/request_response.h | 98 +++++- source/h2_connection.c | 32 +- source/h2_stream.c | 286 +++++++++++++++--- source/request_response.c | 11 + tests/CMakeLists.txt | 3 + tests/test_h2_client.c | 237 +++++++++++++++ 9 files changed, 658 insertions(+), 69 deletions(-) diff --git a/include/aws/http/private/h2_connection.h b/include/aws/http/private/h2_connection.h index 366d0dd5f..5be9eb81d 100644 --- a/include/aws/http/private/h2_connection.h +++ b/include/aws/http/private/h2_connection.h @@ -70,6 +70,14 @@ struct aws_h2_connection { * Waiting for WINDOW_UPDATE to set them free */ struct aws_linked_list stalled_window_streams_list; + /* List using aws_h2_stream.node. + * Contains all streams that are open, but are only sending data when notified, rather than polling + * for it (e.g. event streams) + * Streams are moved to the outgoing_streams_list until they send pending data, then are moved back + * to this list to sleep until more data comes in + */ + struct aws_linked_list waiting_streams_list; + /* List using aws_h2_frame.node. * Queues all frames (except DATA frames) for connection to send. * When queue is empty, then we send DATA frames from the outgoing_streams_list */ @@ -200,8 +208,9 @@ enum aws_h2_stream_closed_when { enum aws_h2_data_encode_status { AWS_H2_DATA_ENCODE_COMPLETE, AWS_H2_DATA_ENCODE_ONGOING, - AWS_H2_DATA_ENCODE_ONGOING_BODY_STALLED, - AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED, + AWS_H2_DATA_ENCODE_ONGOING_BODY_STREAM_STALLED, /* stalled reading from body stream */ + AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES, /* waiting for next manual write */ + AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED, /* stalled due to reduced window size */ }; /* When window size is too small to fit the possible padding into it, we stop sending data and wait for WINDOW_UPDATE */ diff --git a/include/aws/http/private/h2_stream.h b/include/aws/http/private/h2_stream.h index 3011dd856..05ecbc7c5 100644 --- a/include/aws/http/private/h2_stream.h +++ b/include/aws/http/private/h2_stream.h @@ -53,6 +53,23 @@ enum aws_h2_stream_api_state { AWS_H2_STREAM_API_STATE_COMPLETE, }; +/* Indicates the state of the body of the HTTP/2 stream */ +enum aws_h2_stream_body_state { + AWS_H2_STREAM_BODY_STATE_NONE, /* Has no body for the HTTP/2 stream */ + AWS_H2_STREAM_BODY_STATE_WAITING_WRITES, /* Has no active body, but waiting for more to be + write */ + AWS_H2_STREAM_BODY_STATE_ONGOING, /* Has active ongoing body */ +}; + +/* represents a write operation, which will be turned into a data frame */ +struct aws_h2_stream_data_write { + struct aws_linked_list_node node; + struct aws_input_stream *data_stream; + aws_http2_stream_write_data_complete_fn *on_complete; + void *user_data; + bool end_stream; +}; + struct aws_h2_stream { struct aws_http_stream base; @@ -68,7 +85,16 @@ struct aws_h2_stream { * We leave it up to the remote peer to detect whether the max window size has been exceeded. */ int64_t window_size_self; struct aws_http_message *outgoing_message; + /* All queued writes. If the message provides a body stream, it will be first in this list + * This list can drain, which results in the stream being put to sleep (moved to waiting_streams_list in + * h2_connection). */ + struct aws_linked_list outgoing_writes; /* aws_http2_stream_data_write */ bool received_main_headers; + + /* Indicates that the stream is currently in the waiting_streams_list and is + * asleep. When stream needs to be awaken, moving the stream back to the outgoing_streams_list and set this bool + * to false */ + bool waiting_for_writes; } thread_data; /* Any thread may touch this data, but the lock must be held (unless it's an atomic) */ @@ -84,10 +110,15 @@ struct aws_h2_stream { * code we want to inform user about. */ struct aws_h2err reset_error; bool reset_called; + bool manual_write_ended; /* Simplified stream state. */ enum aws_h2_stream_api_state api_state; + + /* any data streams sent manually via aws_http2_stream_write_data */ + struct aws_linked_list pending_write_list; /* aws_h2_stream_pending_data */ } synced_data; + bool manual_write; /* Store the sent reset HTTP/2 error code, set to -1, if none has sent so far */ int64_t sent_reset_error_code; @@ -113,16 +144,15 @@ enum aws_h2_stream_state aws_h2_stream_get_state(const struct aws_h2_stream *str struct aws_h2err aws_h2_stream_window_size_change(struct aws_h2_stream *stream, int32_t size_changed, bool self); /* Connection is ready to send frames from stream now */ -int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgoing_data); +int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_body_state *body_state); + +/* Completes stream for one reason or another, clean up any pending writes/resources. */ +void aws_h2_stream_complete(struct aws_h2_stream *stream, int error_code); /* Connection is ready to send data from stream now. * Stream may complete itself during this call. - * data_encode_status: - * AWS_H2_DATA_ENCODE_COMPLETE: Finished encoding data for the stream - * AWS_H2_DATA_ENCODE_ONGOING: Stream has more data to send. - * AWS_H2_DATA_ENCODE_ONGOING_BODY_STALLED: Stream has more data to send, but it's not ready right now - * AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED: Stream has more data to send but its window size is too small, and stream - * will be moved to stalled_window_stream_list */ + * data_encode_status: see `aws_h2_data_encode_status` + */ int aws_h2_stream_encode_data_frame( struct aws_h2_stream *stream, struct aws_h2_frame_encoder *encoder, diff --git a/include/aws/http/private/request_response_impl.h b/include/aws/http/private/request_response_impl.h index 39ad91b3c..2f4901330 100644 --- a/include/aws/http/private/request_response_impl.h +++ b/include/aws/http/private/request_response_impl.h @@ -23,6 +23,9 @@ struct aws_http_stream_vtable { int (*http2_reset_stream)(struct aws_http_stream *http2_stream, uint32_t http2_error); int (*http2_get_received_error_code)(struct aws_http_stream *http2_stream, uint32_t *http2_error); int (*http2_get_sent_error_code)(struct aws_http_stream *http2_stream, uint32_t *http2_error); + int (*http2_write_data)( + struct aws_http_stream *http2_stream, + const struct aws_http2_stream_write_data_options *options); }; /** diff --git a/include/aws/http/request_response.h b/include/aws/http/request_response.h index 81d993df3..af72360da 100644 --- a/include/aws/http/request_response.h +++ b/include/aws/http/request_response.h @@ -233,6 +233,12 @@ struct aws_http_make_request_options { * See `aws_http_on_stream_complete_fn`. */ aws_http_on_stream_complete_fn *on_complete; + + /** + * When using HTTP/2, request body data will be provided over time. The stream will only be polled for writing + * when data has been supplied via `aws_http2_stream_write_data` + */ + bool http2_use_manual_data_writes; }; struct aws_http_request_handler_options { @@ -286,6 +292,21 @@ struct aws_http_request_handler_options { aws_http_on_stream_complete_fn *on_complete; }; +/** + * Invoked when the data stream of an outgoing HTTP write operation is no longer in use. + * This is always invoked on the HTTP connection's event-loop thread. + * + * @param stream HTTP-stream this write operation was submitted to. + * @param error_code If error_code is AWS_ERROR_SUCCESS (0), the data was successfully sent. + * Any other error_code indicates that the HTTP-stream is in the process of terminating. + * If the error_code is AWS_ERROR_HTTP_STREAM_HAS_COMPLETED, + * the stream's termination has nothing to do with this write operation. + * Any other non-zero error code indicates a problem with this particular write + * operation's data. + * @param user_data User data for this write operation. + */ +typedef void aws_http_stream_write_complete_fn(struct aws_http_stream *stream, int error_code, void *user_data); + /** * Invoked when the data of an outgoing HTTP/1.1 chunk is no longer in use. * This is always invoked on the HTTP connection's event-loop thread. @@ -298,7 +319,7 @@ struct aws_http_request_handler_options { * Any other non-zero error code indicates a problem with this particular chunk's data. * @param user_data User data for this chunk. */ -typedef void aws_http1_stream_write_chunk_complete_fn(struct aws_http_stream *stream, int error_code, void *user_data); +typedef aws_http_stream_write_complete_fn aws_http1_stream_write_chunk_complete_fn; /** * HTTP/1.1 chunk extension for chunked encoding. @@ -356,6 +377,50 @@ struct aws_http1_chunk_options { void *user_data; }; +/** + * Invoked when the data of an outgoing HTTP2 data frame is no longer in use. + * This is always invoked on the HTTP connection's event-loop thread. + * + * @param stream HTTP2-stream this write was submitted to. + * @param error_code If error_code is AWS_ERROR_SUCCESS (0), the data was successfully sent. + * Any other error_code indicates that the HTTP-stream is in the process of terminating. + * If the error_code is AWS_ERROR_HTTP_STREAM_HAS_COMPLETED, + * the stream's termination has nothing to do with this write. + * Any other non-zero error code indicates a problem with this particular write's data. + * @param user_data User data for this write. + */ +typedef aws_http_stream_write_complete_fn aws_http2_stream_write_data_complete_fn; + +/** + * Encoding options for manual H2 data frame writes + */ +struct aws_http2_stream_write_data_options { + /** + * The data to be sent. + * Optional. + * If not set, input stream with length 0 will be used. + */ + struct aws_input_stream *data; + + /** + * Set true when it's the last chunk to be sent. + * After a write with end_stream, no more data write will be accepted. + */ + bool end_stream; + + /** + * Invoked when the data stream is no longer in use, whether or not it was successfully sent. + * Optional. + * See `aws_http2_stream_write_data_complete_fn`. + */ + aws_http2_stream_write_data_complete_fn *on_complete; + + /** + * User provided data passed to the on_complete callback on its invocation. + */ + void *user_data; +}; + #define AWS_HTTP_REQUEST_HANDLER_OPTIONS_INIT \ { .self_size = sizeof(struct aws_http_request_handler_options), } @@ -728,9 +793,37 @@ AWS_HTTP_API int aws_http1_stream_write_chunk( struct aws_http_stream *http1_stream, const struct aws_http1_chunk_options *options); +/** + * The stream must have specified `http2_use_manual_data_writes` during request creation. + * For client streams, activate() must be called before any frames are submitted. + * For server streams, the response headers must be submitted before any frames. + * A write with options that has end_stream set to be true will end the stream and prevent any further write. + * + * @return AWS_OP_SUCCESS if the write was queued + * AWS_OP_ERROR indicating the attempt raised an error code. + * AWS_ERROR_INVALID_STATE will be raised for invalid usage. + * AWS_ERROR_HTTP_STREAM_HAS_COMPLETED will be raised if the stream ended for reasons behind the scenes. + * + * Typical usage will be something like: + * options.http2_use_manual_data_writes = true; + * stream = aws_http_connection_make_request(connection, &options); + * aws_http_stream_activate(stream); + * ... + * struct aws_http2_stream_write_data_options write; + * aws_http2_stream_write_data(stream, &write); + * ... + * struct aws_http2_stream_write_data_options last_write; + * last_write.end_stream = true; + * aws_http2_stream_write_data(stream, &write); + * ... + * aws_http_stream_release(stream); + */ +AWS_HTTP_API int aws_http2_stream_write_data( + struct aws_http_stream *http2_stream, + const struct aws_http2_stream_write_data_options *options); + /** * Add a list of headers to be added as trailing headers sent after the last chunk is sent. - * The stream must have specified "chunked" in a "transfer-encoding" header. The stream should also have * a "Trailer" header field which indicates the fields present in the trailer. * * Certain headers are forbidden in the trailer (e.g., Transfer-Encoding, Content-Length, Host). See RFC-7541 @@ -750,7 +843,6 @@ AWS_HTTP_API int aws_http1_stream_add_chunked_trailer( const struct aws_http_headers *trailing_headers); /** - * Get the message's aws_http_headers. * * This datastructure has more functions for inspecting and modifying headers than * are available on the aws_http_message datastructure. diff --git a/source/h2_connection.c b/source/h2_connection.c index d05c4a8b8..b50fdb2ec 100644 --- a/source/h2_connection.c +++ b/source/h2_connection.c @@ -331,6 +331,7 @@ static struct aws_h2_connection *s_connection_new( aws_linked_list_init(&connection->thread_data.pending_settings_queue); aws_linked_list_init(&connection->thread_data.pending_ping_queue); aws_linked_list_init(&connection->thread_data.stalled_window_streams_list); + aws_linked_list_init(&connection->thread_data.waiting_streams_list); aws_linked_list_init(&connection->thread_data.outgoing_frames_queue); if (aws_mutex_init(&connection->synced_data.lock)) { @@ -453,6 +454,7 @@ static void s_handler_destroy(struct aws_channel_handler *handler) { !aws_hash_table_is_valid(&connection->thread_data.active_streams_map) || aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0); + AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.waiting_streams_list)); AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.stalled_window_streams_list)); AWS_ASSERT(aws_linked_list_empty(&connection->thread_data.outgoing_streams_list)); AWS_ASSERT(aws_linked_list_empty(&connection->synced_data.pending_stream_list)); @@ -794,6 +796,7 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connect AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel)); struct aws_linked_list *outgoing_streams_list = &connection->thread_data.outgoing_streams_list; struct aws_linked_list *stalled_window_streams_list = &connection->thread_data.stalled_window_streams_list; + struct aws_linked_list *waiting_streams_list = &connection->thread_data.waiting_streams_list; /* If a stream stalls, put it in this list until the function ends so we don't keep trying to read from it. * We put it back at the end of function. */ @@ -851,9 +854,13 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connect case AWS_H2_DATA_ENCODE_ONGOING: aws_linked_list_push_back(outgoing_streams_list, node); break; - case AWS_H2_DATA_ENCODE_ONGOING_BODY_STALLED: + case AWS_H2_DATA_ENCODE_ONGOING_BODY_STREAM_STALLED: aws_linked_list_push_back(&stalled_streams_list, node); break; + case AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES: + stream->thread_data.waiting_for_writes = true; + aws_linked_list_push_back(waiting_streams_list, node); + break; case AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED: aws_linked_list_push_back(stalled_window_streams_list, node); AWS_H2_STREAM_LOG( @@ -1761,10 +1768,7 @@ static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h aws_linked_list_remove(&stream->node); } - /* Invoke callback */ - if (stream->base.on_complete) { - stream->base.on_complete(&stream->base, error_code, stream->base.user_data); - } + aws_h2_stream_complete(stream, error_code); /* release connection's hold on stream */ aws_http_stream_release(&stream->base); @@ -1859,15 +1863,20 @@ static void s_move_stream_to_thread( goto error; } - bool has_outgoing_data = false; - if (aws_h2_stream_on_activated(stream, &has_outgoing_data)) { + enum aws_h2_stream_body_state body_state = AWS_H2_STREAM_BODY_STATE_NONE; + if (aws_h2_stream_on_activated(stream, &body_state)) { goto error; } - - if (has_outgoing_data) { - aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node); + switch (body_state) { + case AWS_H2_STREAM_BODY_STATE_WAITING_WRITES: + aws_linked_list_push_back(&connection->thread_data.waiting_streams_list, &stream->node); + break; + case AWS_H2_STREAM_BODY_STATE_ONGOING: + aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node); + break; + default: + break; } - return; error: /* If the stream got into any datastructures, s_stream_complete() will remove it */ @@ -1954,6 +1963,7 @@ static void s_cross_thread_work_task(struct aws_channel_task *task, void *arg, e s_send_goaway(connection, goaway->http2_error, goaway->allow_more_streams, &goaway->debug_data); aws_mem_release(connection->base.alloc, goaway); } + /* It's likely that frames were queued while processing cross-thread work. * If so, try writing them now */ aws_h2_try_write_outgoing_frames(connection); diff --git a/source/h2_stream.c b/source/h2_stream.c index 7d3a1864f..e1fe088f3 100644 --- a/source/h2_stream.c +++ b/source/h2_stream.c @@ -10,6 +10,7 @@ #include #include #include +#include /* Apple toolchains such as xcode and swiftpm define the DEBUG symbol. undef it here so we can actually use the token */ #undef DEBUG @@ -19,6 +20,9 @@ static void s_stream_update_window(struct aws_http_stream *stream_base, size_t i static int s_stream_reset_stream(struct aws_http_stream *stream_base, uint32_t http2_error); static int s_stream_get_received_error_code(struct aws_http_stream *stream_base, uint32_t *out_http2_error); static int s_stream_get_sent_error_code(struct aws_http_stream *stream_base, uint32_t *out_http2_error); +static int s_stream_write_data( + struct aws_http_stream *stream_base, + const struct aws_http2_stream_write_data_options *options); static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status); static struct aws_h2err s_send_rst_and_close_stream(struct aws_h2_stream *stream, struct aws_h2err stream_error); @@ -32,6 +36,7 @@ struct aws_http_stream_vtable s_h2_stream_vtable = { .http2_reset_stream = s_stream_reset_stream, .http2_get_received_error_code = s_stream_get_received_error_code, .http2_get_sent_error_code = s_stream_get_sent_error_code, + .http2_write_data = s_stream_write_data, }; const char *aws_h2_stream_state_to_str(enum aws_h2_stream_state state) { @@ -238,15 +243,16 @@ struct aws_h2_stream *aws_h2_stream_new_request( stream->base.on_complete = options->on_complete; stream->base.client_data = &stream->base.client_or_server_data.client; stream->base.client_data->response_status = AWS_HTTP_STATUS_CODE_UNKNOWN; + aws_linked_list_init(&stream->thread_data.outgoing_writes); + aws_linked_list_init(&stream->synced_data.pending_write_list); /* Stream refcount starts at 1, and gets incremented again for the connection upon a call to activate() */ aws_atomic_init_int(&stream->base.refcount, 1); - /* Init H2 specific stuff */ - stream->thread_data.state = AWS_H2_STREAM_STATE_IDLE; enum aws_http_version message_version = aws_http_message_get_protocol_version(options->request); switch (message_version) { case AWS_HTTP_VERSION_1_1: + /* TODO: don't automatic transform HTTP/1 message. Let user explicitly pass in HTTP/2 request */ stream->thread_data.outgoing_message = aws_http2_message_new_from_http1(options->request, stream->base.alloc); if (!stream->thread_data.outgoing_message) { @@ -266,9 +272,24 @@ struct aws_h2_stream *aws_h2_stream_new_request( goto error; } + /* Init H2 specific stuff */ + stream->thread_data.state = AWS_H2_STREAM_STATE_IDLE; + /* stream end is implicit if the request isn't using manual data writes */ + stream->synced_data.manual_write_ended = !options->http2_use_manual_data_writes; + stream->manual_write = options->http2_use_manual_data_writes; + + /* if there's a request body to write, add it as the first outgoing write */ + struct aws_input_stream *body_stream = aws_http_message_get_body_stream(options->request); + if (body_stream) { + struct aws_h2_stream_data_write *body_write = + aws_mem_calloc(stream->base.alloc, 1, sizeof(struct aws_h2_stream_data_write)); + body_write->data_stream = aws_input_stream_acquire(body_stream); + body_write->end_stream = true; + aws_linked_list_push_back(&stream->thread_data.outgoing_writes, &body_write->node); + } + stream->sent_reset_error_code = -1; stream->received_reset_error_code = -1; - stream->synced_data.reset_error.h2_code = AWS_HTTP2_ERR_COUNT; stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_INIT; if (aws_mutex_init(&stream->synced_data.lock)) { @@ -307,6 +328,9 @@ static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void size_t window_update_size; struct aws_h2err reset_error; + struct aws_linked_list pending_writes; + aws_linked_list_init(&pending_writes); + { /* BEGIN CRITICAL SECTION */ s_lock_synced_data(stream); stream->synced_data.is_cross_thread_work_task_scheduled = false; @@ -317,6 +341,9 @@ static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void reset_called = stream->synced_data.reset_called; reset_error = stream->synced_data.reset_error; + /* copy out pending writes */ + aws_linked_list_swap_contents(&pending_writes, &stream->synced_data.pending_write_list); + s_unlock_synced_data(stream); } /* END CRITICAL SECTION */ @@ -338,6 +365,15 @@ static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void } } + if (stream->thread_data.waiting_for_writes && !aws_linked_list_empty(&pending_writes)) { + /* Got more to write, move the stream back to outgoing list */ + aws_linked_list_remove(&stream->node); + aws_linked_list_push_back(&connection->thread_data.outgoing_streams_list, &stream->node); + stream->thread_data.waiting_for_writes = false; + } + /* move any pending writes to the outgoing write queue */ + aws_linked_list_move_all_back(&stream->thread_data.outgoing_writes, &pending_writes); + /* It's likely that frames were queued while processing cross-thread work. * If so, try writing them now */ aws_h2_try_write_outgoing_frames(connection); @@ -346,10 +382,46 @@ static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void aws_http_stream_release(&stream->base); } +static void s_stream_data_write_destroy( + struct aws_h2_stream *stream, + struct aws_h2_stream_data_write *write, + int error_code) { + + AWS_PRECONDITION(stream); + AWS_PRECONDITION(write); + if (write->on_complete) { + write->on_complete(&stream->base, error_code, write->user_data); + } + if (write->data_stream) { + aws_input_stream_release(write->data_stream); + } + aws_mem_release(stream->base.alloc, write); +} + +static void s_h2_stream_destroy_pending_writes(struct aws_h2_stream *stream) { + /** + * Only called when stream is not active and will never be active afterward (destroying). + * Under this assumption, we can safely touch `stream->synced_data.pending_write_list` without + * lock, as the user can only add write to the list when the stream is ACTIVE + */ + AWS_ASSERT(stream->synced_data.api_state != AWS_H2_STREAM_API_STATE_ACTIVE); + aws_linked_list_move_all_back( + &stream->thread_data.outgoing_writes, + &stream->synced_data.pending_write_list); /* clean up any outgoing writes */ + while (!aws_linked_list_empty(&stream->thread_data.outgoing_writes)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&stream->thread_data.outgoing_writes); + struct aws_h2_stream_data_write *write = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node); + AWS_LOGF_DEBUG(AWS_LS_HTTP_STREAM, "Stream closing, cancelling write of stream %p", (void *)write->data_stream); + s_stream_data_write_destroy(stream, write, AWS_ERROR_HTTP_STREAM_HAS_COMPLETED); + } +} + static void s_stream_destroy(struct aws_http_stream *stream_base) { AWS_PRECONDITION(stream_base); struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base); + s_h2_stream_destroy_pending_writes(stream); + AWS_H2_STREAM_LOG(DEBUG, stream, "Destroying stream"); aws_mutex_clean_up(&stream->synced_data.lock); aws_http_message_release(stream->thread_data.outgoing_message); @@ -358,6 +430,23 @@ static void s_stream_destroy(struct aws_http_stream *stream_base) { aws_mem_release(stream->base.alloc, stream); } +void aws_h2_stream_complete(struct aws_h2_stream *stream, int error_code) { + { /* BEGIN CRITICAL SECTION */ + /* clean up any pending writes */ + s_lock_synced_data(stream); + /* The stream is complete now, this will prevent further writes from being queued */ + stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_COMPLETE; + s_unlock_synced_data(stream); + } /* END CRITICAL SECTION */ + + s_h2_stream_destroy_pending_writes(stream); + + /* Invoke callback */ + if (stream->base.on_complete) { + stream->base.on_complete(&stream->base, error_code, stream->base.user_data); + } +} + static void s_stream_update_window(struct aws_http_stream *stream_base, size_t increment_size) { AWS_PRECONDITION(stream_base); struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base); @@ -514,11 +603,6 @@ static struct aws_h2err s_send_rst_and_close_stream(struct aws_h2_stream *stream struct aws_h2_connection *connection = s_get_h2_connection(stream); stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED; - { /* BEGIN CRITICAL SECTION */ - s_lock_synced_data(stream); - stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_COMPLETE; - s_unlock_synced_data(stream); - } /* END CRITICAL SECTION */ AWS_H2_STREAM_LOGF( DEBUG, stream, @@ -529,10 +613,7 @@ static struct aws_h2err s_send_rst_and_close_stream(struct aws_h2_stream *stream /* Send RST_STREAM */ struct aws_h2_frame *rst_stream_frame = aws_h2_frame_new_rst_stream(stream->base.alloc, stream->base.id, stream_error.h2_code); - if (!rst_stream_frame) { - AWS_H2_STREAM_LOGF(ERROR, stream, "Error creating RST_STREAM frame, %s", aws_error_name(aws_last_error())); - return aws_h2err_from_last_error(); - } + AWS_FATAL_ASSERT(rst_stream_frame != NULL); aws_h2_connection_enqueue_outgoing_frame(connection, rst_stream_frame); /* connection takes ownership of frame */ stream->sent_reset_error_code = stream_error.h2_code; @@ -560,7 +641,42 @@ struct aws_h2err aws_h2_stream_window_size_change(struct aws_h2_stream *stream, return AWS_H2ERR_SUCCESS; } -int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgoing_data) { +static inline bool s_h2_stream_has_outgoing_writes(struct aws_h2_stream *stream) { + return !aws_linked_list_empty(&stream->thread_data.outgoing_writes); +} + +static void s_h2_stream_write_data_complete(struct aws_h2_stream *stream, bool *waiting_writes) { + AWS_PRECONDITION(waiting_writes); + AWS_PRECONDITION(s_h2_stream_has_outgoing_writes(stream)); + + /* finish/clean up the current write operation */ + struct aws_linked_list_node *node = aws_linked_list_pop_front(&stream->thread_data.outgoing_writes); + struct aws_h2_stream_data_write *write_op = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node); + const bool ending_stream = write_op->end_stream; + s_stream_data_write_destroy(stream, write_op, AWS_OP_SUCCESS); + + /* check to see if there are more queued writes or stream_end was called */ + *waiting_writes = !ending_stream && !s_h2_stream_has_outgoing_writes(stream); +} + +static struct aws_h2_stream_data_write *s_h2_stream_get_current_write(struct aws_h2_stream *stream) { + AWS_PRECONDITION(s_h2_stream_has_outgoing_writes(stream)); + struct aws_linked_list_node *node = aws_linked_list_front(&stream->thread_data.outgoing_writes); + struct aws_h2_stream_data_write *write = AWS_CONTAINER_OF(node, struct aws_h2_stream_data_write, node); + return write; +} + +static struct aws_input_stream *s_h2_stream_get_data_stream(struct aws_h2_stream *stream) { + struct aws_h2_stream_data_write *write = s_h2_stream_get_current_write(stream); + return write->data_stream; +} + +static bool s_h2_stream_does_current_write_end_stream(struct aws_h2_stream *stream) { + struct aws_h2_stream_data_write *write = s_h2_stream_get_current_write(stream); + return write->end_stream; +} + +int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_body_state *body_state) { AWS_PRECONDITION_ON_CHANNEL_THREAD(stream); struct aws_h2_connection *connection = s_get_h2_connection(stream); @@ -569,14 +685,16 @@ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgo struct aws_http_message *msg = stream->thread_data.outgoing_message; /* Should be ensured when the stream is created */ AWS_ASSERT(aws_http_message_get_protocol_version(msg) == AWS_HTTP_VERSION_2); - bool has_body_stream = aws_http_message_get_body_stream(msg) != NULL; + /* If manual write, always has data to be sent. */ + bool with_data = aws_http_message_get_body_stream(msg) != NULL || stream->manual_write; + struct aws_http_headers *h2_headers = aws_http_message_get_headers(msg); struct aws_h2_frame *headers_frame = aws_h2_frame_new_headers( stream->base.alloc, stream->base.id, h2_headers, - !has_body_stream /* end_stream */, + !with_data /* end_stream */, 0 /* padding - not currently configurable via public API */, NULL /* priority - not currently configurable via public API */); @@ -591,7 +709,7 @@ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgo stream->thread_data.window_size_self = connection->thread_data.settings_self[AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - if (has_body_stream) { + if (with_data) { /* If stream has DATA to send, put it in the outgoing_streams_list, and we'll send data later */ stream->thread_data.state = AWS_H2_STREAM_STATE_OPEN; AWS_H2_STREAM_LOG(TRACE, stream, "Sending HEADERS. State -> OPEN"); @@ -601,7 +719,16 @@ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, bool *out_has_outgo AWS_H2_STREAM_LOG(TRACE, stream, "Sending HEADERS with END_STREAM. State -> HALF_CLOSED_LOCAL"); } - *out_has_outgoing_data = has_body_stream; + if (s_h2_stream_has_outgoing_writes(stream)) { + *body_state = AWS_H2_STREAM_BODY_STATE_ONGOING; + } else { + if (stream->manual_write) { + stream->thread_data.waiting_for_writes = true; + *body_state = AWS_H2_STREAM_BODY_STATE_WAITING_WRITES; + } else { + *body_state = AWS_H2_STREAM_BODY_STATE_NONE; + } + } aws_h2_connection_enqueue_outgoing_frame(connection, headers_frame); return AWS_OP_SUCCESS; @@ -629,22 +756,23 @@ int aws_h2_stream_encode_data_frame( } *data_encode_status = AWS_H2_DATA_ENCODE_COMPLETE; - struct aws_input_stream *body = aws_http_message_get_body_stream(stream->thread_data.outgoing_message); - AWS_ASSERT(body); + struct aws_input_stream *input_stream = s_h2_stream_get_data_stream(stream); + AWS_ASSERT(input_stream); - bool body_complete; - bool body_stalled; + bool input_stream_complete = false; + bool input_stream_stalled = false; + bool ends_stream = s_h2_stream_does_current_write_end_stream(stream); if (aws_h2_encode_data_frame( encoder, stream->base.id, - body, - true /*body_ends_stream*/, + input_stream, + ends_stream, 0 /*pad_length*/, &stream->thread_data.window_size_peer, &connection->thread_data.window_size_peer, output, - &body_complete, - &body_stalled)) { + &input_stream_complete, + &input_stream_stalled)) { /* Failed to write DATA, treat it as a Stream Error */ AWS_H2_STREAM_LOGF(ERROR, stream, "Error encoding stream DATA, %s", aws_error_name(aws_last_error())); @@ -655,16 +783,21 @@ int aws_h2_stream_encode_data_frame( return AWS_OP_SUCCESS; } - if (body_complete) { + bool waiting_writes = false; + if (input_stream_complete) { + s_h2_stream_write_data_complete(stream, &waiting_writes); + } + + /* + * input_stream_complete for manual writes just means the current outgoing_write is complete. The body is not + * complete for real until the stream is told to close + */ + if (input_stream_complete && ends_stream) { + /* Done sending data. No more data will be sent. */ if (stream->thread_data.state == AWS_H2_STREAM_STATE_HALF_CLOSED_REMOTE) { /* Both sides have sent END_STREAM */ stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED; AWS_H2_STREAM_LOG(TRACE, stream, "Sent END_STREAM. State -> CLOSED"); - { /* BEGIN CRITICAL SECTION */ - s_lock_synced_data(stream); - stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_COMPLETE; - s_unlock_synced_data(stream); - } /* END CRITICAL SECTION */ /* Tell connection that stream is now closed */ if (aws_h2_connection_on_stream_closed( connection, stream, AWS_H2_STREAM_CLOSED_WHEN_BOTH_SIDES_END_STREAM, AWS_ERROR_SUCCESS)) { @@ -676,16 +809,22 @@ int aws_h2_stream_encode_data_frame( AWS_H2_STREAM_LOG(TRACE, stream, "Sent END_STREAM. State -> HALF_CLOSED_LOCAL"); } } else { - /* Body not complete */ *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING; - if (body_stalled) { - *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_BODY_STALLED; + if (input_stream_stalled) { + AWS_ASSERT(!input_stream_complete); + *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_BODY_STREAM_STALLED; } if (stream->thread_data.window_size_peer <= AWS_H2_MIN_WINDOW_SIZE) { - /* if body and window both stalled, we take the window stalled status, which will take the stream out from - * outgoing list */ + /* if body and window both stalled, we take the window stalled status, which will take the stream out + * from outgoing list */ *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WINDOW_STALLED; } + if (waiting_writes) { + /* if window stalled and we waiting for manual writes, we take waiting writes status, which will be handled + * properly if more writes coming, but windows is still stalled. But not the other way around. */ + AWS_ASSERT(input_stream_complete); + *data_encode_status = AWS_H2_DATA_ENCODE_ONGOING_WAITING_FOR_WRITES; + } } return AWS_OP_SUCCESS; @@ -979,11 +1118,6 @@ struct aws_h2err aws_h2_stream_on_decoder_end_stream(struct aws_h2_stream *strea /* Both sides have sent END_STREAM */ stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED; AWS_H2_STREAM_LOG(TRACE, stream, "Received END_STREAM. State -> CLOSED"); - { /* BEGIN CRITICAL SECTION */ - s_lock_synced_data(stream); - stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_COMPLETE; - s_unlock_synced_data(stream); - } /* END CRITICAL SECTION */ /* Tell connection that stream is now closed */ if (aws_h2_connection_on_stream_closed( s_get_h2_connection(stream), @@ -1033,11 +1167,6 @@ struct aws_h2err aws_h2_stream_on_decoder_rst_stream(struct aws_h2_stream *strea } stream->thread_data.state = AWS_H2_STREAM_STATE_CLOSED; - { /* BEGIN CRITICAL SECTION */ - s_lock_synced_data(stream); - stream->synced_data.api_state = AWS_H2_STREAM_API_STATE_COMPLETE; - s_unlock_synced_data(stream); - } /* END CRITICAL SECTION */ stream->received_reset_error_code = h2_error_code; AWS_H2_STREAM_LOGF( @@ -1054,3 +1183,68 @@ struct aws_h2err aws_h2_stream_on_decoder_rst_stream(struct aws_h2_stream *strea return AWS_H2ERR_SUCCESS; } + +static int s_stream_write_data( + struct aws_http_stream *stream_base, + const struct aws_http2_stream_write_data_options *options) { + struct aws_h2_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h2_stream, base); + struct aws_h2_connection *connection = s_get_h2_connection(stream); + + /* queue this new write into the pending write list for the stream */ + struct aws_h2_stream_data_write *pending_write = + aws_mem_calloc(stream->base.alloc, 1, sizeof(struct aws_h2_stream_data_write)); + if (options->data) { + pending_write->data_stream = aws_input_stream_acquire(options->data); + } else { + struct aws_byte_cursor empty_cursor; + AWS_ZERO_STRUCT(empty_cursor); + pending_write->data_stream = aws_input_stream_new_from_cursor(stream->base.alloc, &empty_cursor); + } + bool schedule_cross_thread_work = false; + { /* BEGIN CRITICAL SECTION */ + s_lock_synced_data(stream); + { + if (stream->synced_data.api_state != AWS_H2_STREAM_API_STATE_ACTIVE) { + s_unlock_synced_data(stream); + s_stream_data_write_destroy(stream, pending_write, AWS_ERROR_INVALID_STATE); + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, + "Cannot write DATA frames to an inactive or closed stream, stream=%p", + (void *)stream_base); + return aws_raise_error(AWS_ERROR_INVALID_STATE); + } + + if (stream->synced_data.manual_write_ended) { + s_unlock_synced_data(stream); + s_stream_data_write_destroy(stream, pending_write, AWS_ERROR_INVALID_STATE); + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, + "Cannot write DATA frames to a stream after end, stream=%p", + (void *)stream_base); + /* Fail with error, otherwise, people can wait for on_complete callback that will never be invoked. */ + return aws_raise_error(AWS_ERROR_INVALID_STATE); + } + /* Not setting this until we're sure we succeeded, so that callback doesn't fire on cleanup if we fail */ + if (options->end_stream) { + stream->synced_data.manual_write_ended = true; + } + pending_write->end_stream = options->end_stream; + pending_write->on_complete = options->on_complete; + pending_write->user_data = options->user_data; + + aws_linked_list_push_back(&stream->synced_data.pending_write_list, &pending_write->node); + schedule_cross_thread_work = !stream->synced_data.is_cross_thread_work_task_scheduled; + stream->synced_data.is_cross_thread_work_task_scheduled = true; + } + s_unlock_synced_data(stream); + } /* END CRITICAL SECTION */ + + if (schedule_cross_thread_work) { + AWS_H2_STREAM_LOG(TRACE, stream, "Scheduling stream cross-thread work task"); + /* increment the refcount of stream to keep it alive until the task runs */ + aws_atomic_fetch_add(&stream->base.refcount, 1); + aws_channel_schedule_task_now(connection->base.channel_slot->channel, &stream->cross_thread_work_task); + } + + return AWS_OP_SUCCESS; +} diff --git a/source/request_response.c b/source/request_response.c index 1c42860ff..7bb407f58 100644 --- a/source/request_response.c +++ b/source/request_response.c @@ -751,6 +751,17 @@ int aws_http1_stream_write_chunk(struct aws_http_stream *http1_stream, const str return http1_stream->vtable->http1_write_chunk(http1_stream, options); } +int aws_http2_stream_write_data( + struct aws_http_stream *http2_stream, + const struct aws_http2_stream_write_data_options *options) { + AWS_PRECONDITION(http2_stream); + AWS_PRECONDITION(http2_stream->vtable); + AWS_PRECONDITION(http2_stream->vtable->http2_write_data); + AWS_PRECONDITION(options); + + return http2_stream->vtable->http2_write_data(http2_stream, options); +} + int aws_http1_stream_add_chunked_trailer( struct aws_http_stream *http1_stream, const struct aws_http_headers *trailing_headers) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 2c86a0657..2096b77a7 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -457,6 +457,9 @@ add_test_case(h2_client_error_from_outgoing_body_callback_reset_stream) add_test_case(h2_client_error_from_incoming_headers_callback_reset_stream) add_test_case(h2_client_error_from_incoming_headers_done_callback_reset_stream) add_test_case(h2_client_error_from_incoming_body_callback_reset_stream) +add_test_case(h2_client_manual_data_write) +add_test_case(h2_client_manual_data_write_no_data) +add_test_case(h2_client_manual_data_write_connection_close) add_test_case(server_new_destroy) add_test_case(connection_setup_shutdown) diff --git a/tests/test_h2_client.c b/tests/test_h2_client.c index 0b625236d..a3becd02a 100644 --- a/tests/test_h2_client.c +++ b/tests/test_h2_client.c @@ -5112,3 +5112,240 @@ TEST_CASE(h2_client_error_from_incoming_body_callback_reset_stream) { ASSERT_SUCCESS(s_test_error_from_callback(allocator, ctx, REQUEST_CALLBACK_INCOMING_BODY)); return AWS_OP_SUCCESS; } + +struct h2_client_manual_data_write_ctx { + struct aws_allocator *allocator; + struct aws_byte_buf data; + int complete_error_code; +}; + +static struct aws_input_stream *s_h2_client_manual_data_write_generate_data( + struct h2_client_manual_data_write_ctx *ctx) { + struct aws_byte_cursor data = aws_byte_cursor_from_buf(&ctx->data); + data.len = aws_max_size(rand() % ctx->data.capacity, 1); + return aws_input_stream_new_from_cursor(ctx->allocator, &data); +} + +TEST_CASE(h2_client_manual_data_write) { + + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + size_t frame_count = h2_decode_tester_frame_count(&s_tester.peer.decode); + + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = request, + .http2_use_manual_data_writes = true, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &request_options); + ASSERT_NOT_NULL(stream); + + aws_http_stream_activate(stream); + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + uint32_t stream_id = aws_http_stream_get_id(stream); + + struct aws_byte_buf payload; + aws_byte_buf_init(&payload, allocator, 1024); + + struct h2_client_manual_data_write_ctx test_ctx = { + .allocator = allocator, + .data = payload, + }; + size_t total_length = 0; + + /* Simulate writes coming in over time */ + for (int idx = 0; idx < 1000; ++idx) { + struct aws_input_stream *data_stream = s_h2_client_manual_data_write_generate_data(&test_ctx); + int64_t stream_length = 0; + ASSERT_SUCCESS(aws_input_stream_get_length(data_stream, &stream_length)); + total_length += (size_t)stream_length; + struct aws_http2_stream_write_data_options write = { + .data = data_stream, + .on_complete = NULL, + .user_data = NULL, + }; + ASSERT_SUCCESS(aws_http2_stream_write_data(stream, &write)); + /* fake peer sends WINDOW_UPDATE */ + struct aws_h2_frame *peer_frame = aws_h2_frame_new_window_update(allocator, stream_id, (uint32_t)stream_length); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame)); + /* Connection level window update */ + peer_frame = aws_h2_frame_new_window_update(allocator, 0, (uint32_t)stream_length); + ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, peer_frame)); + if (idx % 10 == 0) { + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + } + aws_input_stream_release(data_stream); + } + struct aws_http2_stream_write_data_options last_write = {.end_stream = true}; + + ASSERT_SUCCESS(aws_http2_stream_write_data(stream, &last_write)); + + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + size_t frame_count2 = h2_decode_tester_frame_count(&s_tester.peer.decode); + /* Peer should received header frame without end_stream and mutiple data frames and combined payload length should + * be the same as total length sent. */ + struct h2_decoded_frame *header_frame = h2_decode_tester_get_frame(&s_tester.peer.decode, frame_count); + ASSERT_UINT_EQUALS(AWS_H2_FRAME_T_HEADERS, header_frame->type); + ASSERT_FALSE(header_frame->end_stream); + size_t received_length = 0; + for (size_t i = frame_count + 1; i < frame_count2; i++) { + struct h2_decoded_frame *data_frame = h2_decode_tester_get_frame(&s_tester.peer.decode, i); + ASSERT_UINT_EQUALS(AWS_H2_FRAME_T_DATA, data_frame->type); + received_length += data_frame->data_payload_len; + if (i == frame_count2 - 1) { + ASSERT_TRUE(data_frame->end_stream); + } else { + ASSERT_FALSE(data_frame->end_stream); + } + } + ASSERT_UINT_EQUALS(received_length, total_length); + + aws_http_message_release(request); + aws_http_stream_release(stream); + + /* close the connection */ + aws_http_connection_close(s_tester.connection); + + aws_byte_buf_clean_up(&test_ctx.data); + + /* clean up */ + return s_tester_clean_up(); +} + +TEST_CASE(h2_client_manual_data_write_no_data) { + + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + size_t frame_count = h2_decode_tester_frame_count(&s_tester.peer.decode); + + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = request, + .http2_use_manual_data_writes = true, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &request_options); + ASSERT_NOT_NULL(stream); + + aws_http_stream_activate(stream); + + struct aws_http2_stream_write_data_options last_write = {.end_stream = true}; + ASSERT_SUCCESS(aws_http2_stream_write_data(stream, &last_write)); + + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + size_t frame_count_2 = h2_decode_tester_frame_count(&s_tester.peer.decode); + /* Peer should received header frame without end_stream and empty data frame with end_stream */ + ASSERT_UINT_EQUALS(frame_count + 2, frame_count_2); + struct h2_decoded_frame *header_frame = h2_decode_tester_get_frame(&s_tester.peer.decode, frame_count); + ASSERT_UINT_EQUALS(AWS_H2_FRAME_T_HEADERS, header_frame->type); + ASSERT_FALSE(header_frame->end_stream); + struct h2_decoded_frame *empty_data_frame = h2_decode_tester_get_frame(&s_tester.peer.decode, frame_count + 1); + ASSERT_UINT_EQUALS(AWS_H2_FRAME_T_DATA, empty_data_frame->type); + ASSERT_UINT_EQUALS(0, empty_data_frame->data_payload_len); + ASSERT_TRUE(empty_data_frame->end_stream); + aws_http_message_release(request); + aws_http_stream_release(stream); + + /* close the connection */ + aws_http_connection_close(s_tester.connection); + + /* clean up */ + return s_tester_clean_up(); +} + +static void s_on_manual_data_stream_complete(struct aws_http_stream *stream, int error_code, void *user_data) { + (void)stream; + struct h2_client_manual_data_write_ctx *test_ctx = (struct h2_client_manual_data_write_ctx *)user_data; + test_ctx->complete_error_code = error_code; +} + +/* Close the connection before finishes writing data */ +TEST_CASE(h2_client_manual_data_write_connection_close) { + + ASSERT_SUCCESS(s_tester_init(allocator, ctx)); + /* get connection preface and acks out of the way */ + ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer)); + ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer)); + + struct aws_http_message *request = aws_http2_message_new_request(allocator); + ASSERT_NOT_NULL(request); + + struct aws_http_header request_headers_src[] = { + DEFINE_HEADER(":method", "GET"), + DEFINE_HEADER(":scheme", "https"), + DEFINE_HEADER(":path", "/"), + }; + aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src)); + + struct aws_byte_buf payload; + aws_byte_buf_init(&payload, allocator, 1024); + + struct h2_client_manual_data_write_ctx test_ctx = { + .allocator = allocator, + .data = payload, + }; + + struct aws_http_make_request_options request_options = { + .self_size = sizeof(request_options), + .request = request, + .http2_use_manual_data_writes = true, + .on_complete = s_on_manual_data_stream_complete, + .user_data = &test_ctx, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(s_tester.connection, &request_options); + ASSERT_NOT_NULL(stream); + + struct aws_input_stream *data_stream = s_h2_client_manual_data_write_generate_data(&test_ctx); + struct aws_http2_stream_write_data_options write = { + .data = data_stream, + .on_complete = NULL, + .user_data = NULL, + }; + /* Cannot write before activate the stream */ + ASSERT_FAILS(aws_http2_stream_write_data(stream, &write)); + aws_http_stream_activate(stream); + ASSERT_SUCCESS(aws_http2_stream_write_data(stream, &write)); + + /* close connection */ + aws_http_connection_close(s_tester.connection); + + ASSERT_SUCCESS(aws_http2_stream_write_data(stream, &write)); + + testing_channel_drain_queued_tasks(&s_tester.testing_channel); + /* Cannot write after stream closed */ + ASSERT_FAILS(aws_http2_stream_write_data(stream, &write)); + + ASSERT_INT_EQUALS(AWS_ERROR_HTTP_CONNECTION_CLOSED, test_ctx.complete_error_code); + + aws_http_message_release(request); + aws_http_stream_release(stream); + + /* clean up */ + aws_byte_buf_clean_up(&test_ctx.data); + aws_input_stream_release(data_stream); + return s_tester_clean_up(); +}