diff --git a/.travis.yml b/.travis.yml index 190e3e81..98db4ae4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,11 +6,14 @@ install: wget https://raw.githubusercontent.com/ocaml/ocaml-travisci-skeleton/ma script: bash -ex ./.travis-docker.sh env: global: - - PINS="httpaf-async:. httpaf:." + - PINS="httpaf-async:. httpaf-lwt:. httpaf:." matrix: - PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" - PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" + - PACKAGE="httpaf-lwt" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0" - PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" - PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" + - PACKAGE="httpaf-lwt" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2" - PACKAGE="httpaf" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" - PACKAGE="httpaf-async" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" + - PACKAGE="httpaf-lwt" DISTRO="debian-unstable" OCAML_VERSION="4.03.0" diff --git a/benchmarks/jbuild b/benchmarks/jbuild index a5d21386..87b0897b 100644 --- a/benchmarks/jbuild +++ b/benchmarks/jbuild @@ -1,6 +1,11 @@ (jbuild_version 1) -(executables +(executable ((libraries (httpaf httpaf-async async core)) (modules (wrk_async_benchmark)) - (names (wrk_async_benchmark)))) + (name wrk_async_benchmark))) + +(executable + ((name wrk_lwt_benchmark) + (modules (Wrk_lwt_benchmark)) + (libraries (httpaf httpaf-lwt lwt.unix)))) diff --git a/benchmarks/wrk_lwt_benchmark.ml b/benchmarks/wrk_lwt_benchmark.ml new file mode 100644 index 00000000..e19be227 --- /dev/null +++ b/benchmarks/wrk_lwt_benchmark.ml @@ -0,0 +1,95 @@ +let text = +{|CHAPTER I. Down the Rabbit-Hole +Alice was beginning to get very tired of sitting by her sister on the bank, and +of having nothing to do: once or twice she had peeped into the book her sister +was reading, but it had no pictures or conversations in it, thought Alice So she was +considering in her own mind (as well as she could, for the hot day made her feel +very sleepy and stupid), whether the pleasure of making a daisy-chain would be +worth the trouble of getting up and picking the daisies, when suddenly a White +Rabbit with pink eyes ran close by her. There was nothing so very remarkable in +that; nor did Alice think it so very much out of the way to hear the Rabbit say +to itself, (when she thought it over +afterwards, it occurred to her that she ought to have wondered at this, but at +the time it all seemed quite natural); but when the Rabbit actually took a watch +out of its waistcoat-pocket, and looked at it, and then hurried on, Alice +started to her feet, for it flashed across her mind that she had never before +seen a rabbit with either a waistcoat-pocket, or a watch to take out of it, and +burning with curiosity, she ran across the field after it, and fortunately was +just in time to see it pop down a large rabbit-hole under the hedge. In another +moment down went Alice after it, never once considering how in the world she was +to get out again. The rabbit-hole went straight on like a tunnel for some way, +and then dipped suddenly down, so suddenly that Alice had not a moment to think +about stopping herself before she found herself falling down a very deep well. +Either the well was very deep, or she fell very slowly, for she had plenty of +time as she went down to look about her and to wonder what was going to happen +next. First, she tried to look down and make out what she was coming to, but it +was too dark to see anything; then she looked at the sides of the well, and +noticed that they were filled with cupboards......|} + +let connection_handler = + let module Body = Httpaf.Body in + let module Headers = Httpaf.Headers in + let module Reqd = Httpaf.Reqd in + let module Response = Httpaf.Response in + let module Status = Httpaf.Status in + + let text = Lwt_bytes.of_string text in + + let response_headers = + Headers.of_list [ + "Content-Length", string_of_int (Lwt_bytes.length text) + ] + in + + let request_handler _ reqd = + let {Httpaf.Request.target; _} = Reqd.request reqd in + let request_body = Reqd.request_body reqd in + Body.close_reader request_body; + + match target with + | "/" -> + Reqd.respond_with_bigstring + reqd (Response.create ~headers:response_headers `OK) text; + | _ -> + Reqd.respond_with_string + reqd (Response.create `Not_found) "Route not found" + in + + let error_handler _ ?request error start_response = + let response_body = start_response Headers.empty in + + begin match error with + | `Exn exn -> + Body.write_string response_body (Printexc.to_string exn); + Body.write_string response_body "\n"; + + | #Status.standard as error -> + Body.write_string response_body (Status.default_reason_phrase error) + end; + + Body.close_writer response_body + in + + Httpaf_lwt.Server.create_connection_handler + ?config:None ~request_handler ~error_handler + +let () = + let open Lwt.Infix in + + let port = ref 8080 in + Arg.parse + ["-p", Arg.Set_int port, " Listening port number (8080 by default)"] + ignore + "Responds to requests with a fixed string for benchmarking purposes."; + + let listen_address = Unix.(ADDR_INET (inet_addr_loopback, !port)) in + + Lwt.async begin fun () -> + Lwt_io.establish_server_with_client_socket + ~backlog:11_000 listen_address connection_handler + >>= fun _server -> Lwt.return_unit + end; + + let forever, _ = Lwt.wait () in + Lwt_main.run forever diff --git a/examples/async_echo_post.ml b/examples/async/async_echo_post.ml similarity index 100% rename from examples/async_echo_post.ml rename to examples/async/async_echo_post.ml diff --git a/examples/async_get.ml b/examples/async/async_get.ml similarity index 100% rename from examples/async_get.ml rename to examples/async/async_get.ml diff --git a/examples/async_post.ml b/examples/async/async_post.ml similarity index 100% rename from examples/async_post.ml rename to examples/async/async_post.ml diff --git a/examples/jbuild b/examples/async/jbuild similarity index 100% rename from examples/jbuild rename to examples/async/jbuild diff --git a/examples/lwt/jbuild b/examples/lwt/jbuild new file mode 100644 index 00000000..b21fe052 --- /dev/null +++ b/examples/lwt/jbuild @@ -0,0 +1,5 @@ +(jbuild_version 1) + +(executables + ((names (lwt_get lwt_post lwt_echo_server)) + (libraries (httpaf httpaf-lwt lwt lwt.unix)))) diff --git a/examples/lwt/lwt_echo_server.ml b/examples/lwt/lwt_echo_server.ml new file mode 100644 index 00000000..1f2bc386 --- /dev/null +++ b/examples/lwt/lwt_echo_server.ml @@ -0,0 +1,101 @@ +let connection_handler : Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t = + let module Body = Httpaf.Body in + let module Headers = Httpaf.Headers in + let module Reqd = Httpaf.Reqd in + let module Response = Httpaf.Response in + let module Status = Httpaf.Status in + + let request_handler : Unix.sockaddr -> _ Reqd.t -> unit = + fun _client_address request_descriptor -> + + let request = Reqd.request request_descriptor in + match request.meth with + | `POST -> + let request_body = Reqd.request_body request_descriptor in + + let response_content_type = + match Headers.get request.headers "Content-Type" with + | Some request_content_type -> request_content_type + | None -> "application/octet-stream" + in + + let response = + Response.create + ~headers:(Headers.of_list [ + "Content-Type", response_content_type; + "Connection", "close"; + ]) + `OK + in + + let response_body = + Reqd.respond_with_streaming request_descriptor response in + + let rec respond () = + Body.schedule_read + request_body + ~on_eof:(fun () -> Body.close_writer response_body) + ~on_read:(fun request_data ~off ~len -> + Body.write_bigstring response_body request_data ~off ~len; + respond ()) + in + respond () + + | _ -> + Reqd.respond_with_string + request_descriptor (Response.create `Method_not_allowed) "" + in + + let error_handler : + Unix.sockaddr -> + ?request:Httpaf.Request.t -> + _ -> + (Headers.t -> [`write] Body.t) -> + unit = + fun _client_address ?request:_ error start_response -> + + let response_body = start_response Headers.empty in + + begin match error with + | `Exn exn -> + Body.write_string response_body (Printexc.to_string exn); + Body.write_string response_body "\n"; + + | #Status.standard as error -> + Body.write_string response_body (Status.default_reason_phrase error) + end; + + Body.close_writer response_body + in + + Httpaf_lwt.Server.create_connection_handler + ?config:None + ~request_handler + ~error_handler + + + +let () = + let open Lwt.Infix in + + let port = ref 8080 in + Arg.parse + ["-p", Arg.Set_int port, " Listening port number (8080 by default)"] + ignore + "Echoes POST requests. Runs forever."; + + let listen_address = Unix.(ADDR_INET (inet_addr_loopback, !port)) in + + Lwt.async begin fun () -> + Lwt_io.establish_server_with_client_socket + listen_address connection_handler + >>= fun _server -> + Printf.printf "Listening on port %i and echoing POST requests.\n" !port; + print_string "To send a POST request, try\n\n"; + print_string " echo foo | dune exec examples/lwt/lwt_post.exe\n\n"; + flush stdout; + Lwt.return_unit + end; + + let forever, _ = Lwt.wait () in + Lwt_main.run forever diff --git a/examples/lwt/lwt_get.ml b/examples/lwt/lwt_get.ml new file mode 100644 index 00000000..271e3e29 --- /dev/null +++ b/examples/lwt/lwt_get.ml @@ -0,0 +1,73 @@ +module Body = Httpaf.Body + +let response_handler notify_response_received response response_body = + let module Response = Httpaf.Response in + match Response.(response.status) with + | `OK -> + let rec read_response () = + Body.schedule_read + response_body + ~on_eof:(fun () -> Lwt.wakeup_later notify_response_received ()) + ~on_read:(fun response_fragment ~off ~len -> + let response_fragment_string = Bytes.create len in + Lwt_bytes.blit_to_bytes + response_fragment off + response_fragment_string 0 + len; + print_string (Bytes.unsafe_to_string response_fragment_string); + + read_response ()) + in + read_response () + + | _ -> + Format.fprintf Format.err_formatter "%a\n%!" Response.pp_hum response; + exit 1 + +let error_handler _ = + assert false + +open Lwt.Infix + +let () = + let host = ref None in + let port = ref 80 in + + Arg.parse + ["-p", Set_int port, " Port number (80 by default)"] + (fun host_argument -> host := Some host_argument) + "lwt_get.exe [-p N] HOST"; + + let host = + match !host with + | None -> failwith "No hostname provided" + | Some host -> host + in + + Lwt_main.run begin + Lwt_unix.getaddrinfo host (string_of_int !port) [Unix.(AI_FAMILY PF_INET)] + >>= fun addresses -> + + let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Lwt_unix.connect socket (List.hd addresses).Unix.ai_addr + >>= fun () -> + + let request_headers = + Httpaf.Request.create + `GET "/" ~headers:(Httpaf.Headers.of_list ["Host", host]) + in + + let response_received, notify_response_received = Lwt.wait () in + let response_handler = response_handler notify_response_received in + + let request_body = + Httpaf_lwt.Client.request + socket + request_headers + ~error_handler + ~response_handler + in + Body.close_writer request_body; + + response_received + end diff --git a/examples/lwt/lwt_post.ml b/examples/lwt/lwt_post.ml new file mode 100644 index 00000000..b9d2e841 --- /dev/null +++ b/examples/lwt/lwt_post.ml @@ -0,0 +1,77 @@ +module Body = Httpaf.Body + +let response_handler notify_response_received response response_body = + let module Response = Httpaf.Response in + match Response.(response.status) with + | `OK -> + let rec read_response () = + Body.schedule_read + response_body + ~on_eof:(fun () -> Lwt.wakeup_later notify_response_received ()) + ~on_read:(fun response_fragment ~off ~len -> + let response_fragment_string = Bytes.create len in + Lwt_bytes.blit_to_bytes + response_fragment off + response_fragment_string 0 + len; + print_string (Bytes.unsafe_to_string response_fragment_string); + + read_response ()) + in + read_response () + + | _ -> + Format.fprintf Format.err_formatter "%a\n%!" Response.pp_hum response; + exit 1 + +let error_handler _ = + assert false + +open Lwt.Infix + +let () = + let host = ref "127.0.0.1" in + let port = ref 8080 in + + Arg.parse + [ + "-h", Set_string host, " Hostname (127.0.0.1 by default)"; + "-p", Set_int port, " Port number (8080 by default)"; + ] + ignore + "lwt_get.exe [-h HOST] [-p N]"; + + Lwt_main.run begin + Lwt_io.(read stdin) + >>= fun text_to_send -> + + Lwt_unix.getaddrinfo !host (string_of_int !port) [Unix.(AI_FAMILY PF_INET)] + >>= fun addresses -> + + let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in + Lwt_unix.connect socket (List.hd addresses).Unix.ai_addr + >>= fun () -> + + let request_headers = + Httpaf.Request.create `POST "/" ~headers:(Httpaf.Headers.of_list [ + "Host", !host; + "Connection", "close"; + "Content-Length", string_of_int (String.length text_to_send); + ]) + in + + let response_received, notify_response_received = Lwt.wait () in + let response_handler = response_handler notify_response_received in + + let request_body = + Httpaf_lwt.Client.request + socket + request_headers + ~error_handler + ~response_handler + in + Body.write_string request_body text_to_send; + Body.close_writer request_body; + + response_received + end diff --git a/httpaf-lwt.opam b/httpaf-lwt.opam new file mode 100644 index 00000000..1c47a2c9 --- /dev/null +++ b/httpaf-lwt.opam @@ -0,0 +1,20 @@ +opam-version: "2.0" +name: "httpaf-lwt" +maintainer: "Spiros Eliopoulos " +authors: [ "Anton Bachin " ] +license: "BSD-3-clause" +homepage: "https://github.com/inhabitedtype/httpaf" +bug-reports: "https://github.com/inhabitedtype/httpaf/issues" +dev-repo: "git+https://github.com/inhabitedtype/httpaf.git" +build: [ + ["jbuilder" "subst" "-p" name] {pinned} + ["jbuilder" "build" "-p" name "-j" jobs] +] +depends: [ + "ocaml" {>= "4.03.0"} + "faraday-lwt-unix" + "httpaf" + "jbuilder" {build & >= "1.0+beta10"} + "lwt" +] +synopsis: "Lwt support for http/af" diff --git a/lwt/httpaf_lwt.ml b/lwt/httpaf_lwt.ml new file mode 100644 index 00000000..2da8c102 --- /dev/null +++ b/lwt/httpaf_lwt.ml @@ -0,0 +1,271 @@ +open Lwt.Infix + + + +(* Based on the Buffer module in httpaf_async.ml. *) +module Buffer : sig + type t + + val create : int -> t + + val get : t -> f:(Lwt_bytes.t -> off:int -> len:int -> int) -> int + val put : t -> f:(Lwt_bytes.t -> off:int -> len:int -> int Lwt.t) -> int Lwt.t +end = struct + type t = + { buffer : Lwt_bytes.t + ; mutable off : int + ; mutable len : int } + + let create size = + let buffer = Lwt_bytes.create size in + { buffer; off = 0; len = 0 } + + let compress t = + if t.len = 0 + then begin + t.off <- 0; + t.len <- 0; + end else if t.off > 0 + then begin + Lwt_bytes.blit t.buffer t.off t.buffer 0 t.len; + t.off <- 0; + end + + let get t ~f = + let n = f t.buffer ~off:t.off ~len:t.len in + t.off <- t.off + n; + t.len <- t.len - n; + if t.len = 0 + then t.off <- 0; + n + + let put t ~f = + compress t; + f t.buffer ~off:(t.off + t.len) ~len:(Lwt_bytes.length t.buffer - t.len) + >>= fun n -> + t.len <- t.len + n; + Lwt.return n +end + +let read fd buffer = + Lwt.catch + (fun () -> + Buffer.put buffer ~f:(fun bigstring ~off ~len -> + Lwt_bytes.read fd bigstring off len)) + (function + | Unix.Unix_error (Unix.EBADF, _, _) as exn -> + Lwt.fail exn + | exn -> + Lwt.async (fun () -> + Lwt_unix.close fd); + Lwt.fail exn) + + >>= fun bytes_read -> + if bytes_read = 0 then + Lwt.return `Eof + else + Lwt.return (`Ok bytes_read) + + + +let shutdown socket command = + try Lwt_unix.shutdown socket command + with Unix.Unix_error (Unix.ENOTCONN, _, _) -> () + + + +module Server = struct + type request_handler = + Lwt_unix.file_descr Httpaf.Server_connection.request_handler + + + + let create_connection_handler ?config ~request_handler ~error_handler = + fun client_addr socket -> + let module Server_connection = Httpaf.Server_connection in + let connection = + Server_connection.create + ?config + ~error_handler:(error_handler client_addr) + (request_handler client_addr) + in + + + let read_buffer = Buffer.create 0x1000 in + let read_loop_exited, notify_read_loop_exited = Lwt.wait () in + + let rec read_loop () = + let rec read_loop_step () = + match Server_connection.next_read_operation connection with + | `Read -> + read socket read_buffer >>= begin function + | `Eof -> + Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> + Server_connection.read_eof connection bigstring ~off ~len) + |> ignore; + read_loop_step () + | `Ok _ -> + Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> + Server_connection.read connection bigstring ~off ~len) + |> ignore; + read_loop_step () + end + + | `Yield -> + Server_connection.yield_reader connection read_loop; + Lwt.return_unit + + | `Close -> + Lwt.wakeup_later notify_read_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + shutdown socket Unix.SHUTDOWN_RECEIVE + end; + Lwt.return_unit + in + + Lwt.async (fun () -> + Lwt.catch + read_loop_step + (fun exn -> + Server_connection.report_exn connection exn; + Lwt.return_unit)) + in + + + let writev = Faraday_lwt_unix.writev_of_fd socket in + let write_loop_exited, notify_write_loop_exited = Lwt.wait () in + + let rec write_loop () = + let rec write_loop_step () = + match Server_connection.next_write_operation connection with + | `Write io_vectors -> + writev io_vectors >>= fun result -> + Server_connection.report_write_result connection result; + write_loop_step () + + | `Yield -> + Server_connection.yield_writer connection write_loop; + Lwt.return_unit + + | `Close _ -> + Lwt.wakeup_later notify_write_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + shutdown socket Unix.SHUTDOWN_SEND + end; + Lwt.return_unit + in + + Lwt.async (fun () -> + Lwt.catch + write_loop_step + (fun exn -> + Server_connection.report_exn connection exn; + Lwt.return_unit)) + in + + + read_loop (); + write_loop (); + Lwt.join [read_loop_exited; write_loop_exited] >>= fun () -> + + if Lwt_unix.state socket <> Lwt_unix.Closed then + Lwt.catch + (fun () -> Lwt_unix.close socket) + (fun _exn -> Lwt.return_unit) + else + Lwt.return_unit +end + + + +module Client = struct + let request socket request ~error_handler ~response_handler = + let module Client_connection = Httpaf.Client_connection in + let request_body, connection = + Client_connection.request request ~error_handler ~response_handler in + + + let read_buffer = Buffer.create 0x1000 in + let read_loop_exited, notify_read_loop_exited = Lwt.wait () in + + let read_loop () = + let rec read_loop_step () = + match Client_connection.next_read_operation connection with + | `Read -> + read socket read_buffer >>= begin function + | `Eof -> + Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> + Client_connection.read_eof connection bigstring ~off ~len) + |> ignore; + read_loop_step () + | `Ok _ -> + Buffer.get read_buffer ~f:(fun bigstring ~off ~len -> + Client_connection.read connection bigstring ~off ~len) + |> ignore; + read_loop_step () + end + + | `Close -> + Lwt.wakeup_later notify_read_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + shutdown socket Unix.SHUTDOWN_RECEIVE + end; + Lwt.return_unit + in + + Lwt.async (fun () -> + Lwt.catch + read_loop_step + (fun exn -> + Client_connection.report_exn connection exn; + Lwt.return_unit)) + in + + + let writev = Faraday_lwt_unix.writev_of_fd socket in + let write_loop_exited, notify_write_loop_exited = Lwt.wait () in + + let rec write_loop () = + let rec write_loop_step () = + match Client_connection.next_write_operation connection with + | `Write io_vectors -> + writev io_vectors >>= fun result -> + Client_connection.report_write_result connection result; + write_loop_step () + + | `Yield -> + Client_connection.yield_writer connection write_loop; + Lwt.return_unit + + | `Close _ -> + Lwt.wakeup_later notify_write_loop_exited (); + if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin + shutdown socket Unix.SHUTDOWN_SEND + end; + Lwt.return_unit + in + + Lwt.async (fun () -> + Lwt.catch + write_loop_step + (fun exn -> + Client_connection.report_exn connection exn; + Lwt.return_unit)) + in + + + read_loop (); + write_loop (); + + Lwt.async (fun () -> + Lwt.join [read_loop_exited; write_loop_exited] >>= fun () -> + + if Lwt_unix.state socket <> Lwt_unix.Closed then + Lwt.catch + (fun () -> Lwt_unix.close socket) + (fun _exn -> Lwt.return_unit) + else + Lwt.return_unit); + + request_body +end diff --git a/lwt/httpaf_lwt.mli b/lwt/httpaf_lwt.mli new file mode 100644 index 00000000..63fe2a5a --- /dev/null +++ b/lwt/httpaf_lwt.mli @@ -0,0 +1,23 @@ +(* The function that results from [create_connection_handler] should be passed + to [Lwt_io.establish_server_with_client_socket]. For an example, see + [examples/lwt_echo_server.ml]. *) +module Server : sig + type request_handler = + Lwt_unix.file_descr Httpaf.Server_connection.request_handler + + val create_connection_handler + : ?config : Httpaf.Server_connection.Config.t + -> request_handler : (Unix.sockaddr -> request_handler) + -> error_handler : (Unix.sockaddr -> Httpaf.Server_connection.error_handler) + -> (Unix.sockaddr -> Lwt_unix.file_descr -> unit Lwt.t) +end + +(* For an example, see [examples/lwt_get.ml]. *) +module Client : sig + val request + : Lwt_unix.file_descr + -> Httpaf.Request.t + -> error_handler : Httpaf.Client_connection.error_handler + -> response_handler : Httpaf.Client_connection.response_handler + -> [`write] Httpaf.Body.t +end diff --git a/lwt/jbuild b/lwt/jbuild new file mode 100644 index 00000000..179e7f9a --- /dev/null +++ b/lwt/jbuild @@ -0,0 +1,7 @@ +(jbuild_version 1) + +(library + ((name httpaf_lwt) + (public_name httpaf-lwt) + (libraries (faraday-lwt-unix httpaf lwt.unix)) + (flags (:standard -safe-string))))