diff --git a/troupe/src/Troupe.hs b/troupe/src/Troupe.hs index 0948e38..b14bd60 100644 --- a/troupe/src/Troupe.hs +++ b/troupe/src/Troupe.hs @@ -13,6 +13,7 @@ module Troupe spawn, spawnLink, spawnMonitor, + kindaUnlift, -- *** Spawning processes with options spawnWithOptions, @@ -118,6 +119,7 @@ import Troupe.Process sendLazy, setProcessOption, spawnWithOptions, + kindaUnlift, unlink, ) import Troupe.Types (Down (..), MonitorRef, ProcessId) diff --git a/troupe/src/Troupe/Process.hs b/troupe/src/Troupe/Process.hs index d6c120c..8b17732 100644 --- a/troupe/src/Troupe/Process.hs +++ b/troupe/src/Troupe/Process.hs @@ -33,6 +33,7 @@ module Troupe.Process exit, isProcessAlive, spawnWithOptions, + kindaUnlift, SpawnOptions (..), ThreadAffinity (..), WithMonitor (..), @@ -46,7 +47,7 @@ module Troupe.Process where import Control.Applicative (Alternative, (<|>)) -import Control.Concurrent (throwTo) +import Control.Concurrent (threadDelay, throwTo) import Control.Concurrent.Async ( async, asyncThreadId, @@ -100,7 +101,7 @@ import Control.Exception.Safe uninterruptibleMask_, withException, ) -import Control.Monad (MonadPlus, forM, unless, when) +import Control.Monad (MonadPlus, forever, forM, unless, when) import Control.Monad.Error.Class (MonadError) import Control.Monad.Fix (MonadFix) import Control.Monad.IO.Class (MonadIO, liftIO) @@ -583,17 +584,7 @@ data ThreadAffinity -- 'Troupe.spawn', 'Troupe.spawnLink' and 'Troupe.spawnMonitor' are specialized -- versions of this function. spawnWithOptions :: (MonadProcess r m, MonadIO m) => SpawnOptions t -> Process r a -> m t -spawnWithOptions !options process = do - let cb pid = do - when (spawnOptionsLink options) $ - linkSTM pid - case spawnOptionsMonitor options of - WithoutMonitor -> pure pid - WithMonitor -> do - ref <- monitorSTM pid - pure (pid, ref) - - spawnImpl (spawnOptionsAffinity options) cb process +spawnWithOptions options = spawnImpl (spawnOptionsAffinity options) (mkCallback options) {-# SPECIALIZE spawnWithOptions :: SpawnOptions t -> Process r a -> Process r t #-} data SendOptions = SendOptions @@ -719,22 +710,43 @@ after :: after = MatchAfter {-# INLINE after #-} +kindaUnlift :: (MonadProcess r m, MonadIO io) => ((ThreadAffinity -> Process r a -> io ()) -> m b) -> m b +kindaUnlift foreignSpawner = do + env <- getProcessEnv + foreignSpawner $ \affinity action -> do + _pid <- spawnImplWith env affinity pure action -- XXX: does it make sense to link/monitor a wrapped process? + -- TODO: spawnImplWith should have a blocking version + liftIO . forever $ + threadDelay 10000000 + +mkCallback :: SpawnOptions r -> ProcessId -> ReaderT (ProcessEnv a) STM r +mkCallback !options pid = do + when (spawnOptionsLink options) $ + linkSTM pid + case spawnOptionsMonitor options of + WithoutMonitor -> pure pid + WithMonitor -> do + ref <- monitorSTM pid + pure (pid, ref) + spawnImpl :: (MonadProcess r m, MonadIO m) => ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t spawnImpl affinity cb process = do currentEnv <- getProcessEnv + spawnImplWith currentEnv affinity cb process - liftIO $ do - processContext <- newProcessContext (processEnvNodeContext currentEnv) - let processEnv = currentEnv {processEnvProcessContext = processContext} +spawnImplWith :: MonadIO m => ProcessEnv r -> ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t +spawnImplWith currentEnv affinity cb process = liftIO $ do + processContext <- newProcessContext (processEnvNodeContext currentEnv) + let processEnv = currentEnv {processEnvProcessContext = processContext} - m <- newEmptyTMVarIO + m <- newEmptyTMVarIO - bracketOnError - (run currentEnv processEnv m) - uninterruptibleCancel - (wrapup m) + bracketOnError + (run processEnv m) + uninterruptibleCancel + (wrapup m) where - run currentEnv processEnv m = mask_ $ async $ do + run processEnv m = mask_ $ async $ do c <- newEmptyTMVarIO let act restore = atomically (readTMVar c) >>= \() -> restore (runProcess process processEnv)