Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http upgrades #203

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d73822b
refactor request queue mechanics
dpatti Jan 13, 2020
94e4480
refactor-request-queue: read loop no longer needs to wake writer
dpatti May 16, 2020
cce55fd
refactor-request-queue: fixes
dpatti Apr 2, 2021
0bab909
Implement HTTP upgrades.
dhouse-js Apr 26, 2021
b761c7e
Implement HTTP upgrades.
dhouse-js Apr 26, 2021
a49ea3c
Merge remote-tracking branch 'upstream/refactor-request-queue' into r…
dhouse-js Apr 27, 2021
949d7d9
Merge branch 'refactor-request-queue' into http-upgrades
dhouse-js Apr 27, 2021
124eed8
Fix requests where the client requests an upgrade but the server decl…
dhouse-js Apr 29, 2021
004e128
refactor
dhouse-js Apr 29, 2021
d1ff221
More fixes for when the upgrade request is declined
dhouse-js Apr 29, 2021
60f137b
Unwind changes to parser
dhouse-js Apr 30, 2021
2a36491
Merge branch 'master' into http-upgrades
dhouse-js May 24, 2021
d45b85c
Fix compilation of examples
dhouse-js May 24, 2021
05c797d
Implement HTTP upgrades.
dhouse-js Apr 26, 2021
e6cda20
Implement HTTP upgrades.
dhouse-js Apr 26, 2021
9ccf77b
Fix requests where the client requests an upgrade but the server decl…
dhouse-js Apr 29, 2021
40ae33c
refactor
dhouse-js Apr 29, 2021
5fcb69a
More fixes for when the upgrade request is declined
dhouse-js Apr 29, 2021
681d363
Unwind changes to parser
dhouse-js Apr 30, 2021
90de89e
Fix compilation of examples
dhouse-js May 24, 2021
203f06c
update reader coercion in client tests
dpatti Jun 3, 2021
cee330c
http-upgrades: require no request body
dpatti Jun 5, 2021
81b4808
http-upgrades: refactor input_state logic
dpatti Jun 5, 2021
9faf49c
http-upgrades: add tests and fix read loop issue
dpatti Jun 5, 2021
a978744
resolve conflicts
dhouse-js Jun 7, 2021
6167c51
merge with master
dhouse-js Jun 7, 2021
d3377dd
fix build
dhouse-js Jun 7, 2021
5264434
improve comment
dhouse-js Jun 7, 2021
052bdcf
fix conflicts
dhouse-js Jun 21, 2021
31a6c1a
minor refactoring
dpatti Jan 12, 2022
0c2ede1
expose is_upgrade and fix up documentation
dpatti Jan 12, 2022
0da6f10
Merge remote-tracking branch 'origin/master' into http-upgrades
dpatti Jan 17, 2022
1384050
fix httpaf-async upgrade implementation
dpatti Jan 17, 2022
5294105
fix lwt implementation and add upgrade helper
dpatti Mar 7, 2022
c7eeda5
switch async upgrade to pass the socket
dpatti Mar 7, 2022
71e0486
fix lwt_echo_upgrade example
dpatti Mar 12, 2022
4944d00
move reader yields to separate function
dpatti Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions async/httpaf_async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,28 @@ let read fd buffer =
open Httpaf

