Skip to content

Commit

Permalink
Websocket automatically responds to PING with PONG (#417)
Browse files Browse the repository at this point in the history
**Issue:**
The Websocket was not automatically responding to a PING with a PONG, which violates [RFC-6455 Section 5.5.2](https://www.rfc-editor.org/rfc/rfc6455#section-5.5.2)

**Description of changes:**
Now it does

Also, remove some things:
- Remove concept of `high_priority` frames.
  - Nothing in the RFC mentions letting control frames cut in front of data frames. Remove this complexity until there's some proven need (spoiler: there will never be a need)
- Remove public access to the RSV (reserved) bits.
  -  These are only used by extensions, which we do not currently support. Keeping these accessible in the public API can only bite us later if we ever choose to support extensions. Better to keep these private unless there's a proven need to expose them.
  • Loading branch information
graebm authored Jan 3, 2023
1 parent 4f43dba commit 9bd58ca
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 120 deletions.
17 changes: 2 additions & 15 deletions include/aws/http/websocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct aws_http_message;

/* TODO: Document lifetime stuff */
/* TODO: Document CLOSE frame behavior (when auto-sent during close, when auto-closed) */
/* TODO: Document auto-pong behavior */
/* TODO: Accept payload as aws_input_stream */

/**
* A websocket connection.
Expand Down Expand Up @@ -82,7 +82,6 @@ struct aws_websocket_incoming_frame {
uint64_t payload_length;
uint8_t opcode;
bool fin;
bool rsv[3];
};

/**
Expand Down Expand Up @@ -309,7 +308,7 @@ typedef void(
/**
* Options for sending a websocket frame.
* This structure is copied immediately by aws_websocket_send().
* For descriptions of opcode, fin, rsv, and payload_length see in RFC-6455 Section 5.2.
* For descriptions of opcode, fin, and payload_length see in RFC-6455 Section 5.2.
*/
struct aws_websocket_send_frame_options {
/**
Expand Down Expand Up @@ -346,18 +345,6 @@ struct aws_websocket_send_frame_options {
* Indicates that this is the final fragment in a message. The first fragment MAY also be the final fragment.
*/
bool fin;

/**
* If true, frame will be sent before those with normal priority.
* Useful for opcodes like PING and PONG where low latency is important.
* This feature may only be used with "control" opcodes, not "data" opcodes like BINARY and TEXT.
*/
bool high_priority;

/**
* MUST be 0 unless an extension is negotiated that defines meanings for non-zero values.
*/
bool rsv[3];
};

AWS_EXTERN_C_BEGIN
Expand Down
131 changes: 94 additions & 37 deletions source/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
# pragma warning(disable : 4204) /* non-constant aggregate initializer */
#endif

/* TODO: echo payload of peer CLOSE */

/* TODO: If something goes wrong during normal shutdown, do I change the error_code? */

struct outgoing_frame {
Expand Down Expand Up @@ -71,6 +69,10 @@ struct aws_websocket {
struct aws_websocket_incoming_frame *current_incoming_frame;
struct aws_websocket_incoming_frame incoming_frame_storage;

/* Payload of incoming PING frame.
* The PONG frame we send in response must have an identical payload */
struct aws_byte_buf incoming_ping_payload;

/* If current incoming frame is CONTINUATION, this is the data type it is a continuation of. */
enum aws_websocket_opcode continuation_of_opcode;

Expand Down Expand Up @@ -298,6 +300,7 @@ struct aws_websocket *aws_websocket_handler_new(const struct aws_websocket_handl

aws_linked_list_init(&websocket->thread_data.outgoing_frame_list);
aws_linked_list_init(&websocket->thread_data.write_completion_frames);
aws_byte_buf_init(&websocket->thread_data.incoming_ping_payload, websocket->alloc, 0);

aws_websocket_encoder_init(&websocket->thread_data.encoder, s_encoder_stream_outgoing_payload, websocket);

Expand Down Expand Up @@ -343,6 +346,7 @@ static void s_handler_destroy(struct aws_channel_handler *handler) {

AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Destroying websocket.", (void *)websocket);

aws_byte_buf_clean_up(&websocket->thread_data.incoming_ping_payload);
aws_mutex_clean_up(&websocket->synced_data.lock);
aws_mem_release(websocket->alloc, websocket);
}
Expand Down Expand Up @@ -415,22 +419,6 @@ int aws_websocket_convert_to_midchannel_handler(struct aws_websocket *websocket)
return AWS_OP_SUCCESS;
}

/* Insert frame into list, sorting by priority, then by age (high-priority and older frames towards the front) */
static void s_enqueue_prioritized_frame(struct aws_linked_list *list, struct outgoing_frame *to_add) {
/* Iterate in reverse so that common case (a bunch of low-priority frames) is O(1) */
struct aws_linked_list_node *rev_iter = aws_linked_list_rbegin(list);
const struct aws_linked_list_node *rev_end = aws_linked_list_rend(list);
while (rev_iter != rev_end) {
struct outgoing_frame *frame_i = AWS_CONTAINER_OF(rev_iter, struct outgoing_frame, node);
if (to_add->def.high_priority == frame_i->def.high_priority) {
break;
}
rev_iter = aws_linked_list_prev(rev_iter);
}

aws_linked_list_insert_after(rev_iter, &to_add->node);
}

static int s_send_frame(
struct aws_websocket *websocket,
const struct aws_websocket_send_frame_options *options,
Expand All @@ -440,10 +428,6 @@ static int s_send_frame(
AWS_ASSERT(options);

/* Check for bad input. Log about non-obvious errors. */
if (options->high_priority && aws_websocket_is_data_frame(options->opcode)) {
AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Data frames cannot be sent as high-priority.", (void *)websocket);
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}
if (options->payload_length > 0 && !options->stream_outgoing_payload) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_WEBSOCKET,
Expand Down Expand Up @@ -495,13 +479,12 @@ static int s_send_frame(

AWS_LOGF_DEBUG(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Enqueuing outgoing frame with opcode=%" PRIu8 "(%s) length=%" PRIu64 " fin=%s priority=%s",
"id=%p: Enqueuing outgoing frame with opcode=%" PRIu8 "(%s) length=%" PRIu64 " fin=%s",
(void *)websocket,
options->opcode,
aws_websocket_opcode_str(options->opcode),
options->payload_length,
options->fin ? "T" : "F",
options->high_priority ? "high" : "normal");
options->fin ? "T" : "F");

if (should_schedule_task) {
AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Scheduling synced data task.", (void *)websocket);
Expand Down Expand Up @@ -536,12 +519,7 @@ static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, voi
/* END CRITICAL SECTION */

if (!aws_linked_list_empty(&tmp_list)) {
do {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&tmp_list);
struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
s_enqueue_prioritized_frame(&websocket->thread_data.outgoing_frame_list, frame);
} while (!aws_linked_list_empty(&tmp_list));

aws_linked_list_move_all_back(&websocket->thread_data.outgoing_frame_list, &tmp_list);
s_try_write_outgoing_frames(websocket);
}
}
Expand Down Expand Up @@ -1313,9 +1291,6 @@ static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *use
websocket->thread_data.current_incoming_frame->payload_length = frame->payload_length;
websocket->thread_data.current_incoming_frame->opcode = frame->opcode;
websocket->thread_data.current_incoming_frame->fin = frame->fin;
websocket->thread_data.current_incoming_frame->rsv[0] = frame->rsv[0];
websocket->thread_data.current_incoming_frame->rsv[1] = frame->rsv[1];
websocket->thread_data.current_incoming_frame->rsv[2] = frame->rsv[2];

/* If CONTINUATION frames are expected, remember which type of data is being continued.
* RFC-6455 Section 5.4 Fragmentation */
Expand All @@ -1327,6 +1302,15 @@ static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *use
websocket->thread_data.continuation_of_opcode = frame->opcode;
}
}
} else if (frame->opcode == AWS_WEBSOCKET_OPCODE_PING) {
/* Prepare to store payload of PING so we can echo it back in the PONG */
aws_byte_buf_reset(&websocket->thread_data.incoming_ping_payload, false /*zero_contents*/);
/* Note: we are NOT calling aws_byte_buf_reserve().
* This works around an attack where a malicious peer CLAIMS they'll send a huge frame,
* which would case OOM if we did the reserve immediately.
* If a malicious peer wants to run us out of memory, they'll need to do
* it the costly way and actually send a billion bytes.
* Or we could impose our own internal limits, but for now this is simpler */
}

/* Invoke user cb */
Expand All @@ -1351,6 +1335,11 @@ static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data) {
AWS_ASSERT(websocket->thread_data.current_incoming_frame);
AWS_ASSERT(!websocket->thread_data.is_reading_stopped);

/* Store payload of PING so we can echo it back in the PONG */
if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_PING) {
aws_byte_buf_append_dynamic(&websocket->thread_data.incoming_ping_payload, &data);
}

if (websocket->thread_data.is_midchannel_handler) {
return s_decoder_on_midchannel_payload(websocket, data);
}
Expand Down Expand Up @@ -1455,11 +1444,57 @@ static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, stru
return AWS_OP_ERR;
}

