Skip to content

Commit

Permalink
Add Lwt_io.establish_server_with_client_socket
Browse files Browse the repository at this point in the history
  • Loading branch information
aantron committed Apr 29, 2018
1 parent 37d3da1 commit 56c1b6e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 18 deletions.
20 changes: 13 additions & 7 deletions src/unix/lwt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1621,7 +1621,7 @@ let establish_server_generic
in

connection_handler_callback
client_address (input_channel, output_channel);
client_address client_socket (input_channel, output_channel);

accept_loop ()

Expand Down Expand Up @@ -1659,8 +1659,8 @@ let establish_server_generic

server, server_has_started

let establish_server_with_client_address
?fd ?buffer_size ?backlog ?(no_close = false) sockaddr f =
let establish_server_with_client_socket
?server_fd ?buffer_size ?backlog ?(no_close = false) sockaddr f =
let best_effort_close channel =
(* First, check whether the channel is closed. f may have already tried to
close the channel, received an exception, and handled it somehow. If so,
Expand All @@ -1678,13 +1678,13 @@ let establish_server_with_client_address
Lwt.return_unit)
in

let handler addr ((input_channel, output_channel) as channels) =
let handler addr socket ((input_channel, output_channel) as channels) =
Lwt.async (fun () ->
(* Not using Lwt.finalize here, to make sure that exceptions from [f]
reach !Lwt.async_exception_hook before exceptions from closing the
channels. *)
Lwt.catch
(fun () -> f addr channels)
(fun () -> f addr socket channels)
(fun exn ->
!Lwt.async_exception_hook exn;
Lwt.return_unit)
Expand All @@ -1698,11 +1698,17 @@ let establish_server_with_client_address

let server, started =
establish_server_generic
Lwt_unix.bind ?fd ?buffer_size ?backlog sockaddr handler
Lwt_unix.bind ?fd:server_fd ?buffer_size ?backlog sockaddr handler
in
started >>= fun () ->
Lwt.return server

let establish_server_with_client_address
?fd ?buffer_size ?backlog ?no_close sockaddr handler =
let handler addr _socket c = handler addr c in
establish_server_with_client_socket
?server_fd:fd ?buffer_size ?backlog ?no_close sockaddr handler

let establish_server ?fd ?buffer_size ?backlog ?no_close sockaddr f =
let f _addr c = f c in
establish_server_with_client_address
Expand All @@ -1715,7 +1721,7 @@ let establish_server_deprecated ?fd ?buffer_size ?backlog sockaddr f =
let blocking_bind fd addr =
Lwt.return (Lwt_unix.Versioned.bind_1 fd addr) [@ocaml.warning "-3"]
in
let f _addr c = f c in
let f _addr _socket c = f c in

let server, server_started =
establish_server_generic blocking_bind ?fd ?buffer_size ?backlog sockaddr f
Expand Down
39 changes: 28 additions & 11 deletions src/unix/lwt_io.mli
Original file line number Diff line number Diff line change
Expand Up @@ -512,32 +512,36 @@ val with_close_connection :
type server
(** Type of a server *)

val establish_server_with_client_address :
?fd : Lwt_unix.file_descr ->
val establish_server_with_client_socket :
?server_fd : Lwt_unix.file_descr ->
?buffer_size : int ->
?backlog : int ->
?no_close : bool ->
Unix.sockaddr ->
(Lwt_unix.sockaddr -> input_channel * output_channel -> unit Lwt.t) ->
(Lwt_unix.sockaddr ->
Lwt_unix.file_descr ->
input_channel * output_channel ->
unit Lwt.t) ->
server Lwt.t
(** [establish_server_with_client_address listen_address f] creates a server
(** [establish_server_with_client_socket listen_address f] creates a server
which listens for incoming connections on [listen_address]. When a client
makes a new connection, it is passed to [f]: more precisely, the server
calls
{[
f client_address (in_channel, out_channel)
f client_address client_socket (in_channel, out_channel)
]}
where [client_address] is the address (peer name) of the new client, and
[in_channel] and [out_channel] are two channels wrapping the socket for
where [client_address] is the address (peer name) of the new client,
[client_socket] is the socket connected to the client, and [in_channel] and
[out_channel] are two buffered channels wrapping the socket for
communicating with that client.
The server does not block waiting for [f] to complete: it concurrently tries
to accept more client connections while [f] is handling the client.
When the promise returned by [f] completes (i.e., [f] is done handling the
client), [establish_server_with_client_address] automatically closes
client), [establish_server_with_client_socket] automatically closes
[in_channel] and [out_channel]. This is a default behavior that is useful
for simple cases, but for a robust application you should explicitly close
these channels yourself, and handle any exceptions. If the channels are
Expand All @@ -553,8 +557,10 @@ f client_address (in_channel, out_channel)
an exception), [establish_server_with_client_address] can do nothing with
that exception, except pass it to {!Lwt.async_exception_hook}.
[~fd] can be specified to use an existing file descriptor for listening.
Otherwise, a fresh socket is created internally.
[~server_fd] can be specified to use an existing file descriptor for
listening. Otherwise, a fresh socket is created internally. In either case,
[establish_server_with_client_socket] will internally assign
[listen_address] to the server socket.
[~backlog] is the argument passed to {!Lwt_unix.listen}.
Expand Down Expand Up @@ -686,6 +692,17 @@ val set_default_buffer_size : int -> unit

(** {2 Deprecated} *)

val establish_server_with_client_address :
?fd : Lwt_unix.file_descr ->
?buffer_size : int ->
?backlog : int ->
?no_close : bool ->
Unix.sockaddr ->
(Lwt_unix.sockaddr -> input_channel * output_channel -> unit Lwt.t) ->
server Lwt.t
(** Like [establish_server_with_client_fd], but does not pass the client fd to
the callback [f]. *)

val establish_server :
?fd : Lwt_unix.file_descr ->
?buffer_size : int ->
Expand All @@ -696,7 +713,7 @@ val establish_server :
[@@ocaml.deprecated
" Since Lwt 3.1.0, use Lwt_io.establish_server_with_client_address"]
(** Like [establish_server_with_client_address], but does not pass the client
address to the callback [f].
address or fd to the callback [f].
@deprecated Use {!establish_server_with_client_address}.
@since 3.0.0 *)
Expand Down

0 comments on commit 56c1b6e

Please sign in to comment.