Skip to content

Commit

Permalink
troupe: add after in favor of receiveTimeout
Browse files Browse the repository at this point in the history
This patch simplifies the `receive*` machinery, introducing an `after`
`Match` which uses STM's `registerDelay` to implement a `receive` with
timeouts, instead of relying on `CQueue`s `Timeout` and `NonBlocking`
implementations.

This should ease reworking `CQueue`, since there should be no need for
a distinction between blocking, timeout and non-blocking behaviour in
its implementation. Hence, from now on, `troupe` only uses `CQueue`s
`Blocking` `dequeue` method.

See: #12
See: #11
  • Loading branch information
NicolasT committed Mar 29, 2023
1 parent abe08d9 commit 064f036
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 66 deletions.
4 changes: 2 additions & 2 deletions troupe/src/Troupe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ module Troupe
send,
sendLazy,
receive,
receiveTimeout,
expect,
Match,
match,
matchIf,
after,

-- ** Linking and monitoring processes
link,
Expand Down Expand Up @@ -99,6 +99,7 @@ import Troupe.Process
SpawnOptions (..),
ThreadAffinity (..),
WithMonitor (..),
after,
demonitor,
exit,
expect,
Expand All @@ -111,7 +112,6 @@ import Troupe.Process
newNodeContext,
newProcessContext,
receive,
receiveTimeout,
runProcess,
self,
send,
Expand Down
127 changes: 68 additions & 59 deletions troupe/src/Troupe/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ module Troupe.Process
send,
sendLazy,
receive,
receiveTimeout,
expect,
Match,
match,
matchIf,
after,
)
where

