diff --git a/Data/Streaming/Process.hs b/Data/Streaming/Process.hs index d0fd5ab..2af3a8d 100644 --- a/Data/Streaming/Process.hs +++ b/Data/Streaming/Process.hs @@ -32,7 +32,7 @@ module Data.Streaming.Process ) where import Control.Applicative as A ((<$>), (<*>)) -import Control.Concurrent (forkIOWithUnmask) +import Control.Concurrent (forkIOWithUnmask, threadWaitRead) import Control.Concurrent.STM (STM, TMVar, atomically, newEmptyTMVar, putTMVar, readTMVar) @@ -44,6 +44,7 @@ import Data.Streaming.Process.Internal import Data.Typeable (Typeable) import System.Exit (ExitCode (ExitSuccess)) import System.IO (hClose) +import System.Posix.IO (handleToFd) import System.Process #if MIN_VERSION_process(1,2,0) @@ -148,6 +149,19 @@ streamingProcess cp = liftIO $ do (getStdout, stdoutStream) = osStdStream (getStderr, stderrStream) = osStdStream + -- We use a pipe to the child process to determine when it's dead. + -- In Unix, when there is a Unix pipe between two processes, then + -- "When the child process terminates, its end of the pipe will be closed" + -- (see https://stackoverflow.com/questions/8976004/using-waitpid-or-sigaction/8976461#8976461) + -- See also http://tldp.org/LDP/lpg/node11.html about Unix pipes. + -- Making this decision based on a pipe FD is better than `waitpid()` because + -- we can use GHC IO manager's `threadWaitRead` function to wait in a + -- non-blocking, non-polling way. + -- TODO: Use `createPipeFd` instead of `createPipe` once this package + -- requries process >= 1.4.2.0; then we don't have to use + -- `handleToFd` below. + (readHandle, writeHandle) <- createPipe + #if MIN_VERSION_process(1,2,0) (stdinH, stdoutH, stderrH, ph) <- PI.createProcess_ "streamingProcess" cp #else @@ -158,12 +172,19 @@ streamingProcess cp = liftIO $ do , std_err = fromMaybe (std_err cp) stderrStream } + -- Close pipe write end from parent process (we don't need it). + hClose writeHandle + -- When the child process closes its write end (e.g. by terminating), + -- we'll read EOF on our read end, and we wait for that to happen with + -- the `threadWaitRead readFd` below. + readFd <- handleToFd readHandle + ec <- atomically newEmptyTMVar -- Apparently waitForProcess can throw an exception itself when -- delegate_ctlc is True, so to avoid this TMVar from being left empty, we -- capture any exceptions and store them as an impure exception in the -- TMVar - _ <- forkIOWithUnmask $ \_unmask -> try (waitForProcess ph) + _ <- forkIOWithUnmask $ \_unmask -> try (threadWaitRead readFd >> waitForProcess ph) >>= atomically . putTMVar ec . either