diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index 3fc22e8c..b4935864 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -107,7 +107,9 @@ module Server = struct read fd buffer >>> begin function | `Eof -> - Server_connection.shutdown_reader conn; + Buffer.get buffer ~f:(fun bigstring ~off ~len -> + Server_connection.read_eof conn bigstring ~off ~len) + |> ignore; reader_thread () | `Ok _ -> Buffer.get buffer ~f:(fun bigstring ~off ~len -> @@ -170,7 +172,9 @@ module Client = struct read fd buffer >>> begin function | `Eof -> - Client_connection.shutdown_reader conn; + Buffer.get buffer ~f:(fun bigstring ~off ~len -> + Client_connection.read_eof conn bigstring ~off ~len) + |> ignore; reader_thread () | `Ok _ -> Buffer.get buffer ~f:(fun bigstring ~off ~len -> diff --git a/lib/client_connection.ml b/lib/client_connection.ml index f6af6751..55e76a89 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -92,7 +92,7 @@ module Oneshot = struct ;; let shutdown_reader t = - Reader.close t.reader; + Reader.force_close t.reader; begin match !(t.state) with | Awaiting_response | Closed -> () | Received_response(_, response_body) -> @@ -138,8 +138,8 @@ module Oneshot = struct if not (Body.is_closed response_body) then Reader.next t.reader else begin - Reader.close t.reader; - Reader.next t.reader + Reader.force_close t.reader; + Reader.next t.reader end ;; @@ -155,12 +155,18 @@ module Oneshot = struct | (`Read | `Close) as operation -> operation ;; - let read t bs ~off ~len = - let consumed = Reader.read t.reader bs ~off ~len in + let read_with_more t bs ~off ~len more = + let consumed = Reader.read_with_more t.reader bs ~off ~len more in flush_response_body t; consumed ;; + let read t bs ~off ~len = + read_with_more t bs ~off ~len Incomplete + + let read_eof t bs ~off ~len = + read_with_more t bs ~off ~len Complete + let next_write_operation t = flush_request_body t; Writer.next t.writer diff --git a/lib/httpaf.mli b/lib/httpaf.mli index c409eb25..6f4b81ae 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -733,18 +733,19 @@ module Server_connection : sig returns a [`Read] value and additional input is available for the connection to consume. *) + val read_eof : _ t -> Bigstring.t -> off:int -> len:int -> int + (** [read t bigstring ~off ~len] reads bytes of input from the provided range + of [bigstring] and returns the number of bytes consumed by the + connection. {!read} should be called after {!next_read_operation} + returns a [`Read] and an EOF has been received from the communication + channel. The connection will attempt to consume any buffered input and + then shutdown the HTTP parser for the connection. *) + val yield_reader : _ t -> (unit -> unit) -> unit (** [yield_reader t continue] registers with the connection to call [continue] when reading should resume. {!yield_reader} should be called after {next_read_operation} returns a [`Yield] value. *) - val shutdown_reader : _ t -> unit - (** [shutdown_reader t] shuts down the read processor for the connection. All - subsequent calls to {!next_read_operations} will return [`Close]. - {!shutdown_reader} should be called after {!next_read_operation} returns - a [`Read] value and there is no further input available for the - connection to consume. *) - val next_write_operation : _ t -> [ | `Write of Bigstring.t IOVec.t list | `Yield @@ -820,12 +821,13 @@ module Client_connection : sig returns a [`Read] value and additional input is available for the connection to consume. *) - val shutdown_reader : t -> unit - (** [shutdown_reader t] shuts down the read processor for the connection. All - subsequent calls to {!next_read_operations} will return [`Close]. - {!shutdown_reader} should be called after {!next_read_operation} returns - a [`Read] value and there is no further input available for the - connection to consume. *) + val read_eof : t -> Bigstring.t -> off:int -> len:int -> int + (** [read t bigstring ~off ~len] reads bytes of input from the provided range + of [bigstring] and returns the number of bytes consumed by the + connection. {!read} should be called after {!next_read_operation} + returns a [`Read] and an EOF has been received from the communication + channel. The connection will attempt to consume any buffered input and + then shutdown the HTTP parser for the connection. *) val next_write_operation : t -> [ | `Write of Bigstring.t IOVec.t list diff --git a/lib/jbuild b/lib/jbuild index 177d6e80..17e97851 100644 --- a/lib/jbuild +++ b/lib/jbuild @@ -4,5 +4,5 @@ ((name httpaf) (public_name httpaf) (libraries - (angstrom faraday result)) + (angstrom faraday bigstringaf result)) (flags (:standard -safe-string)))) diff --git a/lib/parse.ml b/lib/parse.ml index 8176d9a4..d17f4437 100644 --- a/lib/parse.ml +++ b/lib/parse.ml @@ -260,8 +260,6 @@ module Reader = struct create parser ;; - let close t = - t.closed <- true let is_closed t = t.closed @@ -291,14 +289,25 @@ module Reader = struct | _ -> assert false ;; - let rec read t bs ~off ~len = - match t.parse_state with - | Fail _ -> 0 - | Done -> - start t (AU.parse t.parser); - read t bs ~off ~len; - | Partial continue -> - transition t (continue bs Incomplete ~off ~len) + let rec read_with_more t bs ~off ~len more = + let consumed = + match t.parse_state with + | Fail _ -> 0 + | Done -> + start t (AU.parse t.parser); + read_with_more t bs ~off ~len more; + | Partial continue -> + transition t (continue bs more ~off ~len) + in + begin match more with + | Complete -> t.closed <- true; + | Incomplete -> () + end; + consumed; + ;; + + let force_close t = + ignore (read_with_more t Bigstringaf.empty ~off:0 ~len:0 Complete : int); ;; let next t = diff --git a/lib/server_connection.ml b/lib/server_connection.ml index a393806f..ee3c653c 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -157,7 +157,7 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer let shutdown_reader t = - Reader.close t.reader; + Reader.force_close t.reader; if is_active t then Reqd.close_request_body (current_reqd_exn t) else wakeup_reader t @@ -241,13 +241,19 @@ let next_read_operation t = | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close) as operation -> operation -let read t bs ~off ~len = - let consumed = Reader.read t.reader bs ~off ~len in +let read_with_more t bs ~off ~len more = + let consumed = Reader.read_with_more t.reader bs ~off ~len more in if is_active t then Reqd.flush_request_body (current_reqd_exn t); consumed ;; +let read t bs ~off ~len = + read_with_more t bs ~off ~len Incomplete + +let read_eof t bs ~off ~len = + read_with_more t bs ~off ~len Complete + let yield_reader t k = on_wakeup_reader t k ;; diff --git a/lib_test/simulator.ml b/lib_test/simulator.ml index 44aa6bd9..21e7d57c 100644 --- a/lib_test/simulator.ml +++ b/lib_test/simulator.ml @@ -74,11 +74,13 @@ let test_server ~input ~output ~handler () = else Bigstring.sub ~off:result input, reads' | `Read, [] -> debug " server iloop: eof"; - Server_connection.shutdown_reader conn; + let input_len = Bigstring.length input in + ignore (Server_connection.read_eof conn input ~off:0 ~len:input_len : int); bigstring_empty, [] | _ , [] -> debug " server iloop: eof"; - Server_connection.shutdown_reader conn; + let input_len = Bigstring.length input in + ignore (Server_connection.read_eof conn input ~off:0 ~len:input_len : int); bigstring_empty, [] | `Close , _ -> debug " server iloop: close(ok)"; input, [] @@ -167,11 +169,13 @@ let test_client ~request ~request_body_writes ~response_stream () = else Bigstring.sub ~off:result input, reads' | `Read, [] -> debug " client iloop: eof"; - Client_connection.shutdown_reader conn; + let input_len = Bigstring.length input in + ignore (Client_connection.read_eof conn input ~off:0 ~len:input_len : int); input, [] | _ , [] -> debug " client iloop: eof"; - Client_connection.shutdown_reader conn; + let input_len = Bigstring.length input in + ignore (Client_connection.read_eof conn input ~off:0 ~len:input_len : int); input, [] | `Close , _ -> debug " client iloop: close(ok)"; diff --git a/lib_test/test_httpaf.ml b/lib_test/test_httpaf.ml index ba9704e8..fd450bb5 100644 --- a/lib_test/test_httpaf.ml +++ b/lib_test/test_httpaf.ml @@ -138,23 +138,25 @@ module Server_connection = struct `Read (next_read_operation t); ;; - let test_shutdown_reader_is_closed () = + let test_reader_is_closed_after_eof () = let t = create default_request_handler in - shutdown_reader t; + let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in + Alcotest.(check int) "read_eof with no input returns 0" 0 c; Alcotest.(check (of_pp Read_operation.pp_hum)) "Shutting down a reader closes it" `Close (next_read_operation t); let t = create default_request_handler in - let consumed = read t Bigstringaf.empty ~off:0 ~len:0 in - assert (consumed = 0); - shutdown_reader t; + let c = read t Bigstringaf.empty ~off:0 ~len:0 in + Alcotest.(check int) "read with no input returns 0" 0 c; + let c = read_eof t Bigstringaf.empty ~off:0 ~len:0; in + Alcotest.(check int) "read_eof with no input returns 0" 0 c; Alcotest.(check (of_pp Read_operation.pp_hum)) "Shutting down a reader closes it" `Close (next_read_operation t); ;; let tests = - [ "initial reader state" , `Quick, test_initial_reader_state - ; "shutdown reader closed", `Quick, test_shutdown_reader_is_closed + [ "initial reader state" , `Quick, test_initial_reader_state + ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof ] end