Expand Down Expand Up @@ -74,6 +74,7 @@ import Control.Concurrent.STM
readTMVar,
readTQueue,
readTVar,
registerDelay,
throwSTM,
tryReadTMVar,
writeTMVar,
Expand All @@ -84,7 +85,7 @@ import Control.DeepSeq (NFData, deepseq, ($!!))
import Control.Distributed.Process.Internal.CQueue
( BlockSpec (..),
CQueue,
MatchOn (MatchMsg),
MatchOn (..),
dequeue,
enqueueSTM,
newCQueue,
Expand All @@ -104,7 +105,7 @@ import Control.Exception.Safe
uninterruptibleMask_,
withException,
)
import Control.Monad (MonadPlus, unless, void, when)
import Control.Monad (MonadPlus, forM, unless, when)
import Control.Monad.Error.Class (MonadError)
import Control.Monad.Fix (MonadFix)
import Control.Monad.IO.Class (MonadIO, liftIO)
Expand All @@ -131,7 +132,6 @@ import qualified Control.Monad.Trans.Writer.CPS as CPS (WriterT)
import qualified Control.Monad.Trans.Writer.Lazy as Lazy (WriterT)
import qualified Control.Monad.Trans.Writer.Strict as Strict (WriterT)
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Functor.Identity (Identity (..), runIdentity)
import Data.Maybe (isJust)
import Data.Typeable (Typeable)
import DeferredFolds.UnfoldlM (forM_)
Expand Down Expand Up @@ -493,11 +493,10 @@ demonitor :: (MonadProcess r m, MonadIO m) => [DemonitorOption] -> MonitorRef ->
demonitor !options !ref = do
liftMonadProcess id $ demonitorSTM ref
when (DemonitorFlush `elem` options) $ do
void $
receiveTimeout
0
[ matchIf (\d -> downMonitorRef d == ref) (\_ -> pure ())
]
receive
[ matchIf (\d -> downMonitorRef d == ref) (\_ -> pure ()),
after 0 (pure ())
]
{-# SPECIALIZE demonitor :: [DemonitorOption] -> MonitorRef -> Process r () #-}

exitSTM :: (Exception e) => ProcessId -> Maybe e -> ReaderT (ProcessEnv r) STM ()
Expand Down Expand Up @@ -679,78 +678,51 @@ sendLazy = sendWithOptions SendOptions
{-# INLINE sendLazy #-}
{-# SPECIALIZE sendLazy :: (Typeable a) => ProcessId -> a -> Process r () #-}

data ReceiveMethod f where
ReceiveBlocking :: ReceiveMethod Identity
ReceiveNonBlocking :: ReceiveMethod Maybe
ReceiveTimeout :: Int -> ReceiveMethod Maybe

{- HLINT ignore ReceiveOptions "Use newtype instead of data" -}
data ReceiveOptions f = ReceiveOptions
{ receiveOptionsMethod :: !(ReceiveMethod f)
}
data ReceiveOptions = ReceiveOptions

-- | Matching clause for a value of type @a@ in monadic context @m@.
newtype Match m a
data Match m a
= MatchMessage (Dynamic -> Maybe (m a))
| MatchAfter Int (m a)
deriving (Functor)

receiveWithOptions :: (MonadProcess r m, MonadIO m) => ReceiveOptions f -> [Match m a] -> m (f a)
receiveWithOptions !options !matches = do
receiveWithOptions :: (MonadProcess r m, MonadIO m) => ReceiveOptions -> [Match m a] -> m a
receiveWithOptions ReceiveOptions !matches = do
queue <- processContextQueue . processEnvProcessContext <$> getProcessEnv
let bs = case receiveOptionsMethod options of
ReceiveBlocking -> Blocking
ReceiveNonBlocking -> NonBlocking
ReceiveTimeout t -> Timeout t
matches' = map (\(MatchMessage fn) -> MatchMsg fn) matches
p <- liftIO $ dequeue queue bs matches'

p <- liftIO $ do
matches' <- forM matches $ \case
MatchMessage fn -> pure (MatchMsg fn)
MatchAfter t ma -> case t of
0 -> pure $ MatchChan $ pure ma
t' -> do
tv <- registerDelay t'
pure $ MatchChan $ do
v <- readTVar tv
check v
pure ma

dequeue queue Blocking matches'

ensureSignalsDelivered

case p of
Nothing -> case receiveOptionsMethod options of
ReceiveBlocking -> error "receiveWithOptions: dequeue returned Nothing in Blocking call"
ReceiveNonBlocking -> pure Nothing
ReceiveTimeout _ -> pure Nothing
Just a -> case receiveOptionsMethod options of
ReceiveBlocking -> Identity <$> a
ReceiveNonBlocking -> Just <$> a
ReceiveTimeout _ -> Just <$> a
Nothing -> error "receiveWithOptions: dequeue returned Nothing"
Just ma -> ma
where
ensureSignalsDelivered = do
exceptions <- processContextExceptions . processEnvProcessContext <$> getProcessEnv
liftIO $ atomically $ do
e <- isEmptyTQueue exceptions
check e
{-# SPECIALIZE receiveWithOptions :: ReceiveOptions f -> [Match (Process r) a] -> Process r (f a) #-}
{-# SPECIALIZE receiveWithOptions :: ReceiveOptions -> [Match (Process r) a] -> Process r a #-}

-- | Receive some message from the process mailbox, blocking.
receive :: (MonadProcess r m, MonadIO m) => [Match m a] -> m a
receive !matches = runIdentity <$> receiveWithOptions options matches
where
options =
ReceiveOptions
{ receiveOptionsMethod = ReceiveBlocking
}
receive !matches = receiveWithOptions ReceiveOptions matches
{-# INLINE receive #-}
{-# SPECIALIZE receive :: [Match (Process r) a] -> Process r a #-}

-- | Receive some message from the process mailbox.
--
-- If the given timeout is @0@, this works in a non-blocking way. Otherwise,
-- the call will time out after the given number of microseconds.
--
-- If no message is matched within the timeout period, 'Nothing' is returned,
-- otherwise @'Just' a@.
receiveTimeout :: (MonadProcess r m, MonadIO m) => Int -> [Match m a] -> m (Maybe a)
receiveTimeout !t = receiveWithOptions options
where
options =
ReceiveOptions
{ receiveOptionsMethod = if t == 0 then ReceiveNonBlocking else ReceiveTimeout t
}
{-# INLINE receiveTimeout #-}
{-# SPECIALIZE receiveTimeout :: Int -> [Match (Process r) a] -> Process r (Maybe a) #-}

-- | Utility to 'receive' a value of a specific type.
expect :: (MonadProcess r m, MonadIO m, Typeable a) => m a
expect = receive [match pure]
Expand All @@ -769,6 +741,43 @@ matchIf predicate handle = MatchMessage $ \dyn -> case fromDynamic dyn of
Just a -> if predicate a then Just (handle a) else Nothing
{-# INLINE matchIf #-}

-- | A 'Match' which doesn't receive any messages, but fires after a given
-- amount of time.
--
-- Instead of looking for a message in the process' mailbox, an 'after' clause
-- in a call to 'receive' will fire after a given number of microseconds,
-- yielding the provided monadic value. This can be used to implement receiving
-- messages with a timeout.
--
-- When the given timeout is @0@, the 'receive' call will be non-blocking.
-- Note, however, the order of matches is important, so
--
-- @
-- s <- self
-- send s ()
-- receive [after 0 (pure "timeout"), match (\() -> pure "message")]
-- @
--
-- will always return @"timeout"@, whilst
--
-- @
-- s <- self
-- send s ()
-- receive [match (\() -> pure "message"), after 0 (pure "timeout")]
-- @
--
-- will always return @"message"@.
--
-- In general, @'after'@ should be the last 'Match' passed to 'receive'.
after ::
-- | Timeout in microseconds. Use @0@ for a non-blocking 'receive'.
Int ->
-- | Action to call when the timeout expired.
m a ->
Match m a
after = MatchAfter
{-# INLINE after #-}

spawnImpl :: (MonadProcess r m, MonadIO m) => ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t
spawnImpl affinity cb process = do
currentEnv <- getProcessEnv
Expand Down
103 changes: 98 additions & 5 deletions troupe/test/Troupe/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import System.Clock (Clock (Monotonic), diffTimeSpec, getTime)
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.HUnit (Assertion, assertFailure, testCase, (@?=))
import Test.Tasty.HUnit (Assertion, assertBool, assertFailure, testCase, (@?=))
import Troupe
( DemonitorOption (..),
Down (..),
Expand All @@ -48,6 +49,7 @@ import Troupe
SpawnOptions (..),
ThreadAffinity (..),
WithMonitor (..),
after,
demonitor,
exit,
expect,
Expand All @@ -58,7 +60,6 @@ import Troupe
matchIf,
monitor,
receive,
receiveTimeout,
runNode,
self,
send,
Expand Down Expand Up @@ -183,7 +184,7 @@ tests =
Nothing -> assertFailure $ "Expected downReason to be a TestException: " <> show dr
Just TestException -> pure ()

receiveTimeout 0 [match pure] >>= \case
receive [match (fmap Just), after 0 (pure Nothing)] >>= \case
Nothing -> pure ()
Just Down {} -> liftIO $ assertFailure "unexpected Down message",
testGroup
Expand Down Expand Up @@ -263,7 +264,7 @@ tests =
demonitor [] ref
liftIO $ putMVar m ()
Exit _ _ _ Nothing <- expect
receiveTimeout 0 [matchMonitor ref] >>= \res -> liftIO $ do
receive [Just <$> matchMonitor ref, after 0 (pure Nothing)] >>= \res -> liftIO $ do
res @?= Nothing,
testCase "DemonitorFlush" $ troupeTest () $ do
m <- liftIO newEmptyMVar
Expand All @@ -278,7 +279,7 @@ tests =

receive [matchMonitor ref]
demonitor [DemonitorFlush] ref2
receiveTimeout 0 [matchMonitor ref2] >>= \res -> liftIO $ do
receive [Just <$> matchMonitor ref2, after 0 (pure Nothing)] >>= \res -> liftIO $ do
res @?= Nothing
],
testGroup
Expand Down Expand Up @@ -487,6 +488,98 @@ tests =

a' <- isProcessAlive pid
liftIO $ a' @?= False,
testGroup
"receive"
[ testCase "after 1000" $ troupeTest () $ do
pid <- spawnLink $ do
start <- liftIO $ getTime Monotonic
r <-
receive
[ match (\() -> pure "message"),
after 1000 (pure "after"),
after 1000000 (pure "long after")
]
end <- liftIO $ getTime Monotonic

liftIO $ do
r @?= "after"

let nsPerUs = 1000
minDelay = 1000 * nsPerUs
maxDelay = (1000000 `div` 2) * nsPerUs
diff = diffTimeSpec end start

assertBool
("Unexpectedly short delay: " <> show diff)
(diff >= minDelay)

assertBool
("Unexpectedly long delay: " <> show diff)
(diff < maxDelay)

ref <- monitor pid
awaitProcessExit ref,
testCase "after 1000, with message" $ troupeTest () $ do
pid <- spawnLink $ do
start <- liftIO $ getTime Monotonic
r <-
receive
[ match (\() -> pure "message"),
after 100000000 (pure "after")
]
end <- liftIO $ getTime Monotonic

liftIO $ do
r @?= "message"

let diff = diffTimeSpec end start
maxDelay = 100000000 * 1000
assertBool ("Unexpectedly long delay: " <> show diff) (diff < maxDelay)

ref <- monitor pid
send pid ()
awaitProcessExit ref,
testGroup
"after 0"
[ testCase "No message" $ troupeTest () $ do
pid <- spawnLink $ do
r <-
receive
[ match (\() -> pure "message"),
after 0 (pure "after")
]
liftIO $ r @?= "after"

ref <- monitor pid
awaitProcessExit ref,
testCase "Message" $ troupeTest () $ do
pid <- spawnLink $ do
s <- self
send s ()
r <-
receive
[ match (\() -> pure "message"),
after 0 (pure "after")
]
liftIO $ r @?= "message"

ref <- monitor pid
awaitProcessExit ref,
testCase "ordering retained: after before match" $ troupeTest () $ do
pid <- spawnLink $ do
s <- self
send s ()
r <-
receive
[ after 0 (pure "after"),
match (\() -> pure "message")
]
liftIO $ r @?= "after"

ref <- monitor pid
awaitProcessExit ref
]
],
testGroup
"Non-regression"
[ testCase "Deliver signals before/when receiving messages (#25)" $ troupeTest () $ do
Expand Down
1 change: 1 addition & 0 deletions troupe/troupe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ test-suite troupe-test
ghc-options: -rtsopts -threaded -with-rtsopts=-N2
build-depends:
, base ^>=4.17.0.0 || ^>=4.18.0.0
, clock ^>=0.8.3
, deepseq ^>=1.4.8.0
, safe-exceptions ^>=0.1.7.3
, tasty ^>=1.4.3
Expand Down

0 comments on commit 064f036

Please sign in to comment.