Skip to content

Commit

Permalink
fix-eof: fix bug in eof handling
Browse files Browse the repository at this point in the history
  • Loading branch information
seliopou committed Nov 11, 2018
1 parent 0b05fc6 commit b35e238
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 45 deletions.
8 changes: 6 additions & 2 deletions async/httpaf_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ module Server = struct
read fd buffer
>>> begin function
| `Eof ->
Server_connection.shutdown_reader conn;
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
Server_connection.read_eof conn bigstring ~off ~len)
|> ignore;
reader_thread ()
| `Ok _ ->
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
Expand Down Expand Up @@ -170,7 +172,9 @@ module Client = struct
read fd buffer
>>> begin function
| `Eof ->
Client_connection.shutdown_reader conn;
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
Client_connection.read_eof conn bigstring ~off ~len)
|> ignore;
reader_thread ()
| `Ok _ ->
Buffer.get buffer ~f:(fun bigstring ~off ~len ->
Expand Down
16 changes: 11 additions & 5 deletions lib/client_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Oneshot = struct
;;

let shutdown_reader t =
Reader.close t.reader;
Reader.force_close t.reader;
begin match !(t.state) with
| Awaiting_response | Closed -> ()
| Received_response(_, response_body) ->
Expand Down Expand Up @@ -138,8 +138,8 @@ module Oneshot = struct
if not (Body.is_closed response_body)
then Reader.next t.reader
else begin
Reader.close t.reader;
Reader.next t.reader
Reader.force_close t.reader;
Reader.next t.reader
end
;;

Expand All @@ -155,12 +155,18 @@ module Oneshot = struct
| (`Read | `Close) as operation -> operation
;;

let read t bs ~off ~len =
let consumed = Reader.read t.reader bs ~off ~len in
let read_with_more t bs ~off ~len more =
let consumed = Reader.read_with_more t.reader bs ~off ~len more in
flush_response_body t;
consumed
;;

let read t bs ~off ~len =
read_with_more t bs ~off ~len Incomplete

let read_eof t bs ~off ~len =
read_with_more t bs ~off ~len Complete

let next_write_operation t =
flush_request_body t;
Writer.next t.writer
Expand Down
28 changes: 15 additions & 13 deletions lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -733,18 +733,19 @@ module Server_connection : sig
returns a [`Read] value and additional input is available for the
connection to consume. *)

val read_eof : _ t -> Bigstring.t -> off:int -> len:int -> int
(** [read t bigstring ~off ~len] reads bytes of input from the provided range
of [bigstring] and returns the number of bytes consumed by the
connection. {!read} should be called after {!next_read_operation}
returns a [`Read] and an EOF has been received from the communication
channel. The connection will attempt to consume any buffered input and
then shutdown the HTTP parser for the connection. *)

val yield_reader : _ t -> (unit -> unit) -> unit
(** [yield_reader t continue] registers with the connection to call
[continue] when reading should resume. {!yield_reader} should be called
after {next_read_operation} returns a [`Yield] value. *)

val shutdown_reader : _ t -> unit
(** [shutdown_reader t] shuts down the read processor for the connection. All
subsequent calls to {!next_read_operations} will return [`Close].
{!shutdown_reader} should be called after {!next_read_operation} returns
a [`Read] value and there is no further input available for the
connection to consume. *)

val next_write_operation : _ t -> [
| `Write of Bigstring.t IOVec.t list
| `Yield
Expand Down Expand Up @@ -820,12 +821,13 @@ module Client_connection : sig
returns a [`Read] value and additional input is available for the
connection to consume. *)

val shutdown_reader : t -> unit
(** [shutdown_reader t] shuts down the read processor for the connection. All
subsequent calls to {!next_read_operations} will return [`Close].
{!shutdown_reader} should be called after {!next_read_operation} returns
a [`Read] value and there is no further input available for the
connection to consume. *)
val read_eof : t -> Bigstring.t -> off:int -> len:int -> int
(** [read t bigstring ~off ~len] reads bytes of input from the provided range
of [bigstring] and returns the number of bytes consumed by the
connection. {!read} should be called after {!next_read_operation}
returns a [`Read] and an EOF has been received from the communication
channel. The connection will attempt to consume any buffered input and
then shutdown the HTTP parser for the connection. *)

val next_write_operation : t -> [
| `Write of Bigstring.t IOVec.t list
Expand Down
2 changes: 1 addition & 1 deletion lib/jbuild
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
((name httpaf)
(public_name httpaf)
(libraries
(angstrom faraday result))
(angstrom faraday bigstringaf result))
(flags (:standard -safe-string))))
29 changes: 19 additions & 10 deletions lib/parse.ml
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,6 @@ module Reader = struct
create parser
;;

let close t =
t.closed <- true

