Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
6.0.0 Unreleased.
---
* Solve problem with EINTR.
Add `send_all_r`, `send_all_msg_r`, `recv_all_r` and `recv_all_msg_r`
to allow resuming if the calls raise EAGIN or EINTR.

* Deprecate `send_all`, `send_all_msg`, `recv_all` and `recv_all_msg`
as resumable functions should be used instead.

* Fix deprecation warning on use of Async_kernel.Ivar.fill

* lwt and async now retry on EINTR and EAGAIN and handles
recv|send_*_all function correctly.

5.3.0
---
* Add eio binding in zmq-eio (#126, @andersfugmann)
Expand Down
2 changes: 1 addition & 1 deletion zmq-async/src/deferred.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ end
module Mailbox = struct
type 'a t = 'a Async_kernel.Ivar.t
let create () = Async_kernel.Ivar.create ()
let send t v = Async_kernel.Ivar.fill t v
let send t v = Async_kernel.Ivar.fill_exn t v
let recv t = Async_kernel.Ivar.read t
end

Expand Down
41 changes: 14 additions & 27 deletions zmq-deferred/src/socket.ml
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ module Make(T: Deferred.T) = struct
t

type op = Send | Receive
let post: _ t -> op -> (_ Zmq.Socket.t -> 'a) -> 'a Deferred.t = fun t op f ->
let f' mailbox () =
let res = match f t.socket with
let post: _ t -> op -> (_ Zmq.Socket.t -> unit -> 'a) -> 'a Deferred.t = fun t op f ->
let f' f mailbox () =
let res = match f () with
| v -> Ok v
| exception Unix.Unix_error (Unix.EAGAIN, _, _) ->
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EINTR), _, _) ->
(* Signal try again *)
raise Retry
| exception exn -> Error exn
Expand All @@ -176,7 +176,7 @@ module Make(T: Deferred.T) = struct
in
let mailbox = Mailbox.create () in
let should_signal = Queue.is_empty queue in
Queue.push (f' mailbox) queue;
Queue.push (f' (f t.socket) mailbox) queue;

(* Wakeup the thread if the queue was empty *)
begin
Expand All @@ -191,38 +191,25 @@ module Make(T: Deferred.T) = struct

let to_socket t = t.socket

let recv s = post s Receive (fun s -> Zmq.Socket.recv ~block:false s)
let send s m = post s Send (fun s -> Zmq.Socket.send ~block:false s m)
let recv s = post s Receive (fun s () -> Zmq.Socket.recv ~block:false s)
let send s m = post s Send (fun s () -> Zmq.Socket.send ~block:false s m)

let recv_msg s = post s Receive (fun s -> Zmq.Socket.recv_msg ~block:false s)
let recv_msg s = post s Receive (fun s () -> Zmq.Socket.recv_msg ~block:false s)
let send_msg s m =
post s Send (fun s -> Zmq.Socket.send_msg ~block:false s m)
post s Send (fun s () -> Zmq.Socket.send_msg ~block:false s m)

(** Recevie all message blocks. *)

let recv_all s =
(* The documentaton says that either all message parts are
transmitted, or none. So once a message becomes available, all
parts can be read wothout blocking.

Also receiving a multipart message must not be interleaved with
another receving thread on the same socket.

We could have a read-mutex and a write mutex in order to limit
potential starvation of other threads while reading large
multipart messages.

*)
post s Receive (fun s -> Zmq.Socket.recv_all ~block:false s)
post s Receive (fun s -> Zmq.Socket.recv_all_r ~block:false s)

let send_all s parts =
(* See the comment in recv_all. *)
post s Send (fun s -> Zmq.Socket.send_all ~block:false s parts)
post s Send (fun s -> Zmq.Socket.send_all_r ~block:false s parts)

let recv_msg_all s =
post s Receive (fun s -> Zmq.Socket.recv_msg_all ~block:false s)
post s Receive (fun s -> Zmq.Socket.recv_msg_all_r ~block:false s)
let send_msg_all s parts =
post s Send (fun s -> Zmq.Socket.send_msg_all ~block:false s parts)
post s Send (fun s -> Zmq.Socket.send_msg_all_r ~block:false s parts)

let close t =
t.closing <- true;
Expand All @@ -247,7 +234,7 @@ module Make(T: Deferred.T) = struct
end

module Monitor = struct
let recv s = post s Receive (fun s -> Zmq.Monitor.recv ~block:false s)
let recv s = post s Receive (fun s -> Zmq.Monitor.recv_r ~block:false s)
end

end
15 changes: 7 additions & 8 deletions zmq-eio/src/socket.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ let process queue =
| () ->
let (_: unit -> unit) = Queue.pop queue in
()
| exception Unix.Unix_error (Unix.EAGAIN, _, _) ->
(* If f raised EAGAIN, dont pop the message. *)
(* This should never happen. If so, the queue could be replaced with a Eio.Stream for faster handling *)
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EINTR), _, _) ->
(* Leave the function on the queue to be retried *)
()

let with_lock lock f =
Expand Down Expand Up @@ -123,10 +122,10 @@ let send_msg t message =
request t t.senders (fun () -> Zmq.Socket.send_msg ~block:false t.socket message)

let send_all t messages =
request t t.senders (fun () -> Zmq.Socket.send_all ~block:false t.socket messages)
request t t.senders (Zmq.Socket.send_all_r ~block:false t.socket messages)

let send_msg_all t messages =
request t t.senders (fun () -> Zmq.Socket.send_msg_all ~block:false t.socket messages)
request t t.senders (Zmq.Socket.send_msg_all_r ~block:false t.socket messages)

let recv t =
request t t.receivers (fun () -> Zmq.Socket.recv ~block:false t.socket)
Expand All @@ -135,10 +134,10 @@ let recv_msg t =
request t t.receivers (fun () -> Zmq.Socket.recv_msg ~block:false t.socket)

let recv_all t =
request t t.receivers (fun () -> Zmq.Socket.recv_all ~block:false t.socket)
request t t.receivers (Zmq.Socket.recv_all_r ~block:false t.socket)

let recv_msg_all t =
request t t.receivers (fun () -> Zmq.Socket.recv_msg_all ~block:false t.socket)
request t t.receivers (Zmq.Socket.recv_msg_all_r ~block:false t.socket)

module Router = struct
type id_t = string
Expand All @@ -155,5 +154,5 @@ module Router = struct
end

module Monitor = struct
let recv t = request t t.receivers (fun () -> Zmq.Monitor.recv ~block:false t.socket)
let recv t = request t t.receivers (Zmq.Monitor.recv_r ~block:false t.socket)
end
65 changes: 65 additions & 0 deletions zmq/src/zmq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ exception ZMQ_exception of error * string

external version : unit -> int * int * int = "caml_zmq_version"

type 'a resumable = unit -> 'a


module Context = struct
type t

Expand Down Expand Up @@ -492,6 +495,42 @@ module Socket = struct
type event = No_event | Poll_in | Poll_out | Poll_in_out | Poll_error
external events : 'a t -> event = "caml_zmq_get_events"


(** Allow resuming receive of a multipart message.
The function returns a function that can be "resumed" in case of EGAGIN or EINTR.
*)
let recv_all_wrapper_resumable: (?block:bool -> _ t -> 'a) -> ?block:bool -> _ t -> 'a list resumable = fun f ?block socket ->
let received = ref [] in
let rec cont f ?block () =
let message = f ?block socket in
received := message :: !received;
match has_more socket with
| true ->
cont f ~block:false ()
| false ->
(* Convert the queue to a list *)
List.rev !received
in
cont f ?block

(** Allow resuming send of a multipart message.
The function returns a function that can be "resumed" in case of EGAGIN or EINTR.
*)
let send_all_wrapper_resumable: (?block:bool -> ?more:bool -> _ t -> 'a -> unit) -> ?block:bool -> _ t -> 'a list -> unit resumable = fun f ?block socket messages ->
let messages = ref messages in
let rec cont (f: (?block:bool -> ?more:bool -> _ t -> 'a -> unit)) ?block socket () =
match !messages with
| [] -> ()
| [ msg ] ->
f ?block ~more:false socket msg
| msg :: msgs ->
f ?block ~more:true socket msg;
messages := msgs;
cont f ~block:true socket ()
in
cont f ?block socket

(** This function should never be used. It does not allow resuming if a signal is received, which may leave half read multi part messages on the socket. *)
let recv_all_wrapper (f : ?block:bool -> _ t -> _) =
(* Once the first message part is received all remaining message parts can
be received without blocking. *)
Expand Down Expand Up @@ -529,14 +568,26 @@ module Socket = struct
let recv_all ?block socket =
recv_all_wrapper recv ?block socket

let recv_all_r ?block socket =
recv_all_wrapper_resumable recv ?block socket

let send_all ?block socket message =
send_all_wrapper send ?block socket message

let send_all_r ?block socket message =
send_all_wrapper_resumable send ?block socket message

let recv_msg_all ?block socket =
recv_all_wrapper recv_msg ?block socket

let recv_msg_all_r ?block socket =
recv_all_wrapper_resumable recv_msg ?block socket

let send_msg_all ?block socket message =
send_all_wrapper send_msg ?block socket message

let send_msg_all_r ?block socket message =
send_all_wrapper_resumable send_msg ?block socket message
end

module Proxy = struct
Expand Down Expand Up @@ -635,6 +686,20 @@ module Monitor = struct
let addr = Socket.recv ~block:false socket in
decode_monitor_event event addr

let recv_r ?block socket =
let event = ref None in
let rec cont () =
match !event with
| None ->
event := Some (Socket.recv ?block socket);
cont ()
| Some event ->
assert (Socket.has_more socket);
let addr = Socket.recv ~block:false socket in
decode_monitor_event event addr
in
cont

let get_peer_address fd =
try
let sockaddr = Unix.getpeername fd in
Expand Down
74 changes: 62 additions & 12 deletions zmq/src/zmq.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@ type error =

exception ZMQ_exception of error * string

(** Resumable.
Allows repeated calls until the operation has
completed.
This is needed when reading or writing multipart messages. If EINTR is not
handled corretly, the socket may be left with a half written/read
message which will break subsequent operations.

For this reason, if EINTR is raised its important to repeat the
call to the resumable to avoid the the socket go into a broken
state.

For all [resumable]s the creation of the resumable does not send or receive any messages - that happend only when the resumable is evaluated.

*)
type 'a resumable = unit -> 'a

val version : unit -> int * int * int

module Context : sig
Expand Down Expand Up @@ -103,51 +119,78 @@ module Socket : sig
val recv : ?block:bool -> 'a t -> string

(** Read a complete multipart message from the socket.
block indicates if the call should be blocking or non-blocking. Default true
@param block indicates if the call should be blocking or non-blocking. Defaults to [true].
@deprecated This function is unsafe wtr EINTR signals. Use {!recv_all_r} to allow resuming the operation
*)
val recv_all : ?block:bool -> 'a t -> string list

(** Read a complete multipart message from the socket.
@param block indicates if the call should be blocking or non-blocking. Defaults to [true].
@return {!resumable} to allow resuming the operation in case of EINTR or EAGAIN.
*)
val recv_all_r : ?block:bool -> 'a t -> string list resumable

(** Send a message to the socket.
block indicates if the call should be blocking or non-blocking. Default true
more is used for multipart messages, and indicates that the more message parts will follow. Default false
@param block indicates if the call should be blocking or non-blocking.
Defaults to [true].
@param more indicate that more messages will follow and will be joined into a multipart message.
Defaults to [true].
*)
val send : ?block:bool -> ?more:bool -> 'a t -> string -> unit

(** Send a multipart message to the socket.
block indicates if the call should be blocking or non-blocking. Default true
@param block indicates if the call should be blocking or non-blocking. Defaults to [true].
@deprecated This function is unsafe wtr EINTR signals. Use {!send_all_r} to allow resuming the operation
*)
val send_all : ?block:bool -> 'a t -> string list -> unit

(** Receive a {!Msg.t} on the socket.
(** Send a multipart message to the socket.
@param block indicates if the call should be blocking or non-blocking. Defaults to [true].
@return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN.
*)
val send_all_r : ?block:bool -> 'a t -> string list -> unit resumable


(** Receive a {!Msg.t} on the socket.
@param block indicates if the call should be blocking or non-blocking.
Defaults to [true].
*)
val recv_msg : ?block:bool -> 'a t -> Msg.t

(** Receive a multi-part message on the socket.

@param block indicates if the call should be blocking or non-blocking.
Defaults to [true].
@param block indicates if the call should be blocking or non-blocking. Defaults to [true].
@deprecated This function is unsafe wtr EINTR signals. Use {!recv_msg_all_r} to allow resuming the operation
*)
val recv_msg_all : ?block:bool -> 'a t -> Msg.t list

(** Send a {!Msg.t} to the socket.
(** Receive a multi-part message on the socket.
@param block indicates if the call should be blocking or non-blocking. Defaults to [true].
@return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN.
*)
val recv_msg_all_r : ?block:bool -> 'a t -> Msg.t list resumable

(** Send a {!Msg.t} to the socket.
@param block indicates if the call should be blocking or non-blocking.
Defaults to [true].
@param more is used for multipart messages Set to [true] to indicate that
more message parts will follow. Defaults to [false].
more message parts will follow. Defaults to [false].
*)
val send_msg : ?block:bool -> ?more:bool -> 'a t -> Msg.t -> unit

(** Send a multi-part message to the socket.

@param block indicates if the call should be blocking or non-blocking.
Defaults to [true].
@deprecated This function is unsafe wtr EINTR signals. Use {!send_msg_all_r} to allow resuming the operation
*)
val send_msg_all : ?block:bool -> 'a t -> Msg.t list -> unit

(** Send a multi-part message to the socket.
@param block indicates if the call should be blocking or non-blocking.
Defaults to [true].
@return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN.
*)
val send_msg_all_r : ?block:bool -> 'a t -> Msg.t list -> unit resumable

(** Option Getter and Setters *)

(** Set the maximum message size of a message sent in this context,
Expand Down Expand Up @@ -314,10 +357,17 @@ module Monitor : sig
val connect: Context.t -> t -> [<`Monitor] Socket.t

(** Receive an event from the monitor socket.
block indicates if the call should be blocking or non-blocking. Default true
@param block indicates if the call should be blocking or non-blocking. Defaults to [true].
@deprecated This function is unsafe wtr EINTR signals. Use {!recv_r} to allow resuming the operation
*)
val recv: ?block:bool -> [< `Monitor ] Socket.t -> event

(** Receive an event from the monitor socket.
@param block indicates if the call should be blocking or non-blocking. Defaults to [true].
@return {!resumable} to allow resuming the operation in case of EINTR or EGAGIN.
*)
val recv_r: ?block:bool -> [< `Monitor ] Socket.t -> event resumable

val string_of_event: event -> string

(** Create a memorizing function for converting an event to a string.
Expand Down
Loading