diff --git a/lib/client_connection.ml b/lib/client_connection.ml index 154c645..c63864f 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -81,7 +81,14 @@ let yield_writer t k = then failwith "yield_writer on closed conn" else if Optional_thunk.is_some t.wakeup_writer then failwith "yield_writer: only one callback can be registered at a time" - else t.wakeup_writer <- Optional_thunk.some k + else if is_active t then + let respd = current_respd_exn t in + match Respd.output_state respd with + | Wait -> t.wakeup_writer <- Optional_thunk.some k + | Consume -> Respd.on_more_output_available respd k + | Complete -> k () + else + t.wakeup_writer <- Optional_thunk.some k let wakeup_writer t = let f = t.wakeup_writer in diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 0e48aba..83e4b72 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -103,11 +103,10 @@ let yield_writer t k = | No_error -> if is_active t then let reqd = current_reqd_exn t in - begin match Reqd.output_state reqd with + match Reqd.output_state reqd with | Wait -> t.wakeup_writer <- Optional_thunk.some k | Consume -> Reqd.on_more_output_available reqd k | Complete -> k () - end else t.wakeup_writer <- Optional_thunk.some k | Error { response_state; _ } -> diff --git a/lib_test/test_client_connection.ml b/lib_test/test_client_connection.ml index 8e5c52d..c4501d6 100644 --- a/lib_test/test_client_connection.ml +++ b/lib_test/test_client_connection.ml @@ -569,8 +569,9 @@ let test_fixed_body () = ;; let test_fixed_body_persistent_connection () = + let writer_woken_up = ref false in let request' = Request.create - ~headers:(Headers.of_list ["Content-Length", "0"]) + ~headers:(Headers.of_list ["Content-Length", "2"]) `GET "/" in let response_handler response response_body = @@ -590,7 +591,11 @@ let test_fixed_body_persistent_connection () = in write_request t request'; writer_yielded t; + yield_writer t (fun () -> writer_woken_up := true); + Body.write_string body "hi"; Body.close_writer body; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; + write_string t "hi"; reader_ready t; read_response t (Response.create ~headers:(Headers.of_list []) `OK); reader_ready t; diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index cc6656a..f5c2d29 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -288,8 +288,7 @@ let test_asynchronous_streaming_response () = yield_writer t (fun () -> writer_woken_up := true; writer_yielded t); - Alcotest.(check bool) "Writer not woken up" - false !writer_woken_up; + Alcotest.(check bool) "Writer not woken up" false !writer_woken_up; read_request t request; let body = @@ -300,19 +299,16 @@ let test_asynchronous_streaming_response () = (* XXX(dpatti): This is an observation of a current behavior where the writer is awoken only to find that it was asked to yield again. It is cleaned up in another branch where we move the continuation off of the reqd/body. *) - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; let writer_woken_up = ref false in yield_writer t (fun () -> writer_woken_up := true; write_response t ~body:"Hello " response); Body.write_string body "Hello "; - Alcotest.(check bool) "Writer not woken up" - false !writer_woken_up; + Alcotest.(check bool) "Writer not woken up" false !writer_woken_up; Body.flush body ignore; - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; let writer_woken_up = ref false in writer_yielded t; @@ -321,11 +317,9 @@ let test_asynchronous_streaming_response () = write_string t "world!"; writer_closed t); Body.write_string body "world!"; - Alcotest.(check bool) "Writer not woken up" - false !writer_woken_up; + Alcotest.(check bool) "Writer not woken up" false !writer_woken_up; Body.close_writer body; - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up + Alcotest.(check bool) "Writer woken up" true !writer_woken_up ;; let test_asynchronous_streaming_response_with_immediate_flush () = @@ -341,8 +335,7 @@ let test_asynchronous_streaming_response_with_immediate_flush () = yield_writer t (fun () -> writer_woken_up := true; write_response t response); - Alcotest.(check bool) "Writer not woken up" - false !writer_woken_up; + Alcotest.(check bool) "Writer not woken up" false !writer_woken_up; read_request t request; let body = @@ -350,8 +343,7 @@ let test_asynchronous_streaming_response_with_immediate_flush () = | None -> failwith "no body found" | Some body -> body in - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; let writer_woken_up = ref false in writer_yielded t; @@ -359,8 +351,7 @@ let test_asynchronous_streaming_response_with_immediate_flush () = writer_woken_up := true; writer_closed t); Body.close_writer body; - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up + Alcotest.(check bool) "Writer woken up" true !writer_woken_up ;; let test_empty_fixed_streaming_response () = @@ -398,14 +389,35 @@ let test_multiple_get () = write_response t (Response.create `OK); ;; +let test_asynchronous_streaming_response_flush_immediately () = + let writer_woken_up = ref false in + let continue_response = ref (fun () -> ()) in + let request = Request.create `GET "/" in + let response = Response.create `OK in + let request_handler reqd = + let body = Reqd.respond_with_streaming ~flush_headers_immediately:true reqd response in + continue_response := (fun () -> + Body.write_string body "hello"; + Body.close_writer body) + in + let t = create request_handler in + read_request t request; + write_response t response; + writer_yielded t; + yield_writer t (fun () -> writer_woken_up := true); + !continue_response (); + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; + write_string t "hello"; + writer_yielded t; +;; + let test_synchronous_error () = let writer_woken_up = ref false in let t = create ~error_handler synchronous_raise in yield_writer t (fun () -> writer_woken_up := true); read_request t (Request.create `GET "/"); reader_errored t; - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; write_response t ~msg:"Error response written" (Response.create `Internal_server_error) @@ -426,8 +438,7 @@ let test_synchronous_error_asynchronous_handling () = reader_errored t; writer_yielded t; !continue (); - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; write_response t ~msg:"Error response written" (Response.create `Internal_server_error) @@ -449,8 +460,7 @@ let test_asynchronous_error () = reader_yielded t; !continue (); reader_errored t; - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; write_response t ~msg:"Error response written" (Response.create `Internal_server_error) @@ -478,8 +488,7 @@ let test_asynchronous_error_asynchronous_handling () = writer_yielded t; !continue_error (); reader_errored t; - Alcotest.(check bool) "Writer woken up" - true !writer_woken_up; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; write_response t ~msg:"Error response written" (Response.create `Internal_server_error) @@ -881,6 +890,7 @@ let tests = ; "single GET" , `Quick, test_single_get ; "multiple GETs" , `Quick, test_multiple_get ; "asynchronous response" , `Quick, test_asynchronous_response + ; "asynchronous response, asynchronous body", `Quick, test_asynchronous_streaming_response_flush_immediately ; "echo POST" , `Quick, test_echo_post ; "streaming response" , `Quick, test_streaming_response ; "asynchronous streaming response", `Quick, test_asynchronous_streaming_response