Confusion with sourceProcessWithStreams and takeExactly #500

Open
nh2 opened this issue Jan 17, 2023 · 1 comment

Comments

@nh2

nh2 commented Jan 17, 2023

I wrote some wrong code:

  -- Capture at most 1 line of stdout/stderr.
  -- When that has happened, `sourceProcessWithStreams` closes the
  -- corresponding pipe, so upon some more output the program will
  -- fail with SIGPIPE, so we ignore the exit code.
  (_exitCode, _outLinesList, errLinesList) <-
    sourceProcessWithStreams
      cp -- my program
      (return ()) -- stdin
      (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList) -- stdout
      (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList) -- stderr

This code does not do what the comment says it should do:

  • When the started process prints one or more lines to stderr and then anything to stdout, it works as expected.
  • But when the started process prints nothing to stderr and a lot to stdout, then sourceProcessWithStreams hangs forever.

I think this is because the process blocks on write(stdout, ...) over the pipe, while our sourceProcessWithStreams is implemented as

    (_, resStdout, resStderr) <-
      runConcurrently (
        (,,)
        <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
      `finally` (closeStdout >> closeStderr)

and the Applicative combination (,,) <$> ... <*> ... <*> ... over Concurrently waits for all 3 parts to terminate.
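The waiting behaviour can be reproduced without any process at all. The following is a minimal sketch, assuming the async package is available; the name `demo`, the 10-second sleep standing in for a stderr consumer that never sees EOF, and the 200 ms timeout are illustrative choices, not part of conduit-extra:

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))
import System.Timeout (timeout)

-- | Combine a branch that finishes at once with one that stays blocked,
-- and give the combination 200 ms to produce a result.
demo :: IO (Maybe (String, ()))
demo =
  timeout 200000 $
    runConcurrently $
      (,)
        <$> Concurrently (return "stdout consumer finished")
        -- Hypothetical stand-in for the blocked stderr consumer.
        <*> Concurrently (threadDelay 10000000)

main :: IO ()
main = demo >>= print -- prints Nothing: one finished branch is not enough
```

The first branch completes immediately, yet the combined action yields no result until the second branch also returns, which mirrors the hang described here.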

So what happens is:

  1. The program prints >= 1 line to stdout. The Haskell parent consumes 1 line and Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout) terminates. stderr stays open.
  2. Now the Haskell parent waits for Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr) to terminate, but that never happens because the program gets blocked writing more to stdout (which is never consumed by the parent), so it never writes to or closes stderr.

This seems wrong.

Question 1: Different sourceProcessWithStreams implementation

For each handle (not only stdin), shouldn't it be closed when the corresponding conduit terminates, like so:

    (_, resStdout, resStderr) <-
      runConcurrently (
        (,,)
        <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
        <*> Concurrently ((unliftIO u $ runConduit $ sourceStdout .| consumerStdout) `finally` closeStdout)
        <*> Concurrently ((unliftIO u $ runConduit $ sourceStderr .| consumerStderr) `finally` closeStderr)
      )

I would argue this would be the correct behaviour: since the handles are created and contained within sourceProcessWithStreams and not made accessible anywhere, nothing but consumerStdout/consumerStderr can read from them, so blocking is the only thing that can happen once those sinks terminate.

In either case, I think there should be a clear statement in the docs on what happens if conduits terminate without consuming everything they are given (handle closing, implying SIGPIPE on further output, or hanging forever).

Question 2: takeExactly docs

With the original sourceProcessWithStreams, I assumed that instead of implementing my desired terminate-after-1-line logic, I could also make it churn through all remaining stdout using sinkNull after consuming the first line:

-      (CT.decode CT.utf8 .| CT.lines .| CC.take 1 .| CC.sinkList)
+     (CT.decode CT.utf8 .| CT.lines .| ((CC.take 1 .| CC.sinkList) <* CC.sinkNull))

This works as expected.

Then I thought that I could also write it using takeExactly:

      (CT.decode CT.utf8 .| CT.lines .| CC.takeExactly 1 CC.sinkList)

I thought CC.takeExactly 1 CC.sinkList should be equivalent to ((CC.take 1 .| CC.sinkList) <* CC.sinkNull).

However, it is not; it blocks forever.

The docs say:

This function is in contrast to take, which will only consume up to the given number of values, and will terminate early if downstream terminates early. This function will discard any additional values in the stream if they are unconsumed.

After reading the implementation, I figured that

This function will discard any additional values in the stream if they are unconsumed.

really means

This function will discard any additional values in the stream if they are unconsumed by downstream.

and not (what I assumed)

This function will discard any additional values in the stream if they are unconsumed by this function.

That is, the implementation does:

take count .| do
    r <- inner
    CL.sinkNull
    return r

which is equivalent to

take count .| (inner <* CL.sinkNull)

and I thought it does

(take count .| inner) <* CL.sinkNull
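The difference between the two shapes can be observed on a pure stream by checking what is still left upstream afterwards. This is a minimal sketch, assuming the conduit package; the names `viaTakeExactly` and `viaDrainAfter` are made up for illustration:

```haskell
import Data.Conduit (runConduitPure, (.|))
import qualified Data.Conduit.Combinators as CC

-- | takeExactly 1 isolates the first element only; the sinkNull inside it
-- drains the isolated part, so the remaining input stays upstream.
viaTakeExactly :: ([Int], [Int])
viaTakeExactly = runConduitPure $ CC.yieldMany [1 .. 5 :: Int] .| do
  firstOne <- CC.takeExactly 1 CC.sinkList
  leftInStream <- CC.sinkList
  return (firstOne, leftInStream)

-- | Here sinkNull runs after the fused pipeline and drains the whole stream.
viaDrainAfter :: ([Int], [Int])
viaDrainAfter = runConduitPure $ CC.yieldMany [1 .. 5 :: Int] .| do
  firstOne <- (CC.take 1 .| CC.sinkList) <* CC.sinkNull
  leftInStream <- CC.sinkList
  return (firstOne, leftInStream)

main :: IO ()
main = do
  print viaTakeExactly -- ([1],[2,3,4,5])
  print viaDrainAfter  -- ([1],[])
```

With a process pipe as the source, the elements left upstream in the first variant correspond to output that nobody reads, which is consistent with the blocking seen above.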

I think it would be very helpful to make this distinction clear, e.g. by at least adding the suggested "by downstream" to the sentence; ideally even by contrasting the two behaviours above directly and saying which of them the function implements.

But I want to get your opinion first.

Question 3: createSource docs

Somewhat unrelatedly, I found that Data.Conduit.Process.Typed.createSource says

Read output from a process by read from a conduit.

and I didn't understand what "by read from a conduit" means; maybe there's a typo or a word missing?

@snoyberg
Owner

I see no problem with adding the stream closing; it seems reasonable. Want to confirm that it fixes the problem and open a PR?

For takeExactly: you have it right, better docs PR welcome too.

No idea what "by read from a conduit" is supposed to mean TBH. "Read output from a process using a Conduit" would seem correct enough.
