Skip to content

Commit

Permalink
client: transfer the writer callback to the request body when yielding (
Browse files Browse the repository at this point in the history
#60)

the writer on a request that has been sent

- applies the same fix as in #58 to the client connection
- adds additional tests that exercise the same behavior on the server
  • Loading branch information
anmonteiro committed May 11, 2020
1 parent d957cdf commit a6d973f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 30 deletions.
9 changes: 8 additions & 1 deletion lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ let yield_writer t k =
then failwith "yield_writer on closed conn"
else if Optional_thunk.is_some t.wakeup_writer
then failwith "yield_writer: only one callback can be registered at a time"
else t.wakeup_writer <- Optional_thunk.some k
else if is_active t then
let respd = current_respd_exn t in
match Respd.output_state respd with
| Wait -> t.wakeup_writer <- Optional_thunk.some k
| Consume -> Respd.on_more_output_available respd k
| Complete -> k ()
else
t.wakeup_writer <- Optional_thunk.some k

let wakeup_writer t =
let f = t.wakeup_writer in
Expand Down
3 changes: 1 addition & 2 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,10 @@ let yield_writer t k =
| No_error ->
if is_active t then
let reqd = current_reqd_exn t in
begin match Reqd.output_state reqd with
match Reqd.output_state reqd with
| Wait -> t.wakeup_writer <- Optional_thunk.some k
| Consume -> Reqd.on_more_output_available reqd k
| Complete -> k ()
end
else
t.wakeup_writer <- Optional_thunk.some k
| Error { response_state; _ } ->
Expand Down
7 changes: 6 additions & 1 deletion lib_test/test_client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,9 @@ let test_fixed_body () =
;;

let test_fixed_body_persistent_connection () =
let writer_woken_up = ref false in
let request' = Request.create
~headers:(Headers.of_list ["Content-Length", "0"])
~headers:(Headers.of_list ["Content-Length", "2"])
`GET "/"
in
let response_handler response response_body =
Expand All @@ -590,7 +591,11 @@ let test_fixed_body_persistent_connection () =
in
write_request t request';
writer_yielded t;
yield_writer t (fun () -> writer_woken_up := true);
Body.write_string body "hi";
Body.close_writer body;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
write_string t "hi";
reader_ready t;
read_response t (Response.create ~headers:(Headers.of_list []) `OK);
reader_ready t;
Expand Down
62 changes: 36 additions & 26 deletions lib_test/test_server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,7 @@ let test_asynchronous_streaming_response () =
yield_writer t (fun () ->
writer_woken_up := true;
writer_yielded t);
Alcotest.(check bool) "Writer not woken up"
false !writer_woken_up;
Alcotest.(check bool) "Writer not woken up" false !writer_woken_up;

read_request t request;
let body =
Expand All @@ -300,19 +299,16 @@ let test_asynchronous_streaming_response () =
(* XXX(dpatti): This is an observation of a current behavior where the writer
is awoken only to find that it was asked to yield again. It is cleaned up
in another branch where we move the continuation off of the reqd/body. *)
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
let writer_woken_up = ref false in
yield_writer t (fun () ->
writer_woken_up := true;
write_response t ~body:"Hello " response);

Body.write_string body "Hello ";
Alcotest.(check bool) "Writer not woken up"
false !writer_woken_up;
Alcotest.(check bool) "Writer not woken up" false !writer_woken_up;
Body.flush body ignore;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;

let writer_woken_up = ref false in
writer_yielded t;
Expand All @@ -321,11 +317,9 @@ let test_asynchronous_streaming_response () =
write_string t "world!";
writer_closed t);
Body.write_string body "world!";
Alcotest.(check bool) "Writer not woken up"
false !writer_woken_up;
Alcotest.(check bool) "Writer not woken up" false !writer_woken_up;
Body.close_writer body;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up
Alcotest.(check bool) "Writer woken up" true !writer_woken_up
;;

let test_asynchronous_streaming_response_with_immediate_flush () =
Expand All @@ -341,26 +335,23 @@ let test_asynchronous_streaming_response_with_immediate_flush () =
yield_writer t (fun () ->
writer_woken_up := true;
write_response t response);
Alcotest.(check bool) "Writer not woken up"
false !writer_woken_up;
Alcotest.(check bool) "Writer not woken up" false !writer_woken_up;

read_request t request;
let body =
match !body with
| None -> failwith "no body found"
| Some body -> body
in
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;

let writer_woken_up = ref false in
writer_yielded t;
yield_writer t (fun () ->
writer_woken_up := true;
writer_closed t);
Body.close_writer body;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up
Alcotest.(check bool) "Writer woken up" true !writer_woken_up
;;

let test_empty_fixed_streaming_response () =
Expand Down Expand Up @@ -398,14 +389,35 @@ let test_multiple_get () =
write_response t (Response.create `OK);
;;

let test_asynchronous_streaming_response_flush_immediately () =
let writer_woken_up = ref false in
let continue_response = ref (fun () -> ()) in
let request = Request.create `GET "/" in
let response = Response.create `OK in
let request_handler reqd =
let body = Reqd.respond_with_streaming ~flush_headers_immediately:true reqd response in
continue_response := (fun () ->
Body.write_string body "hello";
Body.close_writer body)
in
let t = create request_handler in
read_request t request;
write_response t response;
writer_yielded t;
yield_writer t (fun () -> writer_woken_up := true);
!continue_response ();
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
write_string t "hello";
writer_yielded t;
;;

let test_synchronous_error () =
let writer_woken_up = ref false in
let t = create ~error_handler synchronous_raise in
yield_writer t (fun () -> writer_woken_up := true);
read_request t (Request.create `GET "/");
reader_errored t;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
write_response t
~msg:"Error response written"
(Response.create `Internal_server_error)
Expand All @@ -426,8 +438,7 @@ let test_synchronous_error_asynchronous_handling () =
reader_errored t;
writer_yielded t;
!continue ();
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
write_response t
~msg:"Error response written"
(Response.create `Internal_server_error)
Expand All @@ -449,8 +460,7 @@ let test_asynchronous_error () =
reader_yielded t;
!continue ();
reader_errored t;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
write_response t
~msg:"Error response written"
(Response.create `Internal_server_error)
Expand Down Expand Up @@ -478,8 +488,7 @@ let test_asynchronous_error_asynchronous_handling () =
writer_yielded t;
!continue_error ();
reader_errored t;
Alcotest.(check bool) "Writer woken up"
true !writer_woken_up;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
write_response t
~msg:"Error response written"
(Response.create `Internal_server_error)
Expand Down Expand Up @@ -881,6 +890,7 @@ let tests =
; "single GET" , `Quick, test_single_get
; "multiple GETs" , `Quick, test_multiple_get
; "asynchronous response" , `Quick, test_asynchronous_response
; "asynchronous response, asynchronous body", `Quick, test_asynchronous_streaming_response_flush_immediately
; "echo POST" , `Quick, test_echo_post
; "streaming response" , `Quick, test_streaming_response
; "asynchronous streaming response", `Quick, test_asynchronous_streaming_response
Expand Down

0 comments on commit a6d973f

Please sign in to comment.