Skip to content

Commit

Permalink
Merge branch System.ThreadManager
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Dec 3, 2024
2 parents cc034b1 + c7a1713 commit aa16cfd
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 29 deletions.
6 changes: 6 additions & 0 deletions time-manager/ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# ChangeLog for time-manager

## 0.2.0

* Providing `System.ThreadManager`.
* `withHandle` catches `TimeoutThread` internally.
It returns `Nothing` on timeout.

## 0.1.3

* Providing `withHandle` and `withHandleKillThread`.
Expand Down
211 changes: 211 additions & 0 deletions time-manager/System/ThreadManager.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A thread manager including a time manager.
-- The manager has responsibility to kill managed threads.
module System.ThreadManager (
ThreadManager,
newThreadManager,
stopAfter,

-- * Fork
forkManaged,
forkManagedFinally,
forkManagedUnmask,
forkManagedTimeout,
forkManagedTimeoutFinally,

-- * Synchronization
waitUntilAllGone,

-- * Re-exports
T.Manager,
withHandle,
T.Handle,
T.tickle,
T.pause,
T.resume,
) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (Exception (..), SomeException (..))
import qualified Control.Exception as E
import Control.Monad (unless, void)
import Data.Foldable (forM_)
import Data.IORef
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Word (Word64)
import GHC.Conc.Sync (labelThread)
#if __GLASGOW_HASKELL__ >= 908
import GHC.Conc.Sync (fromThreadId)
#endif
import System.Mem.Weak (Weak, deRefWeak)
import qualified System.TimeManager as T

----------------------------------------------------------------

-- | Manager to manage the thread and the timer.
data ThreadManager = ThreadManager T.Manager (TVar ManagedThreads)

type Key = Word64
type ManagedThreads = Map Key ManagedThread

----------------------------------------------------------------

-- 'IORef' prevents race between WAI TimeManager (TimeoutThread)
-- and stopAfter (KilledByThreadManager).
-- It is initialized with 'False' and turned into 'True' when locked.
-- The winner can throw an asynchronous exception.
data ManagedThread = ManagedThread (Weak ThreadId) (IORef Bool)

----------------------------------------------------------------

-- | Starting a thread manager.
-- Its action is initially set to 'return ()' and should be set
-- by 'setAction'. This allows that the action can include
-- the manager itself.
newThreadManager :: T.Manager -> IO ThreadManager
newThreadManager timmgr = ThreadManager timmgr <$> newTVarIO Map.empty

----------------------------------------------------------------

-- | An exception used internally to kill a managed thread.
data KilledByThreadManager = KilledByThreadManager (Maybe SomeException)
deriving (Show)

instance Exception KilledByThreadManager where
toException = E.asyncExceptionToException
fromException = E.asyncExceptionFromException

-- | Stopping the manager.
--
-- The action is run in the scope of an exception handler that catches all
-- exceptions (including asynchronous ones); this allows the cleanup handler
-- to cleanup in all circumstances. If an exception is caught, it is rethrown
-- after the cleanup is complete.
stopAfter :: ThreadManager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
stopAfter (ThreadManager _timmgr var) action cleanup = do
E.mask $ \unmask -> do
ma <- E.try $ unmask action
m <- atomically $ do
m0 <- readTVar var
writeTVar var Map.empty
return m0
let ths = Map.elems m
er = either Just (const Nothing) ma
ex = KilledByThreadManager er
forM_ ths $ \(ManagedThread wtid ref) -> lockAndKill wtid ref ex
case ma of
Left err -> cleanup (Just err) >> E.throwIO err
Right a -> cleanup Nothing >> return a

----------------------------------------------------------------

-- | Fork a managed thread.
--
-- This guarantees that the thread ID is added to the manager's queue before
-- the thread starts, and is removed again when the thread terminates
-- (normally or abnormally).
forkManaged
:: ThreadManager
-> String
-- ^ Thread name
-> IO ()
-- ^ Action
-> IO ()
forkManaged mgr label io =
forkManagedUnmask mgr label $ \unmask -> unmask io

-- | Like 'forkManaged', but run action with exceptions masked
forkManagedUnmask
:: ThreadManager -> String -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask (ThreadManager _timmgr var) label io =
void $ E.mask_ $ forkIOWithUnmask $ \unmask -> E.handle ignore $ do
labelMe label
E.bracket (setup var) (clear var) $ \_ -> io unmask

-- | Fork a managed thread with a handle created by a timeout manager.
forkManagedTimeout :: ThreadManager -> String -> (T.Handle -> IO ()) -> IO ()
forkManagedTimeout (ThreadManager timmgr var) label io =
void $ forkIO $ E.handle ignore $ do
labelMe label
E.bracket (setup var) (clear var) $ \(_n, wtid, ref) ->
-- 'TimeoutThread' is ignored by 'withHandle'.
void $ T.withHandle timmgr (lockAndKill wtid ref T.TimeoutThread) io

