withBuffer should kill the other thread when one completes #1

Open
ocharles opened this issue Nov 8, 2017 · 5 comments

@ocharles
Contributor

ocharles commented Nov 8, 2017

withBuffer creates two threads - one to produce values and one to consume values. They communicate with a "seal" which starts empty, and when one thread finishes it signals that the work is done. The other thread notices the seal has been written to and stops what it is doing. However, this only works if the threads are blocked on activities relating to the basket. If they are actually blocked on some IO, they will never be terminated, and hence the whole withBuffer never terminates.
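To make the limitation concrete, here is a toy model of a cooperative seal, using only `base`. It is not how streaming-concurrent implements baskets; `produceUntilSealed` and `demo` are hypothetical names, and an `IORef Bool` stands in for the real seal. The point is that the seal is only inspected between steps, so a step that blocks (as `accept` would) is never interrupted.

```haskell
import Control.Monad (unless, when)
import Data.IORef
  (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef, writeIORef)

-- Hypothetical toy producer loop: run 'step', hand the value to 'emit',
-- and re-check the seal only once both have returned.
produceUntilSealed :: IORef Bool -> IO a -> (a -> IO ()) -> IO ()
produceUntilSealed sealed step emit = loop
  where
    loop = do
      isSealed <- readIORef sealed
      -- If 'step' never returns, this check is never reached again.
      unless isSealed (step >>= emit >> loop)

-- The "consumer" (modelled inside 'emit') seals after three values.
demo :: IO [Int]
demo = do
  sealed <- newIORef False
  out <- newIORef []
  next <- newIORef 0
  let step = atomicModifyIORef' next (\n -> (n + 1, n + 1))
      emit x = do
        modifyIORef' out (x :)
        when (x >= 3) (writeIORef sealed True)
  produceUntilSealed sealed step emit
  reverse <$> readIORef out
```

In this model the loop stops promptly because `step` always returns. Replace `step` with an action that blocks indefinitely and the loop never sees the seal, which is the situation described above.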

In my project, I have a producer that reads from a Socket, and the consumer reads Binary serialized values from connections to this socket:

withSocketValues
  :: (Binary a, Show a, MonadBase IO m, MonadMask m, MonadBaseControl IO m)
  => Socket -> (Stream (Of a) m () -> m r) -> m r
withSocketValues socket go =
  withBuffer unbounded receive (\basket -> withStreamBasket basket go <* liftBase (putStrLn "OK"))

  where
    receive basket =
      liftBase $
      forever $ do
        (client, _, _) <- accept socket
        forkIO $ writeStreamBasket (decoded (SBS.hGetContentsN 1 client)) basket

After all clients are done sending data, this won't terminate, because the receive action just gets stuck in accept.

I think withBuffer should be in charge of killing the reader or writer. Does this make sense?

@ivan-m
Collaborator

ivan-m commented Nov 9, 2017

Hmmm, I see what you mean. I suppose I was thinking more of input being exhausted rather than output not wanting any more input.

I think the two bracket_ calls should both become finally, which should solve this issue (I think I had forgotten about finally when I first ported this from pipes-concurrency).

@ivan-m
Collaborator

ivan-m commented Nov 13, 2017

Turns out finally is implemented using bracket_ so that's not it.

I'm not sure if it's possible in the general case for withBuffer to kill either thread; how can it do so?

I think the problem here is the usage of forever: writeStreamBasket will stop as soon as the buffer indicates it can no longer accept input (as the continuation will seal the buffer when it completes).

I think what you should do is use forever to build up a Stream and then write that to the basket, rather than continuously trying to write to the basket.

The only other approach I can think of is for a write to an already-sealed basket to throw an exception. However, since the continuation can exit immediately, before you recursively try to write to the basket, I don't think that's viable.

@ocharles
Contributor Author

> I'm not sure if it's possible in the general case for withBuffer to kill either thread; how can it do so?

Can't it just throw an exception to the thread? It's obviously a bit cleaner if you use async and just cancel it. I believe that's what I meant, but I haven't fully paged this issue back into my head.

@tonyday567

It might be the old "needs one in the chamber" problem:

import Prelude
import qualified Streaming as S
import qualified Streaming.Prelude as S
import Streaming.Concurrent

ex1 :: IO [String]
ex1 = S.toList_ . S.take 3 $ S.stdinLn

buff o i = withBuffer unbounded (writeStreamBasket i) (\basket -> withStreamBasket basket o)

ex2 :: IO [String]
ex2 = (S.toList_ . S.take 3) `buff` S.stdinLn

λ> ex1
a
b
c
["a","b","c"]
λ> ex2
a
b
c
x
["a","b","c"]
λ>

@tonyday567

@ocharles I think #3 will work for your example. The InBasket thread can't cancel the OutBasket one, because that needs to return to arrive at the answer, but you can have the OutBasket thread cancel the InBasket thread if it finishes.
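The asymmetry described here can be sketched with `base` alone. This is a minimal illustration under stated assumptions, not the change in #3 and not streaming-concurrent's API: `withProducerKilled` and `demo` are hypothetical names, and a `threadDelay` loop stands in for a producer stuck in `accept`. The consumer runs on the calling thread so its result can be returned; the producer is forked and killed once the consumer finishes or throws.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally, mask)
import Control.Monad (forever)

-- Hypothetical helper: fork the producer, run the consumer here, and
-- kill the producer when the consumer is done (normally or by exception).
-- The producer finishing first does not stop the consumer.
withProducerKilled :: IO () -> IO r -> IO r
withProducerKilled producer consumer =
  mask $ \restore -> do
    -- Fork while masked so the ThreadId cannot be lost; 'restore' lets the
    -- child receive asynchronous exceptions as the caller normally would.
    tid <- forkIO (restore producer)
    restore consumer `finally` killThread tid

-- A producer that blocks forever (standing in for 'accept') is still
-- terminated once the consumer has its answer.
demo :: IO Int
demo = do
  started <- newEmptyMVar
  stopped <- newEmptyMVar
  let producer =
        (putMVar started () >> forever (threadDelay 1000000))
          `finally` putMVar stopped ()
      consumer = takeMVar started >> pure 42
  r <- withProducerKilled producer consumer
  takeMVar stopped -- wait until the producer's cleanup has run
  pure r
```

This relies on the producer being blocked in an interruptible operation, which is the usual case for blocking IO in GHC. The `async` package's `withAsync` offers a similar shape with more careful error propagation.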

3 participants