Skip to content

Commit

Permalink
refcount input stream (#342)
Browse files Browse the repository at this point in the history
Refcount the input stream and adapt the new api change
  • Loading branch information
TingDaoK authored Apr 27, 2022
1 parent 4333519 commit b76ebf2
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 105 deletions.
2 changes: 1 addition & 1 deletion bin/elasticurl/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ int main(int argc, char **argv) {
}

if (app_ctx.input_body) {
aws_input_stream_destroy(app_ctx.input_body);
aws_input_stream_release(app_ctx.input_body);
}

if (app_ctx.input_file) {
Expand Down
3 changes: 2 additions & 1 deletion builder.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
{ "name": "aws-c-compression" }
],
"downstream": [
{ "name": "aws-c-auth" }
{ "name": "aws-c-auth" },
{ "name": "aws-c-mqtt" }
],
"test_steps": [
["bash", "./.builder/action/aws-c-http-test.sh"],
Expand Down
2 changes: 1 addition & 1 deletion codebuild/linux-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ phases:
pre_build:
commands:
- export CC=gcc-7
- export BUILDER_VERSION=v0.8.27
- export BUILDER_VERSION=v0.9.12
- export BUILDER_SOURCE=releases
- export BUILDER_HOST=https://d19elf31gohf1l.cloudfront.net
build:
Expand Down
9 changes: 5 additions & 4 deletions source/h1_encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ int aws_h1_encoder_message_init_from_request(

AWS_ZERO_STRUCT(*message);

message->body = aws_http_message_get_body_stream(request);
message->body = aws_input_stream_acquire(aws_http_message_get_body_stream(request));
message->pending_chunk_list = pending_chunk_list;

struct aws_byte_cursor method;
Expand Down Expand Up @@ -352,7 +352,7 @@ int aws_h1_encoder_message_init_from_response(

AWS_ZERO_STRUCT(*message);

message->body = aws_http_message_get_body_stream(response);
message->body = aws_input_stream_acquire(aws_http_message_get_body_stream(response));
message->pending_chunk_list = pending_chunk_list;

struct aws_byte_cursor version = aws_http_version_to_str(AWS_HTTP_VERSION_1_1);
Expand Down Expand Up @@ -432,6 +432,7 @@ int aws_h1_encoder_message_init_from_response(
}

void aws_h1_encoder_message_clean_up(struct aws_h1_encoder_message *message) {
aws_input_stream_release(message->body);
aws_byte_buf_clean_up(&message->outgoing_head_buf);
aws_h1_trailer_destroy(message->trailer);
AWS_ZERO_STRUCT(*message);
Expand Down Expand Up @@ -547,18 +548,18 @@ struct aws_h1_chunk *aws_h1_chunk_new(struct aws_allocator *allocator, const str
}

chunk->allocator = allocator;
chunk->data = options->chunk_data;
chunk->data = aws_input_stream_acquire(options->chunk_data);
chunk->data_size = options->chunk_data_size;
chunk->on_complete = options->on_complete;
chunk->user_data = options->user_data;
chunk->chunk_line = aws_byte_buf_from_empty_array(chunk_line_storage, chunk_line_size);
s_populate_chunk_line_buffer(&chunk->chunk_line, options);

return chunk;
}

void aws_h1_chunk_destroy(struct aws_h1_chunk *chunk) {
AWS_PRECONDITION(chunk);
aws_input_stream_release(chunk->data);
aws_mem_release(chunk->allocator, chunk);
}

Expand Down
8 changes: 7 additions & 1 deletion source/request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ struct aws_http_message *aws_http_message_release(struct aws_http_message *messa
}

aws_http_headers_release(message->headers);

aws_input_stream_release(message->body_stream);
aws_mem_release(message->allocator, message);
} else {
AWS_ASSERT(prev_refcount != 0);
Expand Down Expand Up @@ -727,7 +727,13 @@ int aws_http_message_set_response_status(struct aws_http_message *response_messa

void aws_http_message_set_body_stream(struct aws_http_message *message, struct aws_input_stream *body_stream) {
AWS_PRECONDITION(message);
/* release previous stream, if any */
aws_input_stream_release(message->body_stream);

message->body_stream = body_stream;
if (message->body_stream) {
aws_input_stream_acquire(message->body_stream);
}
}

int aws_http1_stream_write_chunk(struct aws_http_stream *http1_stream, const struct aws_http1_chunk_options *options) {
Expand Down
8 changes: 5 additions & 3 deletions tests/fuzz/fuzz_h2_decoder_correct.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ static struct aws_http_headers *s_generate_headers(
struct aws_http_header path = {.name = aws_http_header_path, .value = aws_byte_cursor_from_c_str("/")};
aws_http_headers_add_header(headers, &path);

struct aws_http_header authority = {.name = aws_http_header_authority,
.value = aws_byte_cursor_from_c_str("example.com")};
struct aws_http_header authority = {
.name = aws_http_header_authority,
.value = aws_byte_cursor_from_c_str("example.com"),
};
aws_http_headers_add_header(headers, &authority);

} else if (header_style == HEADER_STYLE_RESPONSE) {
Expand Down Expand Up @@ -252,7 +254,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
struct aws_stream_status body_status;
aws_input_stream_get_status(body, &body_status);
AWS_FATAL_ASSERT(body_complete == body_status.is_end_of_stream)
aws_input_stream_destroy(body);
aws_input_stream_release(body);
break;
}
case AWS_H2_FRAME_T_HEADERS: {
Expand Down
47 changes: 19 additions & 28 deletions tests/h2_test_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ int h2_fake_peer_send_data_frame_with_padding_length(
ASSERT_TRUE(msg->message_data.len != 0);

ASSERT_SUCCESS(testing_channel_push_read_message(peer->testing_channel, msg));
aws_input_stream_destroy(body_stream);
aws_input_stream_release(body_stream);
return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -643,6 +643,8 @@ int h2_fake_peer_send_connection_preface_default_settings(struct h2_fake_peer *p
/******************************************************************************/

struct aws_input_stream_tester {
struct aws_input_stream base;
struct aws_allocator *allocator;
/* aws_input_stream_byte_cursor provides our actual functionality */
struct aws_input_stream *cursor_stream;

Expand All @@ -655,12 +657,12 @@ static int s_aws_input_stream_tester_seek(
int64_t offset,
enum aws_stream_seek_basis basis) {

struct aws_input_stream_tester *impl = stream->impl;
struct aws_input_stream_tester *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_tester, base);
return aws_input_stream_seek(impl->cursor_stream, offset, basis);
}

static int s_aws_input_stream_tester_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
struct aws_input_stream_tester *impl = stream->impl;
struct aws_input_stream_tester *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_tester, base);

if (impl->is_reading_broken) {
return aws_raise_error(AWS_IO_STREAM_READ_FAILED);
Expand All @@ -678,60 +680,49 @@ static int s_aws_input_stream_tester_read(struct aws_input_stream *stream, struc
}

static int s_aws_input_stream_tester_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
struct aws_input_stream_tester *impl = stream->impl;
struct aws_input_stream_tester *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_tester, base);
return aws_input_stream_get_status(impl->cursor_stream, status);
}

static int s_aws_input_stream_tester_get_length(struct aws_input_stream *stream, int64_t *out_length) {
struct aws_input_stream_tester *impl = stream->impl;
struct aws_input_stream_tester *impl = AWS_CONTAINER_OF(stream, struct aws_input_stream_tester, base);
return aws_input_stream_get_length(impl->cursor_stream, out_length);
}

static void s_aws_input_stream_tester_destroy(struct aws_input_stream *stream) {
if (stream) {
struct aws_input_stream_tester *impl = stream->impl;
aws_input_stream_destroy(impl->cursor_stream);
aws_mem_release(stream->allocator, stream);
}
static void s_aws_input_stream_tester_destroy(struct aws_input_stream_tester *impl) {
aws_input_stream_release(impl->cursor_stream);
aws_mem_release(impl->allocator, impl);
}

static struct aws_input_stream_vtable s_aws_input_stream_tester_vtable = {
.seek = s_aws_input_stream_tester_seek,
.read = s_aws_input_stream_tester_read,
.get_status = s_aws_input_stream_tester_get_status,
.get_length = s_aws_input_stream_tester_get_length,
.destroy = s_aws_input_stream_tester_destroy,
};

struct aws_input_stream *aws_input_stream_new_tester(struct aws_allocator *alloc, struct aws_byte_cursor cursor) {

struct aws_input_stream *stream = NULL;
struct aws_input_stream_tester *impl = NULL;
aws_mem_acquire_many(
alloc, 2, &stream, sizeof(struct aws_input_stream), &impl, sizeof(struct aws_input_stream_tester));
AWS_FATAL_ASSERT(stream);

AWS_ZERO_STRUCT(*stream);
AWS_ZERO_STRUCT(*impl);

stream->allocator = alloc;
stream->impl = impl;
stream->vtable = &s_aws_input_stream_tester_vtable;
struct aws_input_stream_tester *impl = aws_mem_calloc(alloc, 1, sizeof(struct aws_input_stream_tester));
AWS_FATAL_ASSERT(impl);

impl->max_bytes_per_read = SIZE_MAX;

impl->cursor_stream = aws_input_stream_new_from_cursor(alloc, &cursor);
AWS_FATAL_ASSERT(impl->cursor_stream);

return stream;
impl->allocator = alloc;
impl->base.vtable = &s_aws_input_stream_tester_vtable;
aws_ref_count_init(
&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_aws_input_stream_tester_destroy);
return &impl->base;
}

void aws_input_stream_tester_set_max_bytes_per_read(struct aws_input_stream *input_stream, size_t max_bytes) {
struct aws_input_stream_tester *impl = input_stream->impl;
struct aws_input_stream_tester *impl = AWS_CONTAINER_OF(input_stream, struct aws_input_stream_tester, base);
impl->max_bytes_per_read = max_bytes;
}

void aws_input_stream_tester_set_reading_broken(struct aws_input_stream *input_stream, bool is_broken) {
struct aws_input_stream_tester *impl = input_stream->impl;
struct aws_input_stream_tester *impl = AWS_CONTAINER_OF(input_stream, struct aws_input_stream_tester, base);
impl->is_reading_broken = is_broken;
}
2 changes: 1 addition & 1 deletion tests/test_connection_monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static void s_clean_up_monitor_test(void) {
if (request_info) {
aws_http_message_destroy(request_info->request);
aws_http_stream_release(request_info->stream);
aws_input_stream_destroy(request_info->body);
aws_input_stream_release(request_info->body);
}
}

Expand Down
Loading

0 comments on commit b76ebf2

Please sign in to comment.