Skip to content

Commit

Permalink
WIP: Add kindaUnlift to find out what happens where
Browse files Browse the repository at this point in the history
  • Loading branch information
dpwiz committed Apr 10, 2023
1 parent 4a2b36b commit 52c1932
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 22 deletions.
2 changes: 2 additions & 0 deletions troupe/src/Troupe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Troupe
spawn,
spawnLink,
spawnMonitor,
kindaUnlift,

-- *** Spawning processes with options
spawnWithOptions,
Expand Down Expand Up @@ -118,6 +119,7 @@ import Troupe.Process
sendLazy,
setProcessOption,
spawnWithOptions,
kindaUnlift,
unlink,
)
import Troupe.Types (Down (..), MonitorRef, ProcessId)
Expand Down
56 changes: 34 additions & 22 deletions troupe/src/Troupe/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ module Troupe.Process
exit,
isProcessAlive,
spawnWithOptions,
kindaUnlift,
SpawnOptions (..),
ThreadAffinity (..),
WithMonitor (..),
Expand All @@ -46,7 +47,7 @@ module Troupe.Process
where

import Control.Applicative (Alternative, (<|>))
import Control.Concurrent (throwTo)
import Control.Concurrent (threadDelay, throwTo)
import Control.Concurrent.Async
( async,
asyncThreadId,
Expand Down Expand Up @@ -100,7 +101,7 @@ import Control.Exception.Safe
uninterruptibleMask_,
withException,
)
import Control.Monad (MonadPlus, forM, unless, when)
import Control.Monad (MonadPlus, forever, forM, unless, when)
import Control.Monad.Error.Class (MonadError)
import Control.Monad.Fix (MonadFix)
import Control.Monad.IO.Class (MonadIO, liftIO)
Expand Down Expand Up @@ -583,17 +584,7 @@ data ThreadAffinity
-- 'Troupe.spawn', 'Troupe.spawnLink' and 'Troupe.spawnMonitor' are specialized
-- versions of this function.
spawnWithOptions :: (MonadProcess r m, MonadIO m) => SpawnOptions t -> Process r a -> m t
spawnWithOptions !options process = do
let cb pid = do
when (spawnOptionsLink options) $
linkSTM pid
case spawnOptionsMonitor options of
WithoutMonitor -> pure pid
WithMonitor -> do
ref <- monitorSTM pid
pure (pid, ref)

spawnImpl (spawnOptionsAffinity options) cb process
spawnWithOptions options = spawnImpl (spawnOptionsAffinity options) (mkCallback options)
{-# SPECIALIZE spawnWithOptions :: SpawnOptions t -> Process r a -> Process r t #-}

data SendOptions = SendOptions
Expand Down Expand Up @@ -719,22 +710,43 @@ after ::
after = MatchAfter
{-# INLINE after #-}

kindaUnlift :: (MonadProcess r m, MonadIO io) => ((ThreadAffinity -> Process r a -> io ()) -> m b) -> m b
kindaUnlift foreignSpawner = do
env <- getProcessEnv
foreignSpawner $ \affinity action -> do
_pid <- spawnImplWith env affinity pure action -- XXX: does it make sense to link/monitor a wrapped process?
-- TODO: spawnImplWith should have a blocking version
liftIO . forever $
threadDelay 10000000

mkCallback :: SpawnOptions r -> ProcessId -> ReaderT (ProcessEnv a) STM r
mkCallback !options pid = do
when (spawnOptionsLink options) $
linkSTM pid
case spawnOptionsMonitor options of
WithoutMonitor -> pure pid
WithMonitor -> do
ref <- monitorSTM pid
pure (pid, ref)

spawnImpl :: (MonadProcess r m, MonadIO m) => ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t
spawnImpl affinity cb process = do
currentEnv <- getProcessEnv
spawnImplWith currentEnv affinity cb process

liftIO $ do
processContext <- newProcessContext (processEnvNodeContext currentEnv)
let processEnv = currentEnv {processEnvProcessContext = processContext}
spawnImplWith :: MonadIO m => ProcessEnv r -> ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t
spawnImplWith currentEnv affinity cb process = liftIO $ do
processContext <- newProcessContext (processEnvNodeContext currentEnv)
let processEnv = currentEnv {processEnvProcessContext = processContext}

m <- newEmptyTMVarIO
m <- newEmptyTMVarIO

bracketOnError
(run currentEnv processEnv m)
uninterruptibleCancel
(wrapup m)
bracketOnError
(run processEnv m)
uninterruptibleCancel
(wrapup m)
where
run currentEnv processEnv m = mask_ $ async $ do
run processEnv m = mask_ $ async $ do
c <- newEmptyTMVarIO
let act restore = atomically (readTMVar c) >>= \() -> restore (runProcess process processEnv)

Expand Down

0 comments on commit 52c1932

Please sign in to comment.