/* When the websocket sends a frame automatically (PONG, CLOSE),
* this holds the payload. */
struct aws_websocket_autopayload {
struct aws_allocator *alloc;
struct aws_byte_buf buf;
struct aws_byte_cursor advancing_cursor;
};

static struct aws_websocket_autopayload *s_autopayload_new(
struct aws_allocator *alloc,
const struct aws_byte_buf *src) {

struct aws_websocket_autopayload *autopayload = aws_mem_calloc(alloc, 1, sizeof(struct aws_websocket_autopayload));
autopayload->alloc = alloc;
if (src->len > 0) {
aws_byte_buf_init_copy(&autopayload->buf, alloc, src);
autopayload->advancing_cursor = aws_byte_cursor_from_buf(&autopayload->buf);
}

return autopayload;
}

static void s_autopayload_destroy(struct aws_websocket_autopayload *autopayload) {
aws_byte_buf_clean_up(&autopayload->buf);
aws_mem_release(autopayload->alloc, autopayload);
}

static void s_autopayload_send_complete(struct aws_websocket *websocket, int error_code, void *user_data) {
(void)websocket;
(void)error_code;

struct aws_websocket_autopayload *autopayload = user_data;
s_autopayload_destroy(autopayload);
}

static bool s_autopayload_stream_outgoing_payload(
struct aws_websocket *websocket,
struct aws_byte_buf *out_buf,
void *user_data) {

(void)websocket;
struct aws_websocket_autopayload *autopayload = user_data;
aws_byte_buf_write_to_capacity(out_buf, &autopayload->advancing_cursor);
return true;
}

