Skip to content

Commit

Permalink
Make streamingProcess work with the non-threaded runtime. Fixes #40
Browse files Browse the repository at this point in the history
  • Loading branch information
nh2 authored and snoyberg committed Jan 19, 2018
1 parent 8aac17a commit b4e93a9
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions Data/Streaming/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module Data.Streaming.Process
) where

import Control.Applicative as A ((<$>), (<*>))
import Control.Concurrent (forkIOWithUnmask)
import Control.Concurrent (forkIOWithUnmask, threadWaitRead)
import Control.Concurrent.STM (STM, TMVar, atomically,
newEmptyTMVar, putTMVar,
readTMVar)
Expand All @@ -44,6 +44,7 @@ import Data.Streaming.Process.Internal
import Data.Typeable (Typeable)
import System.Exit (ExitCode (ExitSuccess))
import System.IO (hClose)
import System.Posix.IO (handleToFd)
import System.Process

#if MIN_VERSION_process(1,2,0)
Expand Down Expand Up @@ -148,6 +149,19 @@ streamingProcess cp = liftIO $ do
(getStdout, stdoutStream) = osStdStream
(getStderr, stderrStream) = osStdStream

-- We use a pipe to the child process to determine when it's dead.
-- In Unix, when there is a Unix pipe between two processes, then
-- "When the child process terminates, its end of the pipe will be closed"
-- (see https://stackoverflow.com/questions/8976004/using-waitpid-or-sigaction/8976461#8976461)
-- See also http://tldp.org/LDP/lpg/node11.html about Unix pipes.
-- Making this decision based on a pipe FD is better than `waitpid()` because
-- we can use GHC IO manager's `threadWaitRead` function to wait in a
-- non-blocking, non-polling way.
-- TODO: Use `createPipeFd` instead of `createPipe` once this package
-- requries process >= 1.4.2.0; then we don't have to use
-- `handleToFd` below.
(readHandle, writeHandle) <- createPipe

#if MIN_VERSION_process(1,2,0)
(stdinH, stdoutH, stderrH, ph) <- PI.createProcess_ "streamingProcess" cp
#else
Expand All @@ -158,12 +172,19 @@ streamingProcess cp = liftIO $ do
, std_err = fromMaybe (std_err cp) stderrStream
}

-- Close pipe write end from parent process (we don't need it).
hClose writeHandle
-- When the child process closes its write end (e.g. by terminating),
-- we'll read EOF on our read end, and we wait for that to happen with
-- the `threadWaitRead readFd` below.
readFd <- handleToFd readHandle

ec <- atomically newEmptyTMVar
-- Apparently waitForProcess can throw an exception itself when
-- delegate_ctlc is True, so to avoid this TMVar from being left empty, we
-- capture any exceptions and store them as an impure exception in the
-- TMVar
_ <- forkIOWithUnmask $ \_unmask -> try (waitForProcess ph)
_ <- forkIOWithUnmask $ \_unmask -> try (threadWaitRead readFd >> waitForProcess ph)
>>= atomically
. putTMVar ec
. either
Expand Down

0 comments on commit b4e93a9

Please sign in to comment.