-- | Fork a managed thread with a cleanup function.
forkManagedFinally :: ThreadManager -> String -> IO () -> IO () -> IO ()
forkManagedFinally mgr label io final = E.mask $ \restore ->
forkManaged
mgr
label
(E.try (restore io) >>= \(_ :: Either E.SomeException ()) -> final)

-- | Fork a managed thread with a handle created by a timeout manager
-- and with a cleanup function.
forkManagedTimeoutFinally
:: ThreadManager -> String -> (T.Handle -> IO ()) -> IO () -> IO ()
forkManagedTimeoutFinally mgr label io final = E.mask $ \restore ->
forkManagedTimeout
mgr
label
(\th -> E.try (restore $ io th) >>= \(_ :: Either E.SomeException ()) -> final)

setup :: TVar (Map Key ManagedThread) -> IO (Key, Weak ThreadId, IORef Bool)
setup var = do
(wtid, n) <- myWeakThradId
ref <- newIORef False
let ent = ManagedThread wtid ref
-- asking to throw KilledByThreadManager to me
atomically $ modifyTVar' var $ Map.insert n ent
return (n, wtid, ref)

lockAndKill :: Exception e => Weak ThreadId -> IORef Bool -> e -> IO ()
lockAndKill wtid ref e = do
alreadyLocked <- atomicModifyIORef' ref (\b -> (True, b)) -- try to lock
unless alreadyLocked $ do
mtid <- deRefWeak wtid
case mtid of
Nothing -> return ()
Just tid -> E.throwTo tid e

clear
:: TVar (Map Key ManagedThread)
-> (Key, Weak ThreadId, IORef Bool)
-> IO ()
clear var (n, _, _) = atomically $ modifyTVar' var $ Map.delete n

ignore :: KilledByThreadManager -> IO ()
ignore (KilledByThreadManager _) = return ()

-- | Wait until all managed thread are finished.
waitUntilAllGone :: ThreadManager -> IO ()
waitUntilAllGone (ThreadManager _timmgr var) = atomically $ do
m <- readTVar var
check (Map.size m == 0)

----------------------------------------------------------------

myWeakThradId :: IO (Weak ThreadId, Key)
myWeakThradId = do
tid <- myThreadId
wtid <- mkWeakThreadId tid
let n = fromThreadId tid
return (wtid, n)

labelMe :: String -> IO ()
labelMe l = do
tid <- myThreadId
labelThread tid l

withHandle
:: ThreadManager -> T.TimeoutAction -> (T.Handle -> IO a) -> IO (Maybe a)
withHandle (ThreadManager timmgr _) = T.withHandle timmgr

#if __GLASGOW_HASKELL__ < 908
fromThreadId :: ThreadId -> Word64
fromThreadId tid = read (drop 9 $ show tid)
#endif
78 changes: 52 additions & 26 deletions time-manager/System/TimeManager.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE RecordWildCards #-}