static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
AWS_ASSERT(websocket->thread_data.current_incoming_frame);

if (error_code == AWS_OP_SUCCESS) {
if (error_code == 0) {
/* If this was a CLOSE frame, don't read any more data. */
if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
AWS_LOGF_DEBUG(
Expand All @@ -1469,9 +1504,31 @@ static void s_complete_incoming_frame(struct aws_websocket *websocket, int error
websocket->thread_data.is_reading_stopped = true;

/* TODO: auto-close if there's a channel-handler to the right */
}

/* TODO: auto-respond to PING with PONG */
} else if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_PING) {
/* Automatically respond to a PING with a PONG */
if (!websocket->thread_data.is_writing_stopped) {
/* Optimization idea: avoid allocations/copies each time we send an auto-PONG.
* Maybe have a small autopayload pool, instead of allocating one each time.
* Maybe encode directly to aws_io_message, instead of copying to a buf, that's copied to a msg later.
* Maybe "std::move()" the aws_byte_bufs around instead of copying them. */
struct aws_websocket_autopayload *autopong =
s_autopayload_new(websocket->alloc, &websocket->thread_data.incoming_ping_payload);

struct aws_websocket_send_frame_options pong_frame = {
.opcode = AWS_WEBSOCKET_OPCODE_PONG,
.fin = true,
.payload_length = autopong->buf.len,
.stream_outgoing_payload = s_autopayload_stream_outgoing_payload,
.on_complete = s_autopayload_send_complete,
.user_data = autopong,
};

int send_err = s_send_frame(websocket, &pong_frame, false /*from_public_api*/);
/* Failure should be impossible. We already checked that writing is not stopped */
AWS_FATAL_ASSERT(!send_err && "Unexpected failure sending websocket PONG");
}
}
}

/* Invoke user cb */
Expand Down
3 changes: 2 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ add_test_case(websocket_handler_send_multiple_frames)
add_test_case(websocket_handler_send_huge_frame)
add_test_case(websocket_handler_send_payload_slowly)
add_test_case(websocket_handler_send_payload_with_pauses)
add_test_case(websocket_handler_send_high_priority_frame)
add_test_case(websocket_handler_sends_nothing_after_close_frame)
add_test_case(websocket_handler_send_frames_always_complete)
add_test_case(websocket_handler_send_one_io_msg_at_a_time)
Expand All @@ -209,6 +208,8 @@ add_test_case(websocket_handler_read_halts_if_payload_fn_returns_false)
add_test_case(websocket_handler_read_halts_if_complete_fn_returns_false)
add_test_case(websocket_handler_window_manual_increment)
add_test_case(websocket_handler_window_manual_increment_off_thread)
add_test_case(websocket_handler_sends_pong_automatically)
add_test_case(websocket_handler_wont_send_pong_after_close_frame)
add_test_case(websocket_midchannel_sanity_check)
add_test_case(websocket_midchannel_write_message)
add_test_case(websocket_midchannel_write_multiple_messages)
Expand Down
Loading

0 comments on commit 9bd58ca

Please sign in to comment.