Skip to content

Commit

Permalink
Add Lwt UNIX adapter (httpaf-lwt-unix).
Browse files Browse the repository at this point in the history
  • Loading branch information
paurkedal committed Jun 16, 2018
1 parent b82a47f commit b935fd7
Show file tree
Hide file tree
Showing 10 changed files with 442 additions and 1 deletion.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ install: wget https://raw.githubusercontent.com/ocaml/ocaml-travisci-skeleton/ma
script: bash -ex ./.travis-docker.sh
env:
global:
- PINS="httpaf-async:. httpaf:."
- PINS="httpaf-async:. httpaf-lwt:. httpaf:."
matrix:
- PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0"
- PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0"
- PACKAGE="httpaf-lwt" DISTRO="ubuntu-16.04" OCAML_VERSION="4.06.0"
- PACKAGE="httpaf" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2"
- PACKAGE="httpaf-async" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2"
- PACKAGE="httpaf-lwt" DISTRO="ubuntu-16.04" OCAML_VERSION="4.04.2"
- PACKAGE="httpaf" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0"
- PACKAGE="httpaf-async" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0"
- PACKAGE="httpaf-lwt" DISTRO="alpine-3.5" OCAML_VERSION="4.03.0"
- PACKAGE="httpaf" DISTRO="debian-unstable" OCAML_VERSION="4.03.0"
- PACKAGE="httpaf-async" DISTRO="debian-unstable" OCAML_VERSION="4.03.0"
- PACKAGE="httpaf-lwt" DISTRO="debian-unstable" OCAML_VERSION="4.03.0"
5 changes: 5 additions & 0 deletions benchmarks/jbuild
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@
((libraries (httpaf httpaf-async async core))
(modules (wrk_async_benchmark))
(names (wrk_async_benchmark))))

(executables
((libraries (httpaf httpaf-lwt-unix lwt lwt.unix))
(modules (wrk_lwt_benchmark))
(names (wrk_lwt_benchmark))))
64 changes: 64 additions & 0 deletions benchmarks/wrk_lwt_benchmark.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
open Lwt.Infix
open Httpaf

let text = "CHAPTER I. Down the Rabbit-Hole Alice was beginning to get very tired of sitting by her sister on the bank, and of having nothing to do: once or twice she had peeped into the book her sister was reading, but it had no pictures or conversations in it, <and what is the use of a book,> thought Alice <without pictures or conversations?> So she was considering in her own mind (as well as she could, for the hot day made her feel very sleepy and stupid), whether the pleasure of making a daisy-chain would be worth the trouble of getting up and picking the daisies, when suddenly a White Rabbit with pink eyes ran close by her. There was nothing so very remarkable in that; nor did Alice think it so very much out of the way to hear the Rabbit say to itself, <Oh dear! Oh dear! I shall be late!> (when she thought it over afterwards, it occurred to her that she ought to have wondered at this, but at the time it all seemed quite natural); but when the Rabbit actually took a watch out of its waistcoat-pocket, and looked at it, and then hurried on, Alice started to her feet, for it flashed across her mind that she had never before seen a rabbit with either a waistcoat-pocket, or a watch to take out of it, and burning with curiosity, she ran across the field after it, and fortunately was just in time to see it pop down a large rabbit-hole under the hedge. In another moment down went Alice after it, never once considering how in the world she was to get out again. The rabbit-hole went straight on like a tunnel for some way, and then dipped suddenly down, so suddenly that Alice had not a moment to think about stopping herself before she found herself falling down a very deep well. Either the well was very deep, or she fell very slowly, for she had plenty of time as she went down to look about her and to wonder what was going to happen next. First, she tried to look down and make out what she was coming to, but it was too dark to see anything; then she looked at the sides of the well, and noticed that they were filled with cupboards......"

let text = Bigstring.of_string text

