Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add kindaUnlift to find out what happens where #43

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions troupe/src/Troupe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Troupe
spawn,
spawnLink,
spawnMonitor,
kindaUnlift,

-- *** Spawning processes with options
spawnWithOptions,
Expand Down Expand Up @@ -118,6 +119,7 @@ import Troupe.Process
sendLazy,
setProcessOption,
spawnWithOptions,
kindaUnlift,
unlink,
)
import Troupe.Types (Down (..), MonitorRef, ProcessId)
Expand Down
56 changes: 34 additions & 22 deletions troupe/src/Troupe/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ module Troupe.Process
exit,
isProcessAlive,
spawnWithOptions,
kindaUnlift,
SpawnOptions (..),
ThreadAffinity (..),
WithMonitor (..),
Expand All @@ -46,7 +47,7 @@ module Troupe.Process
where

import Control.Applicative (Alternative, (<|>))
import Control.Concurrent (throwTo)
import Control.Concurrent (threadDelay, throwTo)
import Control.Concurrent.Async
( async,
asyncThreadId,
Expand Down Expand Up @@ -100,7 +101,7 @@ import Control.Exception.Safe
uninterruptibleMask_,
withException,
)
import Control.Monad (MonadPlus, forM, unless, when)
import Control.Monad (MonadPlus, forever, forM, unless, when)
import Control.Monad.Error.Class (MonadError)
import Control.Monad.Fix (MonadFix)
import Control.Monad.IO.Class (MonadIO, liftIO)
Expand Down Expand Up @@ -583,17 +584,7 @@ data ThreadAffinity
-- 'Troupe.spawn', 'Troupe.spawnLink' and 'Troupe.spawnMonitor' are specialized
-- versions of this function.
spawnWithOptions :: (MonadProcess r m, MonadIO m) => SpawnOptions t -> Process r a -> m t
spawnWithOptions !options process = do
let cb pid = do
when (spawnOptionsLink options) $
linkSTM pid
case spawnOptionsMonitor options of
WithoutMonitor -> pure pid
WithMonitor -> do
ref <- monitorSTM pid
pure (pid, ref)

spawnImpl (spawnOptionsAffinity options) cb process
spawnWithOptions options = spawnImpl (spawnOptionsAffinity options) (mkCallback options)
{-# SPECIALIZE spawnWithOptions :: SpawnOptions t -> Process r a -> Process r t #-}

data SendOptions = SendOptions
Expand Down Expand Up @@ -719,22 +710,43 @@ after ::
after = MatchAfter
{-# INLINE after #-}

kindaUnlift :: (MonadProcess r m, MonadIO io) => ((ThreadAffinity -> Process r a -> io ()) -> m b) -> m b
kindaUnlift foreignSpawner = do
env <- getProcessEnv
foreignSpawner $ \affinity action -> do
_pid <- spawnImplWith env affinity pure action -- XXX: does it make sense to link/monitor a wrapped process?
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe so. In the current example (Server.run), unlift is used in an infinite loop, so there it may not make sense. However, there might be cases where it's used one-shot, and hence the parent Process may very well be interested in the child exiting (through monitor), or getting linked.

In a way, I think link should almost be the default: with the current approach, it's way too easy to end up with ghost thread/processes, no?

Copy link
Author

@dpwiz dpwiz Apr 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A server process may wish to monitor spawned handler processes, but no new implementation details would be needed for that.

server = do
  serverPid <- self
  kindaUnlift \unlift -> liftIO do
    config <- Server.setup
    Server.run config \connection ->
      unlift Unbound do
        handlerPid <- self
        -- The handler can now use serverPid and handlerPid to establish monitoring, linking or else
        forever $ doHandlerThings connection
    -- XXX: the server Process is now blocked by running Server loop

-- TODO: spawnImplWith should have a blocking version
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. Would it make sense for the caller thread to become the monitor of the process, instead of spawning two more threads (one monitor, one to run the effective Process), and hence block?

Not even sure that's the right approach. In a way, I'm bit uncomfortable with how threads would be managed now. Does your Server.run ever kill a thread it spawns? (In which case it'd be this forever one, and we leak 2 child threads). Does Server.run somehow wrap the inner thing and close the connection when it returns?...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does your Server.run ever kill a thread it spawns?

In general, we don't know.

A typical server loop is

run handler =
  bracket open close \socket ->
    forever do
      connection <- accept socket
      void $! forkIO (handler connection)
  • A server doesn't care if a handler thread crashes.
  • A handler may crash when the server quits - when doing things with its connection derived from the now-dead socket.
  • A handler spawning new threads doesn't affect the server and its loop.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I was thinking: what if we invert things a bit? Basically, instead of spawning a monitor thread (as done in spawnImpl), and another one to run the Process code, can we implement things such that unlift spawns a monitor thread, and then re-uses the thread from which it was invoked (i.e., the thread spawned by Server.run) to actually run the Process code?

This way, whatever Server.run does to the thread it spawned (monitor it, send it some async exception,...) will be "just fine": the code running in the thread (now in a Process context) likely already expects to run in a Server.run-spawned thread (and whatever this may mean), and the monitor thread can still ensure monitors/links are notified on the Troupe side of things.

There's one (somewhat major) caveat: we must make sure a single thread is never used for two ProcessContexts, so Server.run really must forkIO, and kinadUnlift can't be used to create an unlift which is then used in some existing Process thread.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this could be done, spawnImpl could be (re)written to basically use the very same pattern when using 'regular' spawn*, kinda inverting the current implementation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, instead of spawning a monitor thread (as done in spawnImpl), and another one to run the Process code, can we implement things such that unlift spawns a monitor thread, and then re-uses the thread from which it was invoked (i.e., the thread spawned by Server.run) to actually run the Process code?

Yes, this is how I see it too.

liftIO . forever $
dpwiz marked this conversation as resolved.
Show resolved Hide resolved
threadDelay 10000000

mkCallback :: SpawnOptions r -> ProcessId -> ReaderT (ProcessEnv a) STM r
mkCallback !options pid = do
when (spawnOptionsLink options) $
linkSTM pid
case spawnOptionsMonitor options of
WithoutMonitor -> pure pid
WithMonitor -> do
ref <- monitorSTM pid
pure (pid, ref)

spawnImpl :: (MonadProcess r m, MonadIO m) => ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t
spawnImpl affinity cb process = do
currentEnv <- getProcessEnv
spawnImplWith currentEnv affinity cb process

liftIO $ do
processContext <- newProcessContext (processEnvNodeContext currentEnv)
let processEnv = currentEnv {processEnvProcessContext = processContext}
spawnImplWith :: MonadIO m => ProcessEnv r -> ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t
spawnImplWith currentEnv affinity cb process = liftIO $ do
processContext <- newProcessContext (processEnvNodeContext currentEnv)
let processEnv = currentEnv {processEnvProcessContext = processContext}

m <- newEmptyTMVarIO
m <- newEmptyTMVarIO

bracketOnError
(run currentEnv processEnv m)
uninterruptibleCancel
(wrapup m)
bracketOnError
(run processEnv m)
uninterruptibleCancel
(wrapup m)
where
run currentEnv processEnv m = mask_ $ async $ do
run processEnv m = mask_ $ async $ do
c <- newEmptyTMVarIO
let act restore = atomically (readTMVar c) >>= \() -> restore (runProcess process processEnv)

Expand Down