Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nh2 fix streaming process on nonthreaded #45

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions Data/Streaming/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module Data.Streaming.Process
) where

import Control.Applicative as A ((<$>), (<*>))
import Control.Concurrent (forkIOWithUnmask)
import Control.Concurrent (forkIOWithUnmask, threadWaitRead)
import Control.Concurrent.STM (STM, TMVar, atomically,
newEmptyTMVar, putTMVar,
readTMVar)
Expand All @@ -44,6 +44,7 @@ import Data.Streaming.Process.Internal
import Data.Typeable (Typeable)
import System.Exit (ExitCode (ExitSuccess))
import System.IO (hClose)
import System.Posix.IO (handleToFd)
import System.Process

#if MIN_VERSION_process(1,2,0)
Expand Down Expand Up @@ -148,6 +149,19 @@ streamingProcess cp = liftIO $ do
(getStdout, stdoutStream) = osStdStream
(getStderr, stderrStream) = osStdStream

-- We use a pipe to the child process to determine when it's dead.
-- In Unix, when there is a Unix pipe between two processes, then
-- "When the child process terminates, its end of the pipe will be closed"
-- (see https://stackoverflow.com/questions/8976004/using-waitpid-or-sigaction/8976461#8976461)
-- See also http://tldp.org/LDP/lpg/node11.html about Unix pipes.
-- Making this decision based on a pipe FD is better than `waitpid()` because
-- we can use GHC IO manager's `threadWaitRead` function to wait in a
-- non-blocking, non-polling way.
-- TODO: Use `createPipeFd` instead of `createPipe` once this package
-- requries process >= 1.4.2.0; then we don't have to use
-- `handleToFd` below.
(readHandle, writeHandle) <- createPipe

#if MIN_VERSION_process(1,2,0)
(stdinH, stdoutH, stderrH, ph) <- PI.createProcess_ "streamingProcess" cp
#else
Expand All @@ -158,12 +172,19 @@ streamingProcess cp = liftIO $ do
, std_err = fromMaybe (std_err cp) stderrStream
}

-- Close pipe write end from parent process (we don't need it).
hClose writeHandle
-- When the child process closes its write end (e.g. by terminating),
-- we'll read EOF on our read end, and we wait for that to happen with
-- the `threadWaitRead readFd` below.
readFd <- handleToFd readHandle

ec <- atomically newEmptyTMVar
-- Apparently waitForProcess can throw an exception itself when
-- delegate_ctlc is True, so to avoid this TMVar from being left empty, we
-- capture any exceptions and store them as an impure exception in the
-- TMVar
_ <- forkIOWithUnmask $ \_unmask -> try (waitForProcess ph)
_ <- forkIOWithUnmask $ \_unmask -> try (threadWaitRead readFd >> waitForProcess ph)
>>= atomically
. putTMVar ec
. either
Expand Down