Skip to content

Commit

Permalink
error responses: drain writer if an exn arrives while handling an err…
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed May 11, 2020
1 parent 7b7a432 commit e1db9a3
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 66 deletions.
1 change: 1 addition & 0 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ let set_error_and_handle ?request t error =
* reported (we're already handling an error and e.g. the writing channel
* is closed). Just shut down the connection in that case.
*)
Writer.close_and_drain t.writer;
shutdown t
end

Expand Down
14 changes: 4 additions & 10 deletions lib_test/test_client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ let reader_yielded t =
`Yield (next_read_operation t :> [`Close | `Read | `Yield]);
;;

let reader_closed t =
Alcotest.check read_operation "Reader is closed"
`Close (next_read_operation t :> [`Close | `Read | `Yield])

let write_string ?(msg="output written") t str =
let len = String.length str in
Alcotest.(check (option string)) msg
Expand All @@ -57,10 +61,6 @@ let writer_closed t =
(`Close 0) (next_write_operation t);
;;

let reader_closed t =
Alcotest.check read_operation "Reader is closed"
`Close (next_read_operation t :> [`Close | `Read | `Yield])

let connection_is_shutdown t =
reader_closed t;
writer_closed t;
Expand Down Expand Up @@ -641,9 +641,3 @@ let tests =
; "Fixed body doesn't shut down the writer if connection is persistent",`Quick, test_fixed_body_persistent_connection
; "Client support for upgrading a connection", `Quick, test_client_upgrade
]

(*
* TODO:
* - test client connection error handling
*
*)
98 changes: 42 additions & 56 deletions lib_test/test_server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ let read_request t r =
read_string t request_string
;;

let reader_ready t =
Alcotest.check read_operation "Reader is ready"
let reader_ready ?(msg="Reader is ready") t =
Alcotest.check read_operation msg
`Read (next_read_operation t);
;;

Expand All @@ -27,6 +27,13 @@ let reader_yielded t =
`Yield (next_read_operation t);
;;

let reader_closed ?(msg="Reader is closed") t =
Alcotest.check read_operation msg
`Close (next_read_operation t);
;;

let reader_errored = reader_closed ~msg:"Error shuts down the reader"

let write_string ?(msg="output written") t str =
let len = String.length str in
Alcotest.(check (option string)) msg
Expand Down Expand Up @@ -55,8 +62,7 @@ let writer_closed ?(unread = 0) t =
;;

let connection_is_shutdown t =
Alcotest.check read_operation "Reader is closed"
`Close (next_read_operation t);
reader_closed t;
writer_closed t;
;;

Expand Down Expand Up @@ -108,8 +114,7 @@ let error_handler ?request:_ _error start_response =

let test_initial_reader_state () =
let t = create default_request_handler in
Alcotest.check read_operation "A new reader wants input"
`Read (next_read_operation t);
reader_ready ~msg:"A new reader wants input" t
;;

let test_reader_is_closed_after_eof () =
Expand Down Expand Up @@ -398,8 +403,7 @@ let test_synchronous_error () =
let t = create ~error_handler synchronous_raise in
yield_writer t (fun () -> writer_woken_up := true);
read_request t (Request.create `GET "/");
Alcotest.check read_operation "Error shuts down the reader"
`Close (next_read_operation t);
reader_errored t;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
write_response t
Expand All @@ -419,8 +423,7 @@ let test_synchronous_error_asynchronous_handling () =
writer_yielded t;
yield_writer t (fun () -> writer_woken_up := true);
read_request t (Request.create `GET "/");
Alcotest.check read_operation "Error shuts down the reader"
`Close (next_read_operation t);
reader_errored t;
writer_yielded t;
!continue ();
Alcotest.(check bool) "Writer woken up"
Expand All @@ -445,8 +448,7 @@ let test_asynchronous_error () =
writer_yielded t;
reader_yielded t;
!continue ();
Alcotest.check read_operation "Error shuts down the reader"
`Close (next_read_operation t);
reader_errored t;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
write_response t
Expand Down Expand Up @@ -475,8 +477,7 @@ let test_asynchronous_error_asynchronous_handling () =
!continue_request ();
writer_yielded t;
!continue_error ();
Alcotest.check read_operation "Error shuts down the reader"
`Close (next_read_operation t);
reader_errored t;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
write_response t
Expand Down Expand Up @@ -510,8 +511,7 @@ let test_chunked_encoding () =
write_string t
~msg:"Final chunk written"
"0\r\n\r\n";
Alcotest.check read_operation "Keep-alive"
`Read (next_read_operation t);
reader_ready ~msg:"Keep-alive" t;
;;

let test_blocked_write_on_chunked_encoding () =
Expand Down Expand Up @@ -573,8 +573,7 @@ Connection: close\r\n\
Accept: application/json, text/plain, */*\r\n\
Accept-Language: en-US,en;q=0.5\r\n\r\n";
writer_yielded t;
Alcotest.check read_operation "reader closed"
`Close (next_read_operation t);
reader_closed t;
!continue_response ();
writer_closed t;
;;
Expand All @@ -593,18 +592,15 @@ let basic_handler body reqd =

let test_malformed conn =
let writer_woken_up = ref false in
Alcotest.check write_operation "Writer is in a yield state"
`Yield (next_write_operation conn);
writer_yielded conn;
yield_writer conn (fun () -> writer_woken_up := true);
let len = String.length malformed_request_string in
let input = Bigstringaf.of_string malformed_request_string ~off:0 ~len in
let c = read conn input ~off:0 ~len in
Alcotest.(check bool) "read doesn't consume all input"
true (c < String.length malformed_request_string);
Alcotest.check read_operation "Error shuts down the reader"
`Close (next_read_operation conn);
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up
reader_errored conn;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up

let test_malformed_request () =
let t = create ~error_handler (basic_handler "") in
Expand Down Expand Up @@ -666,10 +662,12 @@ let test_malformed_request_eof () =

let streaming_error_handler continue_error ?request:_ _error start_response =
let resp_body = start_response Headers.empty in
Body.write_string resp_body "got an error\n";
continue_error := (fun () ->
Body.write_string resp_body "more output";
Body.close_writer resp_body)
Body.write_string resp_body "got an error\n";
Body.flush resp_body (fun () ->
continue_error := (fun () ->
Body.write_string resp_body "more output";
Body.close_writer resp_body)))
;;

let test_malformed_request_streaming_error_response () =
Expand All @@ -680,25 +678,24 @@ let test_malformed_request_streaming_error_response () =
streaming_error_handler continue_error ?request error start_response)
in
let t = create ~error_handler (basic_handler "") in
Alcotest.check write_operation "Writer is in a yield state"
`Yield (next_write_operation t);
writer_yielded t;
yield_writer t (fun () -> writer_woken_up := true);
let c = read_string_eof t eof_request_string in
Alcotest.(check int) "read consumes all input"
(String.length eof_request_string) c;
Alcotest.check read_operation "Error shuts down the reader"
`Close (next_read_operation t);
reader_errored t;
!continue_error ();
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
writer_woken_up := false;
write_response t
(Response.create `Bad_request ~headers:Headers.empty);
Alcotest.check write_operation "Writer is in a yield state"
`Yield (next_write_operation t);
writer_yielded t;
yield_writer t (fun () -> writer_woken_up := true);
!continue_error ();
write_string t ~msg:"First part of the response body written" "got an error\n";
Alcotest.(check bool) "Writer woken up once more input is available"
true !writer_woken_up;
!continue_error ();
write_string t ~msg:"Rest of the error response written" "more output";
writer_closed t;
Alcotest.(check bool) "Connection is shutdown" true (is_closed t);
Expand All @@ -724,14 +721,12 @@ let test_malformed_request_chunked_error_response () =
chunked_error_handler continue_error ?request error start_response)
in
let t = create ~error_handler (basic_handler "") in
Alcotest.check write_operation "Writer is in a yield state"
`Yield (next_write_operation t);
writer_yielded t;
yield_writer t (fun () -> writer_woken_up := true);
let c = read_string_eof t eof_request_string in
Alcotest.(check int) "read consumes all input"
(String.length eof_request_string) c;
Alcotest.check read_operation "Error shuts down the reader"
`Close (next_read_operation t);
reader_errored t;
Alcotest.(check bool) "Writer hasn't woken up yet" false !writer_woken_up;
!continue_error ();
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
Expand All @@ -741,8 +736,7 @@ let test_malformed_request_chunked_error_response () =
~body:"8\r\nchunk 1\n\r\n"
(Response.create `Bad_request
~headers:(Headers.of_list ["transfer-encoding", "chunked"]));
Alcotest.check write_operation "Writer is in a yield state"
`Yield (next_write_operation t);
writer_yielded t;
yield_writer t (fun () -> writer_woken_up := true);
!continue_error ();
write_string t
Expand Down Expand Up @@ -771,24 +765,20 @@ let test_malformed_request_double_report_exn () =
streaming_error_handler continue_error ?request error start_response)
in
let t = create ~error_handler (basic_handler "") in
Alcotest.check write_operation "Writer is in a yield state"
`Yield (next_write_operation t);
writer_yielded t;
yield_writer t (fun () -> writer_woken_up := true);
let c = read_string_eof t eof_request_string in
Alcotest.(check int) "read consumes all input"
(String.length eof_request_string) c;
Alcotest.check read_operation "Error shuts down the reader"
`Close (next_read_operation t);
reader_errored t;
Alcotest.(check bool) "Writer hasn't woken up yet" false !writer_woken_up;
!continue_error ();
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
writer_woken_up := false;
write_response t
~body:"got an error\n"
(Response.create `Bad_request ~headers:Headers.empty);
Alcotest.check write_operation "Writer is in a yield state"
`Yield (next_write_operation t);
writer_yielded t;
write_eof t;
report_exn t (Failure "broken pipe");
writer_closed t ~unread:28;
Alcotest.(check bool) "Connection is shutdown" true (is_closed t);
;;

Expand All @@ -803,13 +793,11 @@ let test_immediate_flush_empty_body () =
in
let t = create ~error_handler request_handler in
reader_ready t;
yield_writer t (fun () ->
write_response t ~body:"" response);
yield_writer t (fun () -> write_response t ~body:"" response);
read_request t (Request.create `GET "/");
yield_reader t (fun () -> reader_woken_up := true);
writer_yielded t;
Alcotest.(check bool) "Reader woken up"
true !reader_woken_up;
Alcotest.(check bool) "Reader woken up" true !reader_woken_up;
;;

let test_empty_body_no_immediate_flush () =
Expand All @@ -828,8 +816,7 @@ let test_empty_body_no_immediate_flush () =
yield_reader t (fun () -> reader_woken_up := true);
write_response t ~body:"" response;
writer_yielded t;
Alcotest.(check bool) "Reader woken up"
true !reader_woken_up;
Alcotest.(check bool) "Reader woken up" true !reader_woken_up;
;;

let test_yield_before_starting_a_response () =
Expand All @@ -850,8 +837,7 @@ let test_yield_before_starting_a_response () =
!continue_response ();
write_response t ~body:"" response;
writer_yielded t;
Alcotest.(check bool) "Reader woken up"
true !reader_woken_up;
Alcotest.(check bool) "Reader woken up" true !reader_woken_up;
;;

let tests =
Expand Down

0 comments on commit e1db9a3

Please sign in to comment.