let headers = Headers.of_list ["content-length", string_of_int (Bigstring.length text)]
let error_handler _ ?request:_ error start_response =
let response_body = start_response Headers.empty in
begin match error with
| `Exn exn ->
Body.write_string response_body (Printexc.to_string exn);
Body.write_string response_body "\n";
| #Status.standard as error ->
Body.write_string response_body (Status.default_reason_phrase error)
end;
Body.close_writer response_body
;;

let request_handler _ reqd =
let {Request.target; _} = Reqd.request reqd in
let request_body = Reqd.request_body reqd in
Body.close_reader request_body;
match target with
| "/" -> Reqd.respond_with_bigstring reqd (Response.create ~headers `OK) text;
| _ -> Reqd.respond_with_string reqd (Response.create `Not_found) "Route not found"

let main port max_accepts_per_batch =
let conn_count = ref 0 in
let sock = Lwt_unix.socket Lwt_unix.PF_INET Lwt_unix.SOCK_STREAM 0 in
let sockaddr = Lwt_unix.ADDR_INET (Unix.inet_addr_loopback, port) in
Lwt_unix.bind sock sockaddr >>= fun () ->
Lwt_unix.listen sock 11_000;
let h = Httpaf_lwt_unix.Server.create_connection_handler ~error_handler ~request_handler in

let rec monitor () =
Lwt_unix.sleep 0.5 >>= fun () ->
Lwt_io.printlf "conns: %d" !conn_count >>= fun () ->
monitor () in
Lwt.async monitor;

let rec serve () =
Lwt_unix.accept_n sock max_accepts_per_batch >>= fun (accepts, exn) ->
begin match exn with
| None -> ()
| Some exn -> prerr_endline ("Accept failed: " ^ Printexc.to_string exn)
end;
conn_count := !conn_count + List.length accepts;
accepts |> List.iter begin fun (sa, fd) ->
Lwt.async (fun () -> h fd sa >|= fun () -> decr conn_count)
end;
serve () in
serve ()

let () =
let port = ref 8080 in
let max_accepts_per_batch = ref 1 in
Arg.parse
[ "-p", Arg.Set_int port, "int Source port to listen on";
"-a", Arg.Set_int max_accepts_per_batch, "int Maximum accepts per batch" ]
(fun _ -> raise (Arg.Bad "positional arg"))
"Start a hello world Lwt server";
Lwt_main.run (main !port !max_accepts_per_batch)
5 changes: 5 additions & 0 deletions examples/jbuild
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@
((libraries (httpaf httpaf-async async core))
(modules (async_echo_post async_get async_post))
(names (async_echo_post async_get async_post))))

(executables
((libraries (httpaf httpaf-lwt-unix lwt lwt.unix))
(modules (lwt_unix_echo_post lwt_unix_get))
(names (lwt_unix_echo_post lwt_unix_get))))
68 changes: 68 additions & 0 deletions examples/lwt_unix_echo_post.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
open Httpaf
open Httpaf_lwt_unix
open Lwt.Infix

let error_handler _ ?request:_ error start_response =
let response_body = start_response Headers.empty in
match error with
| `Exn exn ->
Body.write_string response_body (Printexc.to_string exn);
Body.write_string response_body "\n"
| #Status.standard as error ->
Body.write_string response_body (Status.default_reason_phrase error);
Body.close_writer response_body

let request_handler _ reqd =
match Reqd.request reqd with
| {Request.meth = `POST; headers; _} ->
let response =
let content_type =
match Headers.get headers "content-type" with
| None -> "application/octet-stream"
| Some x -> x in
let headers = Headers.of_list [
"content-type", content_type;
"connection", "close";
] in
Response.create ~headers `OK in
let request_body = Reqd.request_body reqd in
let response_body = Reqd.respond_with_streaming reqd response in
let
rec on_read buffer ~off ~len =
Body.write_bigstring response_body buffer ~off ~len;
Body.schedule_read request_body ~on_eof ~on_read
and on_eof () =
print_endline "eof";
Body.close_writer response_body
in
Body.schedule_read (Reqd.request_body reqd) ~on_eof ~on_read
| _ ->
Reqd.respond_with_string reqd (Response.create `Method_not_allowed) ""

let main port max_accepts_per_batch =
let sock = Lwt_unix.socket Lwt_unix.PF_INET Lwt_unix.SOCK_STREAM 0 in
let sockaddr = Lwt_unix.ADDR_INET (Unix.inet_addr_loopback, port) in
Lwt_unix.bind sock sockaddr >>= fun () ->
Lwt_unix.listen sock 10_000;
let h = Server.create_connection_handler ~error_handler ~request_handler in
let rec serve () =
Lwt_unix.accept_n sock max_accepts_per_batch >>= fun (accepts, exn) ->
begin match exn with
| None -> ()
| Some exn -> prerr_endline ("Accept failed: " ^ Printexc.to_string exn)
end;
List.iter (fun (sa, fd) -> Lwt.async (fun () -> h fd sa)) accepts;
serve () in
serve ()

let () =
let port = ref 8080 in
let batch_capacity = ref 100 in
Arg.parse
["-p", Arg.Set_int port, " Port number to listen on.";
"-a", Arg.Set_int batch_capacity, " Maximum number of accepts per batch."]
(fun _ ->
prerr_endline "No posititonal arguments accepted.";
exit 64)
"lwt_unix_echo_post [-p PORT] [-a N-ACCEPT-PER-BATCH]";
Lwt_main.run (main !port !batch_capacity)
90 changes: 90 additions & 0 deletions examples/lwt_unix_get.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
open Httpaf
open Lwt.Infix

let string_of_pp f x =
let buf = Buffer.create 128 in
let ppf = Format.formatter_of_buffer buf in
f ppf x;
Format.pp_print_flush ppf ();
Buffer.contents buf

let report_error = function
| `Malformed_response msg ->
Lwt_io.eprintl msg
| `Invalid_response_body_length _ ->
Lwt_io.eprintl "Invalid response body length."
| `Exn exn ->
Lwt_io.eprintl (Printexc.to_string exn)

let response_handler finished response response_body =
Lwt.async @@ fun () ->
Lwt_io.printl (string_of_pp Response.pp_hum response) >|= fun () ->
let on_eof () = Lwt.wakeup_later finished (Ok ()) in
let rec on_read bs ~off ~len =
Lwt.async @@ fun () ->
Lwt_io.print (Bigstring.to_string ~off ~len bs) >|= fun () ->
Body.schedule_read response_body ~on_read ~on_eof in
Body.schedule_read response_body ~on_read ~on_eof

let error_handler finished error = Lwt.wakeup_later finished (Error error)

let main host port =
let host_entry = Unix.gethostbyname host in
let sock = Lwt_unix.socket host_entry.Unix.h_addrtype Lwt_unix.SOCK_STREAM 0 in
let errors = ref [] in
let process () =
let request_promise, request_resolver = Lwt.wait () in
let headers = Headers.of_list [
"Host", host;
] in
let request_body =
Httpaf_lwt_unix.Client.request
~error_handler:(error_handler request_resolver)
~response_handler:(response_handler request_resolver)
sock
(Request.create ~headers `GET "/") in
Body.close_writer request_body;
request_promise in
let rec connect_loop i =
if i = Array.length host_entry.h_addr_list then begin
Lwt_io.eprintl "Address unreachable." >>= fun () ->
Lwt_list.iter_s
(fun (inet_addr, msg) ->
Lwt_io.eprintlf "%s: %s"
(Unix.string_of_inet_addr inet_addr) msg)
(List.rev !errors) >>= fun () ->
exit 69
end else begin
let inet_addr = host_entry.h_addr_list.(i) in
let sockaddr = Lwt_unix.ADDR_INET (inet_addr, port) in
Lwt.catch
(fun () ->
Lwt_unix.connect sock sockaddr >>= fun () ->
process ())
(function
| Unix.Unix_error (error, _, _) ->
errors := (inet_addr, Unix.error_message error) :: !errors;
connect_loop (i + 1)
| exn ->
Lwt.return_error (`Exn exn))
end in
connect_loop 0 >>=
(function
| Ok () -> Lwt.return 0
| Error error -> report_error error >|= fun () -> 69)

let () =
let host = ref "" in
let port = ref 80 in
Arg.parse
["-a", Arg.Set_string host, " Address.";
"-p", Arg.Set_int port, " Port number."]
(fun _ ->
prerr_endline "No positional arguments accepted.";
exit 64)
"lwt_unix_get -a ADDRESS [-p PORT]";
if !host = "" then begin
prerr_endline "The -a option is mandatory unless you just ask for -help.";
exit 64
end else
exit (Lwt_main.run (main !host !port))
23 changes: 23 additions & 0 deletions httpaf-lwt-unix.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
opam-version: "1.2"
name: "httpaf-lwt-unix"
maintainer: "Spiros Eliopoulos <[email protected]>"
authors: [ "Spiros Eliopoulos <[email protected]>" ]
license: "BSD-3-clause"
homepage: "https://github.com/inhabitedtype/httpaf"
bug-reports: "https://github.com/inhabitedtype/httpaf/issues"
dev-repo: "https://github.com/inhabitedtype/httpaf.git"
build: [
["jbuilder" "subst"] {pinned}
["jbuilder" "build" "-p" name "-j" jobs]
]
build-test: [
["jbuilder" "runtest" "-p" name]
]
depends: [
"jbuilder" {build & >= "1.0+beta10"}
"angstrom-lwt-unix"
"faraday-lwt-unix"
"httpaf"
"lwt"
]
available: [ ocaml-version >= "4.03.0" ]
Loading

0 comments on commit b935fd7

Please sign in to comment.