let is_closed t =
t.closed
Expand Down Expand Up @@ -291,14 +289,25 @@ module Reader = struct
| _ -> assert false
;;

let rec read t bs ~off ~len =
match t.parse_state with
| Fail _ -> 0
| Done ->
start t (AU.parse t.parser);
read t bs ~off ~len;
| Partial continue ->
transition t (continue bs Incomplete ~off ~len)
let rec read_with_more t bs ~off ~len more =
let consumed =
match t.parse_state with
| Fail _ -> 0
| Done ->
start t (AU.parse t.parser);
read_with_more t bs ~off ~len more;
| Partial continue ->
transition t (continue bs more ~off ~len)
in
begin match more with
| Complete -> t.closed <- true;
| Incomplete -> ()
end;
consumed;
;;

let force_close t =
ignore (read_with_more t Bigstringaf.empty ~off:0 ~len:0 Complete : int);
;;

let next t =
Expand Down
12 changes: 9 additions & 3 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque
let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer

let shutdown_reader t =
Reader.close t.reader;
Reader.force_close t.reader;
if is_active t
then Reqd.close_request_body (current_reqd_exn t)
else wakeup_reader t
Expand Down Expand Up @@ -241,13 +241,19 @@ let next_read_operation t =
| `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close
| (`Read | `Yield | `Close) as operation -> operation

let read t bs ~off ~len =
let consumed = Reader.read t.reader bs ~off ~len in
let read_with_more t bs ~off ~len more =
let consumed = Reader.read_with_more t.reader bs ~off ~len more in
if is_active t then
Reqd.flush_request_body (current_reqd_exn t);
consumed
;;

let read t bs ~off ~len =
read_with_more t bs ~off ~len Incomplete

let read_eof t bs ~off ~len =
read_with_more t bs ~off ~len Complete

let yield_reader t k =
on_wakeup_reader t k
;;
Expand Down
12 changes: 8 additions & 4 deletions lib_test/simulator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ let test_server ~input ~output ~handler () =
else Bigstring.sub ~off:result input, reads'
| `Read, [] ->
debug " server iloop: eof";
Server_connection.shutdown_reader conn;
let input_len = Bigstring.length input in
ignore (Server_connection.read_eof conn input ~off:0 ~len:input_len : int);
bigstring_empty, []
| _ , [] ->
debug " server iloop: eof";
Server_connection.shutdown_reader conn;
let input_len = Bigstring.length input in
ignore (Server_connection.read_eof conn input ~off:0 ~len:input_len : int);
bigstring_empty, []
| `Close , _ ->
debug " server iloop: close(ok)"; input, []
Expand Down Expand Up @@ -167,11 +169,13 @@ let test_client ~request ~request_body_writes ~response_stream () =
else Bigstring.sub ~off:result input, reads'
| `Read, [] ->
debug " client iloop: eof";
Client_connection.shutdown_reader conn;
let input_len = Bigstring.length input in
ignore (Client_connection.read_eof conn input ~off:0 ~len:input_len : int);
input, []
| _ , [] ->
debug " client iloop: eof";
Client_connection.shutdown_reader conn;
let input_len = Bigstring.length input in
ignore (Client_connection.read_eof conn input ~off:0 ~len:input_len : int);
input, []
| `Close , _ ->
debug " client iloop: close(ok)";
Expand Down
16 changes: 9 additions & 7 deletions lib_test/test_httpaf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -138,23 +138,25 @@ module Server_connection = struct
`Read (next_read_operation t);
;;

let test_shutdown_reader_is_closed () =
let test_reader_is_closed_after_eof () =
let t = create default_request_handler in
shutdown_reader t;
let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in
Alcotest.(check int) "read_eof with no input returns 0" 0 c;
Alcotest.(check (of_pp Read_operation.pp_hum)) "Shutting down a reader closes it"
`Close (next_read_operation t);

let t = create default_request_handler in
let consumed = read t Bigstringaf.empty ~off:0 ~len:0 in
assert (consumed = 0);
shutdown_reader t;
let c = read t Bigstringaf.empty ~off:0 ~len:0 in
Alcotest.(check int) "read with no input returns 0" 0 c;
let c = read_eof t Bigstringaf.empty ~off:0 ~len:0; in
Alcotest.(check int) "read_eof with no input returns 0" 0 c;
Alcotest.(check (of_pp Read_operation.pp_hum)) "Shutting down a reader closes it"
`Close (next_read_operation t);
;;

let tests =
[ "initial reader state" , `Quick, test_initial_reader_state
; "shutdown reader closed", `Quick, test_shutdown_reader_is_closed
[ "initial reader state" , `Quick, test_initial_reader_state
; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof
]

end
Expand Down

0 comments on commit b35e238

Please sign in to comment.