From e1db9a366c1ef394f58440e67feb51abff662c53 Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sat, 18 Apr 2020 19:19:33 -0700 Subject: [PATCH] error responses: drain writer if an exn arrives while handling an error (#57) --- lib/server_connection.ml | 1 + lib_test/test_client_connection.ml | 14 ++--- lib_test/test_server_connection.ml | 98 +++++++++++++----------------- 3 files changed, 47 insertions(+), 66 deletions(-) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 550bbb12..f3212f8c 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -218,6 +218,7 @@ let set_error_and_handle ?request t error = * reported (we're already handling an error and e.g. the writing channel * is closed). Just shut down the connection in that case. *) + Writer.close_and_drain t.writer; shutdown t end diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index c933e279..8e5c52dd 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -34,6 +34,10 @@ let reader_yielded t = `Yield (next_read_operation t :> [`Close | `Read | `Yield]); ;; +let reader_closed t = + Alcotest.check read_operation "Reader is closed" + `Close (next_read_operation t :> [`Close | `Read | `Yield]) + let write_string ?(msg="output written") t str = let len = String.length str in Alcotest.(check (option string)) msg @@ -57,10 +61,6 @@ let writer_closed t = (`Close 0) (next_write_operation t); ;; -let reader_closed t = - Alcotest.check read_operation "Reader is closed" - `Close (next_read_operation t :> [`Close | `Read | `Yield]) - let connection_is_shutdown t = reader_closed t; writer_closed t; @@ -641,9 +641,3 @@ let tests = ; "Fixed body doesn't shut down the writer if connection is persistent",`Quick, test_fixed_body_persistent_connection ; "Client support for upgrading a connection", `Quick, test_client_upgrade ] - -(* - * TODO: - * - test client connection error handling - * - *) diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 9289ac11..133567bd 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -17,8 +17,8 @@ let read_request t r = read_string t request_string ;; -let reader_ready t = - Alcotest.check read_operation "Reader is ready" +let reader_ready ?(msg="Reader is ready") t = + Alcotest.check read_operation msg `Read (next_read_operation t); ;; @@ -27,6 +27,13 @@ let reader_yielded t = `Yield (next_read_operation t); ;; +let reader_closed ?(msg="Reader is closed") t = + Alcotest.check read_operation msg + `Close (next_read_operation t); +;; + +let reader_errored = reader_closed ~msg:"Error shuts down the reader" + let write_string ?(msg="output written") t str = let len = String.length str in Alcotest.(check (option string)) msg @@ -55,8 +62,7 @@ let writer_closed ?(unread = 0) t = ;; let connection_is_shutdown t = - Alcotest.check read_operation "Reader is closed" - `Close (next_read_operation t); + reader_closed t; writer_closed t; ;; @@ -108,8 +114,7 @@ let error_handler ?request:_ _error start_response = let test_initial_reader_state () = let t = create default_request_handler in - Alcotest.check read_operation "A new reader wants input" - `Read (next_read_operation t); + reader_ready ~msg:"A new reader wants input" t ;; let test_reader_is_closed_after_eof () = @@ -398,8 +403,7 @@ let test_synchronous_error () = let t = create ~error_handler synchronous_raise in yield_writer t (fun () -> writer_woken_up := true); read_request t (Request.create `GET "/"); - Alcotest.check read_operation "Error shuts down the reader" - `Close (next_read_operation t); + reader_errored t; Alcotest.(check bool) "Writer woken up" true !writer_woken_up; write_response t @@ -419,8 +423,7 @@ let test_synchronous_error_asynchronous_handling () = writer_yielded t; yield_writer t (fun () -> writer_woken_up := true); read_request t (Request.create `GET "/"); - Alcotest.check read_operation "Error shuts down the reader" - `Close (next_read_operation t); + reader_errored t; writer_yielded t; !continue (); Alcotest.(check bool) "Writer woken up" @@ -445,8 +448,7 @@ let test_asynchronous_error () = writer_yielded t; reader_yielded t; !continue (); - Alcotest.check read_operation "Error shuts down the reader" - `Close (next_read_operation t); + reader_errored t; Alcotest.(check bool) "Writer woken up" true !writer_woken_up; write_response t @@ -475,8 +477,7 @@ let test_asynchronous_error_asynchronous_handling () = !continue_request (); writer_yielded t; !continue_error (); - Alcotest.check read_operation "Error shuts down the reader" - `Close (next_read_operation t); + reader_errored t; Alcotest.(check bool) "Writer woken up" true !writer_woken_up; write_response t @@ -510,8 +511,7 @@ let test_chunked_encoding () = write_string t ~msg:"Final chunk written" "0\r\n\r\n"; - Alcotest.check read_operation "Keep-alive" - `Read (next_read_operation t); + reader_ready ~msg:"Keep-alive" t; ;; let test_blocked_write_on_chunked_encoding () = @@ -573,8 +573,7 @@ Connection: close\r\n\ Accept: application/json, text/plain, */*\r\n\ Accept-Language: en-US,en;q=0.5\r\n\r\n"; writer_yielded t; - Alcotest.check read_operation "reader closed" - `Close (next_read_operation t); + reader_closed t; !continue_response (); writer_closed t; ;; @@ -593,18 +592,15 @@ let basic_handler body reqd = let test_malformed conn = let writer_woken_up = ref false in - Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation conn); + writer_yielded conn; yield_writer conn (fun () -> writer_woken_up := true); let len = String.length malformed_request_string in let input = Bigstringaf.of_string malformed_request_string ~off:0 ~len in let c = read conn input ~off:0 ~len in Alcotest.(check bool) "read doesn't consume all input" true (c < String.length malformed_request_string); - Alcotest.check read_operation "Error shuts down the reader" - `Close (next_read_operation conn); - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up + reader_errored conn; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up let test_malformed_request () = let t = create ~error_handler (basic_handler "") in @@ -666,10 +662,12 @@ let test_malformed_request_eof () = let streaming_error_handler continue_error ?request:_ _error start_response = let resp_body = start_response Headers.empty in - Body.write_string resp_body "got an error\n"; continue_error := (fun () -> - Body.write_string resp_body "more output"; - Body.close_writer resp_body) + Body.write_string resp_body "got an error\n"; + Body.flush resp_body (fun () -> + continue_error := (fun () -> + Body.write_string resp_body "more output"; + Body.close_writer resp_body))) ;; let test_malformed_request_streaming_error_response () = @@ -680,25 +678,24 @@ let test_malformed_request_streaming_error_response () = streaming_error_handler continue_error ?request error start_response) in let t = create ~error_handler (basic_handler "") in - Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + writer_yielded t; yield_writer t (fun () -> writer_woken_up := true); let c = read_string_eof t eof_request_string in Alcotest.(check int) "read consumes all input" (String.length eof_request_string) c; - Alcotest.check read_operation "Error shuts down the reader" - `Close (next_read_operation t); + reader_errored t; !continue_error (); Alcotest.(check bool) "Writer woken up" true !writer_woken_up; writer_woken_up := false; write_response t (Response.create `Bad_request ~headers:Headers.empty); - Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + writer_yielded t; yield_writer t (fun () -> writer_woken_up := true); !continue_error (); + write_string t ~msg:"First part of the response body written" "got an error\n"; Alcotest.(check bool) "Writer woken up once more input is available" true !writer_woken_up; + !continue_error (); write_string t ~msg:"Rest of the error response written" "more output"; writer_closed t; Alcotest.(check bool) "Connection is shutdown" true (is_closed t); @@ -724,14 +721,12 @@ let test_malformed_request_chunked_error_response () = chunked_error_handler continue_error ?request error start_response) in let t = create ~error_handler (basic_handler "") in - Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + writer_yielded t; yield_writer t (fun () -> writer_woken_up := true); let c = read_string_eof t eof_request_string in Alcotest.(check int) "read consumes all input" (String.length eof_request_string) c; - Alcotest.check read_operation "Error shuts down the reader" - `Close (next_read_operation t); + reader_errored t; Alcotest.(check bool) "Writer hasn't woken up yet" false !writer_woken_up; !continue_error (); Alcotest.(check bool) "Writer woken up" true !writer_woken_up; @@ -741,8 +736,7 @@ let test_malformed_request_chunked_error_response () = ~body:"8\r\nchunk 1\n\r\n" (Response.create `Bad_request ~headers:(Headers.of_list ["transfer-encoding", "chunked"])); - Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + writer_yielded t; yield_writer t (fun () -> writer_woken_up := true); !continue_error (); write_string t @@ -771,24 +765,20 @@ let test_malformed_request_double_report_exn () = streaming_error_handler continue_error ?request error start_response) in let t = create ~error_handler (basic_handler "") in - Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + writer_yielded t; yield_writer t (fun () -> writer_woken_up := true); let c = read_string_eof t eof_request_string in Alcotest.(check int) "read consumes all input" (String.length eof_request_string) c; - Alcotest.check read_operation "Error shuts down the reader" - `Close (next_read_operation t); + reader_errored t; Alcotest.(check bool) "Writer hasn't woken up yet" false !writer_woken_up; !continue_error (); Alcotest.(check bool) "Writer woken up" true !writer_woken_up; writer_woken_up := false; - write_response t - ~body:"got an error\n" - (Response.create `Bad_request ~headers:Headers.empty); - Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + writer_yielded t; + write_eof t; report_exn t (Failure "broken pipe"); + writer_closed t ~unread:28; Alcotest.(check bool) "Connection is shutdown" true (is_closed t); ;; @@ -803,13 +793,11 @@ let test_immediate_flush_empty_body () = in let t = create ~error_handler request_handler in reader_ready t; - yield_writer t (fun () -> - write_response t ~body:"" response); + yield_writer t (fun () -> write_response t ~body:"" response); read_request t (Request.create `GET "/"); yield_reader t (fun () -> reader_woken_up := true); writer_yielded t; - Alcotest.(check bool) "Reader woken up" - true !reader_woken_up; + Alcotest.(check bool) "Reader woken up" true !reader_woken_up; ;; let test_empty_body_no_immediate_flush () = @@ -828,8 +816,7 @@ let test_empty_body_no_immediate_flush () = yield_reader t (fun () -> reader_woken_up := true); write_response t ~body:"" response; writer_yielded t; - Alcotest.(check bool) "Reader woken up" - true !reader_woken_up; + Alcotest.(check bool) "Reader woken up" true !reader_woken_up; ;; let test_yield_before_starting_a_response () = @@ -850,8 +837,7 @@ let test_yield_before_starting_a_response () = !continue_response (); write_response t ~body:"" response; writer_yielded t; - Alcotest.(check bool) "Reader woken up" - true !reader_woken_up; + Alcotest.(check bool) "Reader woken up" true !reader_woken_up; ;; let tests =