module Server = struct
let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler =
let create_connection_handler
?(config=Config.default) ~request_handler ~error_handler ~upgrade_handler =
fun client_addr socket ->
let fd = Socket.fd socket in
let writev = Faraday_async.writev_of_fd fd in
let request_handler = request_handler client_addr in
let error_handler = error_handler client_addr in
let conn = Server_connection.create ~config ~error_handler request_handler in
let read_complete = Ivar.create () in
let write_complete = Ivar.create () in
let upgrade_read, upgrade_write = Ivar.create (), Ivar.create () in
upon
(Deferred.both (Ivar.read upgrade_read) (Ivar.read upgrade_write))
(fun ((), ()) ->
match upgrade_handler with
| None -> failwith "HTTP upgrades not supported"
| Some upgrade_handler ->
upgrade_handler client_addr socket
>>> fun () ->
if not (Fd.is_closed fd) then Socket.shutdown socket `Both;
Ivar.fill read_complete ();
Ivar.fill write_complete ());
let buffer = Buffer.create config.read_buffer_size in
let rec reader_thread () =
match Server_connection.next_read_operation conn with
Expand All @@ -119,13 +133,13 @@ module Server = struct
| `Yield ->
(* Log.Global.printf "read_yield(%d)%!" (Fd.to_int_exn fd); *)
Server_connection.yield_reader conn reader_thread
| `Upgrade -> Ivar.fill upgrade_read ()
| `Close ->
(* Log.Global.printf "read_close(%d)%!" (Fd.to_int_exn fd); *)
Ivar.fill read_complete ();
if not (Fd.is_closed fd)
then Socket.shutdown socket `Receive
in
let write_complete = Ivar.create () in
let rec writer_thread () =
match Server_connection.next_write_operation conn with
| `Write iovecs ->
Expand All @@ -136,6 +150,7 @@ module Server = struct
| `Yield ->
(* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *)
Server_connection.yield_writer conn writer_thread;
| `Upgrade -> Ivar.fill upgrade_write ()
| `Close _ ->
(* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *)
Ivar.fill write_complete ();
Expand Down
1 change: 1 addition & 0 deletions async/httpaf_async.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Server : sig
: ?config : Config.t
-> request_handler : ('a -> Server_connection.request_handler)
-> error_handler : ('a -> Server_connection.error_handler)
-> upgrade_handler : ('a -> ([`Active], 'a) Socket.t -> unit Deferred.t) option
-> ([< Socket.Address.t] as 'a)
-> ([`Active], 'a) Socket.t
-> unit Deferred.t
Expand Down
5 changes: 4 additions & 1 deletion examples/async/async_echo_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ let main port max_accepts_per_batch () =
let where_to_listen = Tcp.Where_to_listen.of_port port in
Tcp.(Server.create_sock ~on_handler_error:`Raise
~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen)
(Server.create_connection_handler ~request_handler ~error_handler)
(Server.create_connection_handler
~request_handler
~error_handler
~upgrade_handler:None)
>>= fun _server ->
Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
Stdio.printf "To send a POST request, try one of the following\n\n";
Expand Down
47 changes: 47 additions & 0 deletions examples/async/async_echo_upgrade.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
open Core
open Async

open Httpaf_async

let request_handler (_ : Socket.Address.Inet.t) = Httpaf_examples.Server.upgrade
let error_handler (_ : Socket.Address.Inet.t) = Httpaf_examples.Server.error_handler

let upgrade_handler (_ : Socket.Address.Inet.t) socket =
let fd = Socket.fd socket in
let reader = Reader.create fd in
let writer = Writer.create fd in
Reader.read_one_chunk_at_a_time reader ~handle_chunk:(fun bigstring ~pos ~len ->
Writer.write_bigstring writer bigstring ~pos ~len;
return `Continue)
>>| function
| `Eof | `Stopped _ | `Eof_with_unconsumed_data _ -> ()
;;

let main port max_accepts_per_batch () =
let where_to_listen = Tcp.Where_to_listen.of_port port in
Tcp.(Server.create_sock ~on_handler_error:`Raise
~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen)
(Server.create_connection_handler
~request_handler
~error_handler
~upgrade_handler:(Some upgrade_handler))
>>= fun _server ->
Stdio.printf "Listening on port %i, upgrading, and echoing data.\n" port;
Stdio.printf "To send an interactive upgrade request, try\n\n";
Stdio.printf " examples/script/upgrade-connect\n%!";
Deferred.never ()
;;

let () =
Command.async
~summary:"Echo POST requests"
Command.Param.(
map (both
(flag "-p" (optional_with_default 8080 int)
~doc:"int Source port to listen on")
(flag "-a" (optional_with_default 1 int)
~doc:"int Maximum accepts per batch"))
~f:(fun (port, accepts) ->
(fun () -> main port accepts ())))
|> Command.run
;;
2 changes: 1 addition & 1 deletion examples/async/dune
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(executables
(libraries httpaf httpaf-async httpaf_examples async core)
(names async_echo_post async_get async_post))
(names async_echo_post async_echo_upgrade async_get async_post))

(alias
(name examples)
Expand Down
10 changes: 10 additions & 0 deletions examples/lib/httpaf_examples.ml
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,14 @@ module Server = struct
end;
Body.Writer.close response_body
;;

let upgrade reqd =
if Request.is_upgrade (Reqd.request reqd) then (
let headers = Headers.of_list [ "connection", "upgrade" ] in
Reqd.respond_with_upgrade reqd headers;
) else (
let headers = Headers.of_list [ "connection", "close" ] in
Reqd.respond_with_string reqd (Response.create ~headers `Not_found) ""
)
;;
end
2 changes: 1 addition & 1 deletion examples/lwt/dune
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(executables
(libraries httpaf httpaf-lwt-unix httpaf_examples base stdio lwt lwt.unix)
(names lwt_get lwt_post lwt_echo_post))
(names lwt_get lwt_post lwt_echo_post lwt_echo_upgrade))

(alias
(name examples)
Expand Down
5 changes: 4 additions & 1 deletion examples/lwt/lwt_echo_post.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ let main port =
Lwt.async (fun () ->
Lwt_io.establish_server_with_client_socket
listen_address
(Server.create_connection_handler ~request_handler ~error_handler)
(Server.create_connection_handler
~request_handler
~error_handler
~upgrade_handler:None)
>|= fun _server ->
Stdio.printf "Listening on port %i and echoing POST requests.\n" port;
Stdio.printf "To send a POST request, try one of the following\n\n";
Expand Down
47 changes: 47 additions & 0 deletions examples/lwt/lwt_echo_upgrade.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
open Base
open Lwt.Infix
module Arg = Caml.Arg

open Httpaf_lwt_unix

let request_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.upgrade
let error_handler (_ : Unix.sockaddr) = Httpaf_examples.Server.error_handler

let upgrade_handler (_ : Unix.sockaddr) (fd : Lwt_unix.file_descr) =
let input = Lwt_io.of_fd fd ~mode:Input in
let output = Lwt_io.of_fd fd ~mode:Output in
let rec loop () =
Lwt_io.read input ~count:4096
>>= function
| "" -> Lwt.return_unit
| data -> Lwt_io.write output data >>= loop
in
loop ()
;;

let main port =
let listen_address = Unix.(ADDR_INET (inet_addr_loopback, port)) in
Lwt.async (fun () ->
Lwt_io.establish_server_with_client_socket
listen_address
(Server.create_connection_handler
~request_handler
~error_handler
~upgrade_handler:(Some upgrade_handler))
>|= fun _server ->
Stdio.printf "Listening on port %i, upgrading, and echoing data.\n" port;
Stdio.printf "To send an interactive upgrade request, try\n\n";
Stdio.printf " examples/script/upgrade-connect\n%!");
let forever, _ = Lwt.wait () in
Lwt_main.run forever
;;

let () =
let port = ref 8080 in
Arg.parse
["-p", Arg.Set_int port, " Listening port number (8080 by default)"]
ignore
"Echoes POST requests. Runs forever.";
main !port
;;

13 changes: 13 additions & 0 deletions examples/script/upgrade-connect
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash
set -euo pipefail

function headers {
printf "\
GET / HTTP/1.1\r
Host: localhost\r
Connection: upgrade\r
\r
"
}

( headers; echo hello; cat; echo bye ) | nc localhost 8080 --close
17 changes: 16 additions & 1 deletion lib/httpaf.mli
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,10 @@ module Request : sig
more details. *)

val pp_hum : Format.formatter -> t -> unit [@@ocaml.toplevel_printer]

val is_upgrade : t -> bool
(** [is_upgrade t] returns true if the request has the "Connection: upgrade"
header. *)
end


Expand Down Expand Up @@ -659,6 +663,16 @@ module Reqd : sig
val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit
val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> Body.Writer.t

val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit
(** Initiate an HTTP upgrade. [Server_connection.next_write_request] and
[next_read_request] will begin returning [`Upgrade] once the response
headers have been written, which indicates that the runtime should take
over direct control of the socket rather than shuttling bytes through
httpaf.

The headers must indicate a valid upgrade message, e.g. must include
"Connection: upgrade". See [Request.is_upgrade]. *)

(** {3 Exception Handling} *)

val report_exn : t -> exn -> unit
Expand Down Expand Up @@ -700,7 +714,7 @@ module Server_connection : sig
(** [create ?config ?error_handler ~request_handler] creates a connection
handler that will service individual requests with [request_handler]. *)

val next_read_operation : t -> [ `Read | `Yield | `Close ]
val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ]
(** [next_read_operation t] returns a value describing the next operation
that the caller should conduct on behalf of the connection. *)

Expand All @@ -727,6 +741,7 @@ module Server_connection : sig
val next_write_operation : t -> [
| `Write of Bigstringaf.t IOVec.t list
| `Yield
| `Upgrade
| `Close of int ]
(** [next_write_operation t] returns a value describing the next operation
that the caller should conduct on behalf of the connection. *)
Expand Down
2 changes: 2 additions & 0 deletions lib/parse.ml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ module Reader = struct
| `Fixed 0L ->
handler request Body.Reader.empty;
ok
| `Fixed _ | `Chunked when Request.is_upgrade request ->
return (Error (`Bad_request request))
| `Fixed _ | `Chunked as encoding ->
let request_body = Body.Reader.create Bigstringaf.empty in
handler request request_body;
Expand Down
Loading