diff --git a/lib_eio/unix/process.ml b/lib_eio/unix/process.ml index 0e722465d..800104b22 100644 --- a/lib_eio/unix/process.ml +++ b/lib_eio/unix/process.ml @@ -102,3 +102,8 @@ let spawn_unix ~sw (mgr:#mgr) ?cwd ~fds ?env ?executable args = let executable = get_executable executable ~args in let env = get_env env in mgr#spawn_unix ~sw ?cwd ~fds ~env ~executable args + +let sigchld = Eio.Condition.create () + +let install_sigchld_handler () = + Sys.set_signal Sys.sigchld (Sys.Signal_handle (fun (_:int) -> Eio.Condition.broadcast sigchld)) diff --git a/lib_eio/unix/process.mli b/lib_eio/unix/process.mli index 2c6b42c1a..bc92bc89e 100644 --- a/lib_eio/unix/process.mli +++ b/lib_eio/unix/process.mli @@ -45,3 +45,14 @@ val spawn_unix : The arguments are as for {!Eio.Process.spawn}, except that it takes a list of FD mappings for {!Fork_action.inherit_fds} directly, rather than just flows for the standard streams. *) + +val sigchld : Eio.Condition.t +(** {b If} an Eio backend installs a SIGCHLD handler, the handler will broadcast on this condition. + + This allows non-Eio libraries (such as Lwt) to share its signal handler. + + Note: Not all backends install a handler (e.g. eio_linux uses process descriptors instead), + so be sure to call {!install_sigchld_handler} if you need to use this. *) + +val install_sigchld_handler : unit -> unit +(** [install_sigchld_handler ()] sets the signal handler for SIGCHLD to broadcast {!sigchld}. This replaces any SIGCHLD handler that was installed previously. *) diff --git a/lib_eio_posix/children.ml b/lib_eio_posix/children.ml deleted file mode 100644 index d5a12d139..000000000 --- a/lib_eio_posix/children.ml +++ /dev/null @@ -1,88 +0,0 @@ -(* Keep track of running child processes and notify their fiber when they exit. - After forking a child process, it gets registered in the global [db] along with a resolver - for the promise of its exit status. When we get a SIGCHLD signal, we reap all exited processes - and resolve their promises, waking whichever fibers are waiting for them. 
- - We have to be careful not to use a PID after [wait] reaps it, as the PID could have been reused by then. - - The signal handler can run in any domain or systhread, so we have to be careful about that too. - We can't defer the call to [wait] until we're running in an Eio domain as we don't know which domain - should handle it until [wait] gives as the process ID. We don't want to delegate to a particular domain - because it might be spinning doing CPU stuff for a long time. Instead, we try to take the lock in the - signal handler and do it there. If we can't get the lock then we just record that a wait is needed; - whoever holds the lock will soon release it and will do the reaping for us. - - Note that, since signal handlers are global, - this will interfere with any libraries trying to manage processes themselves. - - For systems with Process Descriptors we could skip all this nonsense and - just poll on the process's FD. e.g. using [pdfork] on FreeBSD or [CLONE_PIDFD] on Linux. *) - -open Eio.Std - -(* Each child process is registered in this table. - Must hold [lock] when accessing it. *) -let db : (int, Unix.process_status Promise.u) Hashtbl.t = Hashtbl.create 10 - -(* Set to [true] when we receive [SIGCHLD] and [false] before calling [wait]. *) -let need_wait = Atomic.make false - -(* [lock] must be held when spawning or reaping. Otherwise, this can happen: - - - We spawn process 100, adding it to [db]. - - It exits, sending us SIGCHLD. - - The signal handler calls [wait], reaping it. - - Another domain spawns another process 100 and adds it to [db], - overwriting the previous entry. - - The signal handler resumes, and gets the wrong entry. - - If [lock] is already locked when the SIGCHLD handler runs then it just leaves [need_wait = true] - (a signal handler can't wait on a mutex, since it may have interrupted the holder). - The unlocker needs to check [need_wait] after releasing the lock. *) -let lock = Mutex.create () - -(* [pid] has exited. 
Notify the waiter. Must hold [lock] when calling this. *) -let report_child_status pid status = - match Hashtbl.find_opt db pid with - | Some r -> - Hashtbl.remove db pid; - Promise.resolve r status - | None -> - (* Not one of ours. Not much we can do here. The spawner will probably get - an [ECHILD] error when they wait, which will do for the error. *) - () - -(* Must hold [lock] when calling this. *) -let rec reap () = - Atomic.set need_wait false; - match Unix.(waitpid [WNOHANG] (-1)) with - | 0, _ -> () (* Returned if there are children but none has exited yet. *) - | pid, status -> report_child_status pid status; reap () - | exception Unix.Unix_error (EINTR, _, _) -> reap () - | exception Unix.Unix_error (ECHILD, _, _) -> () (* Returned if there are no children at all. *) - -let rec reap_nonblocking () = - if Mutex.try_lock lock then ( - reap (); - Mutex.unlock lock; - if Atomic.get need_wait then reap_nonblocking () - ) (* else the unlocker will see [need_wait] and call us later *) - -let unlock () = - Mutex.unlock lock; - if Atomic.get need_wait then reap_nonblocking () - -(* Must hold [lock] when calling this. *) -let register pid = - assert (not (Hashtbl.mem db pid)); - let p, r = Promise.create () in - Hashtbl.add db pid r; - p - -let with_lock fn = - Mutex.lock lock; - Fun.protect fn ~finally:unlock - -let handle_sigchld () = - Atomic.set need_wait true; - reap_nonblocking () diff --git a/lib_eio_posix/children.mli b/lib_eio_posix/children.mli deleted file mode 100644 index a8ead2276..000000000 --- a/lib_eio_posix/children.mli +++ /dev/null @@ -1,14 +0,0 @@ -(** Keep track of child processes and respond to SIGCHLD. *) - -val with_lock : (unit -> 'a) -> 'a -(** This must be held during the (fork, register) sequence - (so that we don't try to reap the process before it's registered), - and also when signalling a child process - (to ensure it isn't reaped at the same time). 
*) - -val register : int -> Unix.process_status Eio.Promise.t -(** [register pid] adds [pid] to the list of children and returns a promise for its exit status. - You must hold the lock while forking and then calling this. *) - -val handle_sigchld : unit -> unit -(** Call this on [SIGCHLD]. *) diff --git a/lib_eio_posix/eio_posix.ml b/lib_eio_posix/eio_posix.ml index e2f2d3f53..ce0b4edea 100644 --- a/lib_eio_posix/eio_posix.ml +++ b/lib_eio_posix/eio_posix.ml @@ -21,7 +21,7 @@ type stdenv = Eio_unix.Stdenv.base let run main = (* SIGPIPE makes no sense in a modern application. *) Sys.(set_signal sigpipe Signal_ignore); - Sys.(set_signal sigchld (Signal_handle (fun (_:int) -> Children.handle_sigchld ()))); + Eio_unix.Process.install_sigchld_handler (); let stdin = (Flow.of_fd Eio_unix.Fd.stdin :> Eio_unix.source) in let stdout = (Flow.of_fd Eio_unix.Fd.stdout :> Eio_unix.sink) in let stderr = (Flow.of_fd Eio_unix.Fd.stderr :> Eio_unix.sink) in diff --git a/lib_eio_posix/low_level.ml b/lib_eio_posix/low_level.ml index 4255c15ab..5938a8302 100644 --- a/lib_eio_posix/low_level.ml +++ b/lib_eio_posix/low_level.ml @@ -229,7 +229,9 @@ module Process = struct type t = { pid : int; exit_status : Unix.process_status Promise.t; + lock : Mutex.t; } + (* When [lock] is unlocked, [exit_status] is resolved iff the process has been reaped. *) let exit_status t = t.exit_status let pid t = t.pid @@ -249,36 +251,53 @@ module Process = struct fn r w let signal t signal = - (* The lock here ensures we don't signal the PID after reaping it. *) - Children.with_lock @@ fun () -> + (* We need the lock here so that one domain can't signal the process exactly as another is reaping it. 
*) + Mutex.lock t.lock; + Fun.protect ~finally:(fun () -> Mutex.unlock t.lock) @@ fun () -> if not (Promise.is_resolved t.exit_status) then ( Unix.kill t.pid signal - ) + ) (* else process has been reaped and t.pid is invalid *) external eio_spawn : Unix.file_descr -> Eio_unix.Private.Fork_action.c_action list -> int = "caml_eio_posix_spawn" + (* Wait for [pid] to exit and then resolve [exit_status] to its status. [Fun.protect] releases [t.lock] even if [waitpid] or the assertion raises. *) + let reap t exit_status = + Eio.Condition.loop_no_mutex Eio_unix.Process.sigchld (fun () -> + Mutex.lock t.lock; + Fun.protect ~finally:(fun () -> Mutex.unlock t.lock) @@ fun () -> + match Unix.waitpid [WNOHANG] t.pid with + | 0, _ -> None (* Not ready; wait for next SIGCHLD *) + | p, status -> + assert (p = t.pid); + Promise.resolve exit_status status; + Some () + ) + let spawn ~sw actions = with_pipe @@ fun errors_r errors_w -> Eio_unix.Private.Fork_action.with_actions actions @@ fun c_actions -> Switch.check sw; + let exit_status, set_exit_status = Promise.create () in let t = - (* We take the lock to ensure that the signal handler won't reap the - process before we've registered it. *) - Children.with_lock (fun () -> - let pid = - Fd.use_exn "errors-w" errors_w @@ fun errors_w -> - eio_spawn errors_w c_actions - in - Fd.close errors_w; - { pid; exit_status = Children.register pid } - ) + let pid = + Fd.use_exn "errors-w" errors_w @@ fun errors_w -> + eio_spawn errors_w c_actions + in + Fd.close errors_w; + { pid; exit_status; lock = Mutex.create () } in - let hook = Switch.on_release_cancellable sw (fun () -> signal t Sys.sigkill) in - (* Removing the hook must be done from our own domain, not from the signal handler, - so fork a fiber to deal with that. If the switch gets cancelled then this won't - run, but then the [on_release] handler will run the hook soon anyway. 
*) + let hook = Switch.on_release_cancellable sw (fun () -> + (* Kill process (if still running) *) + signal t Sys.sigkill; + (* The switch is being released, so either the daemon fiber got + cancelled or it hasn't started yet (and never will start). *) + if not (Promise.is_resolved t.exit_status) then ( + (* Do a (non-cancellable) waitpid here to reap the child. *) + reap t set_exit_status + ) + ) in Fiber.fork_daemon ~sw (fun () -> - ignore (Promise.await t.exit_status : Unix.process_status); + reap t set_exit_status; Switch.remove_hook hook; `Stop_daemon ); diff --git a/tests/process.md b/tests/process.md index dd6ded5ea..340a2b4bd 100644 --- a/tests/process.md +++ b/tests/process.md @@ -165,3 +165,20 @@ A custom environment: Process.parse_out mgr Eio.Buf_read.line ["sh"; "-c"; "echo $DISPLAY"] ~env;; - : string = ":2" ``` + +Eio's child reaping code doesn't interfere with OCaml's process spawning: + +```ocaml +let rec waitpid_with_retry flags pid = + try Unix.waitpid flags pid + with Unix.Unix_error(Unix.EINTR, _, _) -> waitpid_with_retry flags pid +``` + +```ocaml +# Eio_main.run @@ fun env -> + let p = Unix.(create_process "/usr/bin/env" [|"env"; "echo"; "hi"|] stdin stdout stderr) in + Eio.Time.Mono.sleep env#mono_clock 0.01; + waitpid_with_retry [] p |> snd;; +hi +- : Unix.process_status = Unix.WEXITED 0 +```