diff --git a/troupe/src/Troupe.hs b/troupe/src/Troupe.hs index d5e573f..53af085 100644 --- a/troupe/src/Troupe.hs +++ b/troupe/src/Troupe.hs @@ -24,11 +24,11 @@ module Troupe send, sendLazy, receive, - receiveTimeout, expect, Match, match, matchIf, + after, -- ** Linking and monitoring processes link, @@ -99,6 +99,7 @@ import Troupe.Process SpawnOptions (..), ThreadAffinity (..), WithMonitor (..), + after, demonitor, exit, expect, @@ -111,7 +112,6 @@ import Troupe.Process newNodeContext, newProcessContext, receive, - receiveTimeout, runProcess, self, send, diff --git a/troupe/src/Troupe/Process.hs b/troupe/src/Troupe/Process.hs index 333ce91..9aaa8b8 100644 --- a/troupe/src/Troupe/Process.hs +++ b/troupe/src/Troupe/Process.hs @@ -42,11 +42,11 @@ module Troupe.Process send, sendLazy, receive, - receiveTimeout, expect, Match, match, matchIf, + after, ) where @@ -74,6 +74,7 @@ import Control.Concurrent.STM readTMVar, readTQueue, readTVar, + registerDelay, throwSTM, tryReadTMVar, writeTMVar, @@ -84,7 +85,7 @@ import Control.DeepSeq (NFData, deepseq, ($!!)) import Control.Distributed.Process.Internal.CQueue ( BlockSpec (..), CQueue, - MatchOn (MatchMsg), + MatchOn (..), dequeue, enqueueSTM, newCQueue, @@ -104,7 +105,7 @@ import Control.Exception.Safe uninterruptibleMask_, withException, ) -import Control.Monad (MonadPlus, unless, void, when) +import Control.Monad (MonadPlus, forM, unless, when) import Control.Monad.Error.Class (MonadError) import Control.Monad.Fix (MonadFix) import Control.Monad.IO.Class (MonadIO, liftIO) @@ -131,7 +132,6 @@ import qualified Control.Monad.Trans.Writer.CPS as CPS (WriterT) import qualified Control.Monad.Trans.Writer.Lazy as Lazy (WriterT) import qualified Control.Monad.Trans.Writer.Strict as Strict (WriterT) import Data.Dynamic (Dynamic, fromDynamic, toDyn) -import Data.Functor.Identity (Identity (..), runIdentity) import Data.Maybe (isJust) import Data.Typeable (Typeable) import DeferredFolds.UnfoldlM (forM_) @@ -493,11 +493,10 @@ demonitor :: (MonadProcess r m, MonadIO m) => [DemonitorOption] -> MonitorRef -> demonitor !options !ref = do liftMonadProcess id $ demonitorSTM ref when (DemonitorFlush `elem` options) $ do - void $ - receiveTimeout - 0 - [ matchIf (\d -> downMonitorRef d == ref) (\_ -> pure ()) - ] + receive + [ matchIf (\d -> downMonitorRef d == ref) (\_ -> pure ()), + after 0 (pure ()) + ] {-# SPECIALIZE demonitor :: [DemonitorOption] -> MonitorRef -> Process r () #-} exitSTM :: (Exception e) => ProcessId -> Maybe e -> ReaderT (ProcessEnv r) STM () @@ -679,78 +678,51 @@ sendLazy = sendWithOptions SendOptions {-# INLINE sendLazy #-} {-# SPECIALIZE sendLazy :: (Typeable a) => ProcessId -> a -> Process r () #-} -data ReceiveMethod f where - ReceiveBlocking :: ReceiveMethod Identity - ReceiveNonBlocking :: ReceiveMethod Maybe - ReceiveTimeout :: Int -> ReceiveMethod Maybe - -{- HLINT ignore ReceiveOptions "Use newtype instead of data" -} -data ReceiveOptions f = ReceiveOptions - { receiveOptionsMethod :: !(ReceiveMethod f) - } +data ReceiveOptions = ReceiveOptions -- | Matching clause for a value of type @a@ in monadic context @m@. -newtype Match m a +data Match m a = MatchMessage (Dynamic -> Maybe (m a)) + | MatchAfter Int (m a) deriving (Functor) -receiveWithOptions :: (MonadProcess r m, MonadIO m) => ReceiveOptions f -> [Match m a] -> m (f a) -receiveWithOptions !options !matches = do +receiveWithOptions :: (MonadProcess r m, MonadIO m) => ReceiveOptions -> [Match m a] -> m a +receiveWithOptions _options !matches = do queue <- processContextQueue . processEnvProcessContext <$> getProcessEnv - let bs = case receiveOptionsMethod options of - ReceiveBlocking -> Blocking - ReceiveNonBlocking -> NonBlocking - ReceiveTimeout t -> Timeout t - matches' = map (\(MatchMessage fn) -> MatchMsg fn) matches - p <- liftIO $ dequeue queue bs matches' + + p <- liftIO $ do + matches' <- forM matches $ \case + MatchMessage fn -> pure (MatchMsg fn) + MatchAfter t ma -> case t of + 0 -> pure $ MatchChan $ pure ma + t' -> do + tv <- registerDelay t' + pure $ MatchChan $ do + v <- readTVar tv + check v + pure ma + + dequeue queue Blocking matches' ensureSignalsDelivered case p of - Nothing -> case receiveOptionsMethod options of - ReceiveBlocking -> error "receiveWithOptions: dequeue returned Nothing in Blocking call" - ReceiveNonBlocking -> pure Nothing - ReceiveTimeout _ -> pure Nothing - Just a -> case receiveOptionsMethod options of - ReceiveBlocking -> Identity <$> a - ReceiveNonBlocking -> Just <$> a - ReceiveTimeout _ -> Just <$> a + Nothing -> error "receiveWithOptions: dequeue returned Nothing" + Just ma -> ma where ensureSignalsDelivered = do exceptions <- processContextExceptions . processEnvProcessContext <$> getProcessEnv liftIO $ atomically $ do e <- isEmptyTQueue exceptions check e -{-# SPECIALIZE receiveWithOptions :: ReceiveOptions f -> [Match (Process r) a] -> Process r (f a) #-} +{-# SPECIALIZE receiveWithOptions :: ReceiveOptions -> [Match (Process r) a] -> Process r a #-} -- | Receive some message from the process mailbox, blocking. receive :: (MonadProcess r m, MonadIO m) => [Match m a] -> m a -receive !matches = runIdentity <$> receiveWithOptions options matches - where - options = - ReceiveOptions - { receiveOptionsMethod = ReceiveBlocking - } +receive !matches = receiveWithOptions ReceiveOptions matches {-# INLINE receive #-} {-# SPECIALIZE receive :: [Match (Process r) a] -> Process r a #-} --- | Receive some message from the process mailbox. --- --- If the given timeout is @0@, this works in a non-blocking way. Otherwise, --- the call will time out after the given number of microseconds. --- --- If no message is matched within the timeout period, 'Nothing' is returned, --- otherwise @'Just' a@. -receiveTimeout :: (MonadProcess r m, MonadIO m) => Int -> [Match m a] -> m (Maybe a) -receiveTimeout !t = receiveWithOptions options - where - options = - ReceiveOptions - { receiveOptionsMethod = if t == 0 then ReceiveNonBlocking else ReceiveTimeout t - } -{-# INLINE receiveTimeout #-} -{-# SPECIALIZE receiveTimeout :: Int -> [Match (Process r) a] -> Process r (Maybe a) #-} - -- | Utility to 'receive' a value of a specific type. expect :: (MonadProcess r m, MonadIO m, Typeable a) => m a expect = receive [match pure] @@ -769,6 +741,43 @@ matchIf predicate handle = MatchMessage $ \dyn -> case fromDynamic dyn of Just a -> if predicate a then Just (handle a) else Nothing {-# INLINE matchIf #-} +-- | A 'Match' which doesn't receive any messages, but fires after a given +-- amount of time. +-- +-- Instead of looking for a message in the process' mailbox, an 'after' clause +-- in a call to 'receive' will fire after a given number of microseconds, +-- yielding the provided monadic value. This can be used to implement receiving +-- messages with a timeout. +-- +-- When the given timeout is @0@, the 'receive' call will be non-blocking. +-- Note, however, the order of matches is important, so +-- +-- @ +-- s <- self +-- send s () +-- receive [after 0 (pure "timeout"), match (\() -> pure "message")] +-- @ +-- +-- will always return @"timeout"@, whilst +-- +-- @ +-- s <- self +-- send s () +-- receive [match (\() -> pure "message"), after 0 (pure "timeout")] +-- @ +-- +-- will always return @"message"@. +-- +-- In general, @'after'@ should be the last 'Match' passed to 'receive'. +after :: + -- | Timeout in microseconds. Use @0@ for a non-blocking 'receive'. + Int -> + -- | Action to call when the timeout expired. + m a -> + Match m a +after = MatchAfter +{-# INLINE after #-} + spawnImpl :: (MonadProcess r m, MonadIO m) => ThreadAffinity -> (ProcessId -> ReaderT (ProcessEnv r) STM t) -> Process r a -> m t spawnImpl affinity cb process = do currentEnv <- getProcessEnv diff --git a/troupe/test/Troupe/Test.hs b/troupe/test/Troupe/Test.hs index 3bea0ee..d4436d0 100644 --- a/troupe/test/Troupe/Test.hs +++ b/troupe/test/Troupe/Test.hs @@ -34,8 +34,9 @@ import Control.Monad (forever) import Control.Monad.IO.Class (liftIO) import Data.Typeable (Typeable) import GHC.Generics (Generic) +import System.Clock (Clock (Monotonic), diffTimeSpec, getTime) import Test.Tasty (TestTree, testGroup) -import Test.Tasty.HUnit (Assertion, assertFailure, testCase, (@?=)) +import Test.Tasty.HUnit (Assertion, assertBool, assertFailure, testCase, (@?=)) import Troupe ( DemonitorOption (..), Down (..), @@ -48,6 +49,7 @@ import Troupe SpawnOptions (..), ThreadAffinity (..), WithMonitor (..), + after, demonitor, exit, expect, @@ -58,7 +60,6 @@ import Troupe matchIf, monitor, receive, - receiveTimeout, runNode, self, send, @@ -183,7 +184,7 @@ tests = Nothing -> assertFailure $ "Expected downReason to be a TestException: " <> show dr Just TestException -> pure () - receiveTimeout 0 [match pure] >>= \case + receive [match (fmap Just), after 0 (pure Nothing)] >>= \case Nothing -> pure () Just Down {} -> liftIO $ assertFailure "unexpected Down message", testGroup @@ -263,7 +264,7 @@ tests = demonitor [] ref liftIO $ putMVar m () Exit _ _ _ Nothing <- expect - receiveTimeout 0 [matchMonitor ref] >>= \res -> liftIO $ do + receive [Just <$> matchMonitor ref, after 0 (pure Nothing)] >>= \res -> liftIO $ do res @?= Nothing, testCase "DemonitorFlush" $ troupeTest () $ do m <- liftIO newEmptyMVar @@ -278,7 +279,7 @@ tests = receive [matchMonitor ref] demonitor [DemonitorFlush] ref2 - receiveTimeout 0 [matchMonitor ref2] >>= \res -> liftIO $ do + receive [Just <$> matchMonitor ref2, after 0 (pure Nothing)] >>= \res -> liftIO $ do res @?= Nothing ], testGroup @@ -487,6 +488,98 @@ tests = a' <- isProcessAlive pid liftIO $ a' @?= False, + testGroup + "receive" + [ testCase "after 1000" $ troupeTest () $ do + pid <- spawnLink $ do + start <- liftIO $ getTime Monotonic + r <- + receive + [ match (\() -> pure "message"), + after 1000 (pure "after"), + after 1000000 (pure "long after") + ] + end <- liftIO $ getTime Monotonic + + liftIO $ do + r @?= "after" + + let nsPerUs = 1000 + minDelay = 1000 * nsPerUs + maxDelay = (1000000 `div` 2) * nsPerUs + diff = diffTimeSpec end start + + assertBool + ("Unexpectedly short delay: " <> show diff) + (diff >= minDelay) + + assertBool + ("Unexpectedly long delay: " <> show diff) + (diff < maxDelay) + + ref <- monitor pid + awaitProcessExit ref, + testCase "after 1000, with message" $ troupeTest () $ do + pid <- spawnLink $ do + start <- liftIO $ getTime Monotonic + r <- + receive + [ match (\() -> pure "message"), + after 100000000 (pure "after") + ] + end <- liftIO $ getTime Monotonic + + liftIO $ do + r @?= "message" + + let diff = diffTimeSpec end start + maxDelay = 100000000 * 1000 + assertBool ("Unexpectedly long delay: " <> show diff) (diff < maxDelay) + + ref <- monitor pid + send pid () + awaitProcessExit ref, + testGroup + "after 0" + [ testCase "No message" $ troupeTest () $ do + pid <- spawnLink $ do + r <- + receive + [ match (\() -> pure "message"), + after 0 (pure "after") + ] + liftIO $ r @?= "after" + + ref <- monitor pid + awaitProcessExit ref, + testCase "Message" $ troupeTest () $ do + pid <- spawnLink $ do + s <- self + send s () + r <- + receive + [ match (\() -> pure "message"), + after 0 (pure "after") + ] + liftIO $ r @?= "message" + + ref <- monitor pid + awaitProcessExit ref, + testCase "ordering retained: after before match" $ troupeTest () $ do + pid <- spawnLink $ do + s <- self + send s () + r <- + receive + [ after 0 (pure "after"), + match (\() -> pure "message") + ] + liftIO $ r @?= "after" + + ref <- monitor pid + awaitProcessExit ref + ] + ], testGroup "Non-regression" [ testCase "Deliver signals before/when receiving messages (#25)" $ troupeTest () $ do diff --git a/troupe/troupe.cabal b/troupe/troupe.cabal index 1049d1e..26c8c9c 100644 --- a/troupe/troupe.cabal +++ b/troupe/troupe.cabal @@ -96,6 +96,7 @@ test-suite troupe-test ghc-options: -rtsopts -threaded -with-rtsopts=-N2 build-depends: , base ^>=4.17.0.0 || ^>=4.18.0.0 + , clock ^>=0.8.3 , deepseq ^>=1.4.8.0 , safe-exceptions ^>=0.1.7.3 , tasty ^>=1.4.3