module System.TimeManager (
-- ** Types
Expand All @@ -18,7 +19,7 @@ module System.TimeManager (
withHandle,
withHandleKillThread,

-- ** Control
-- ** Control timeout
tickle,
pause,
resume,
Expand All @@ -38,7 +39,7 @@ import Control.Reaper
import Data.IORef (IORef)
import qualified Data.IORef as I
import Data.Typeable (Typeable)
import GHC.Weak (deRefWeak)
import System.Mem.Weak (deRefWeak)

----------------------------------------------------------------

Expand All @@ -48,8 +49,12 @@ type Manager = Reaper [Handle] Handle
-- | An action to be performed on timeout.
type TimeoutAction = IO ()

-- | A handle used by 'Manager'
data Handle = Handle Manager !(IORef TimeoutAction) !(IORef State)
-- | A handle used by a timeout manager.
data Handle = Handle
{ handleManager :: Manager
, handleActionRef :: IORef TimeoutAction
, handleStateRef :: IORef State
}

data State
= Active -- Manager turns it to Inactive.
Expand All @@ -64,17 +69,19 @@ initialize :: Int -> IO Manager
initialize timeout =
mkReaper
defaultReaperSettings
{ reaperAction = mkListAction prune
{ -- Data.Set cannot be used since 'partition' cannot be used
-- with 'readIORef`. So, let's just use a list.
reaperAction = mkListAction prune
, reaperDelay = timeout
, reaperThreadName = "WAI timeout manager (Reaper)"
}
where
prune m@(Handle _ actionRef stateRef) = do
state <- I.atomicModifyIORef' stateRef (\x -> (inactivate x, x))
prune m@Handle{..} = do
state <- I.atomicModifyIORef' handleStateRef (\x -> (inactivate x, x))
case state of
Inactive -> do
onTimeout <- I.readIORef actionRef
onTimeout `E.catch` ignoreAll
onTimeout <- I.readIORef handleActionRef
onTimeout `E.catch` ignoreSync
return Nothing
_ -> return $ Just m

Expand All @@ -87,12 +94,9 @@ initialize timeout =
stopManager :: Manager -> IO ()
stopManager mgr = E.mask_ (reaperStop mgr >>= mapM_ fire)
where
fire (Handle _ actionRef _) = do
onTimeout <- I.readIORef actionRef
onTimeout `E.catch` ignoreAll

ignoreAll :: E.SomeException -> IO ()
ignoreAll _ = return ()
fire Handle{..} = do
onTimeout <- I.readIORef handleActionRef
onTimeout `E.catch` ignoreSync

-- | Killing timeout manager immediately without firing onTimeout.
killManager :: Manager -> IO ()
Expand All @@ -102,17 +106,21 @@ killManager = reaperKill

-- | Registering a timeout action and unregister its handle
-- when the body action is finished.
withHandle :: Manager -> TimeoutAction -> (Handle -> IO a) -> IO a
-- 'Nothing' is returned on timeout.
withHandle :: Manager -> TimeoutAction -> (Handle -> IO a) -> IO (Maybe a)
withHandle mgr onTimeout action =
E.bracket (register mgr onTimeout) cancel action
E.handle ignore $ E.bracket (register mgr onTimeout) cancel $ \th ->
Just <$> action th
where
ignore TimeoutThread = return Nothing

-- | Registering a timeout action of killing this thread and
-- unregister its handle when the body action is killed or finished.
withHandleKillThread :: Manager -> TimeoutAction -> (Handle -> IO ()) -> IO ()
withHandleKillThread mgr onTimeout action =
E.handle handler $ E.bracket (registerKillThread mgr onTimeout) cancel action
E.handle ignore $ E.bracket (registerKillThread mgr onTimeout) cancel action
where
handler TimeoutThread = return ()
ignore TimeoutThread = return ()

----------------------------------------------------------------

Expand All @@ -121,21 +129,26 @@ register :: Manager -> TimeoutAction -> IO Handle
register mgr !onTimeout = do
actionRef <- I.newIORef onTimeout
stateRef <- I.newIORef Active
let h = Handle mgr actionRef stateRef
let h =
Handle
{ handleManager = mgr
, handleActionRef = actionRef
, handleStateRef = stateRef
}
reaperAdd mgr h
return h

-- | Removing the 'Handle' from the 'Manager' immediately.
cancel :: Handle -> IO ()
cancel (Handle mgr _ stateRef) = do
_ <- reaperModify mgr filt
cancel Handle{..} = do
_ <- reaperModify handleManager filt
return ()
where
-- It's very important that this function forces the whole workload so we
-- don't retain old handles, otherwise disasterous leaks occur.
filt [] = []
filt (h@(Handle _ _ stateRef') : hs)
| stateRef == stateRef' = hs
filt (h@(Handle _ _ ref) : hs)
| handleStateRef == ref = hs
| otherwise =
let !hs' = filt hs
in h : hs'
Expand Down Expand Up @@ -174,12 +187,12 @@ registerKillThread m onTimeout = do
-- | Setting the state to active.
-- 'Manager' turns active to inactive repeatedly.
tickle :: Handle -> IO ()
tickle (Handle _ _ stateRef) = I.writeIORef stateRef Active
tickle Handle{..} = I.writeIORef handleStateRef Active

-- | Setting the state to paused.
-- 'Manager' does not change the value.
pause :: Handle -> IO ()
pause (Handle _ _ stateRef) = I.writeIORef stateRef Paused
pause Handle{..} = I.writeIORef handleStateRef Paused

-- | Setting the paused state to active.
-- This is an alias to 'tickle'.
Expand Down Expand Up @@ -213,3 +226,16 @@ withManager' timeout f =
(initialize timeout)
killManager
f

----------------------------------------------------------------

isAsyncException :: E.Exception e => e -> Bool
isAsyncException e =
case E.fromException (E.toException e) of
Just (E.SomeAsyncException _) -> True
Nothing -> False

ignoreSync :: E.SomeException -> IO ()
ignoreSync se
| isAsyncException se = E.throwIO se
| otherwise = return ()
Loading

0 comments on commit aa16cfd

Please sign in to comment.