Skip to content

Commit

Permalink
Merge pull request #5048 from IntersectMBO/karknu/startondemandany
Browse files Browse the repository at this point in the history
New start strategy for miniprotocols
  • Loading branch information
karknu authored Jan 29, 2025
2 parents 21d3380 + 06beeb2 commit f4d05d1
Show file tree
Hide file tree
Showing 25 changed files with 241 additions and 84 deletions.
4 changes: 4 additions & 0 deletions network-mux/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### Breaking changes

* Use the correct tracing message for `StartOnDemand`.
* Implement `StartOnDemandAny`, which will start the miniprotocol as soon as
any `StartOnDemand` protocol starts.

### Non-breaking changes

## 0.6.0.0 -- 2025-01-02
Expand Down
122 changes: 88 additions & 34 deletions network-mux/src/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import Data.Monoid.Synchronisation (FirstToFinish (..))
import Control.Applicative
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.JobPool qualified as JobPool
import Control.Exception (SomeAsyncException (..))
import Control.Exception (SomeAsyncException (..), assert)
import Control.Monad
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
Expand Down Expand Up @@ -342,7 +342,10 @@ data StartOnDemandOrEagerly =
-- mini-protocol. Must be used only when initial message is sent by the
-- remote side.
| StartOnDemand
deriving Eq
-- | Like `StartOnDemand`, but start a mini-protocol if data is received for
-- any mini-protocol set to `StartOnDemand`.
| StartOnDemandAny
deriving (Eq, Show)

data MiniProtocolAction m where
MiniProtocolAction :: forall m a.
Expand All @@ -355,12 +358,16 @@ data MiniProtocolAction m where

type MiniProtocolKey = (MiniProtocolNum, MiniProtocolDir)

newtype MonitorCtx m mode = MonitorCtx {
data MonitorCtx m mode = MonitorCtx {
-- | Mini-Protocols started on demand and waiting to be scheduled.
--
mcOnDemandProtocols :: Map MiniProtocolKey
(MiniProtocolState mode m, MiniProtocolAction m)

mcOnDemandProtocols :: !(Map MiniProtocolKey
(MiniProtocolState mode m, MiniProtocolAction m))
-- | Mini-Protocols started on demand any and waiting to be scheduled.
-- Disjoint from `mcOnDemandProtocols`.
--
, mcOnDemandAnyProtocols :: !(Map MiniProtocolKey
(MiniProtocolState mode m, MiniProtocolAction m))
}

-- | The monitoring loop does two jobs:
Expand All @@ -383,10 +390,11 @@ monitor :: forall mode m.
-> StrictTVar m Status
-> m ()
monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
go (MonitorCtx Map.empty)
go (MonitorCtx Map.empty Map.empty)
where
go :: MonitorCtx m mode -> m ()
go !monitorCtx@MonitorCtx { mcOnDemandProtocols } = do
go monitorCtx@MonitorCtx { mcOnDemandProtocols
, mcOnDemandAnyProtocols } = do
result <- atomically $ runFirstToFinish $
-- wait for a mini-protocol thread to terminate
FirstToFinish (EventJobResult <$> JobPool.waitForJob jobpool)
Expand All @@ -403,7 +411,13 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
return (EventStartOnDemand ptclState ptclAction)
)
mcOnDemandProtocols

<> foldMap
(\(ptclState, _ptclAction) ->
FirstToFinish $ do
checkNonEmptyQueue (miniProtocolIngressQueue ptclState)
return EventStartOnDemandAny
)
mcOnDemandAnyProtocols
case result of
-- Protocols that runs to completion are not automatically restarted.
EventJobResult (MiniProtocolShutdown pnum pmode) -> do
Expand Down Expand Up @@ -473,7 +487,25 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
(ptclState, ptclAction)
mcOnDemandProtocols
}
traceWith tracer (TraceStartedOnDemand miniProtocolNum
traceWith tracer (TraceStartOnDemand miniProtocolNum
(protocolDirEnum miniProtocolDir))
go monitorCtx'

EventControlCmd (CmdStartProtocolThread
StartOnDemandAny
ptclState@MiniProtocolState {
miniProtocolInfo = MiniProtocolInfo {
miniProtocolNum,
miniProtocolDir
}
}
ptclAction) -> do
let monitorCtx' = monitorCtx { mcOnDemandAnyProtocols =
Map.insert (protocolKey ptclState)
(ptclState, ptclAction)
mcOnDemandAnyProtocols
}
traceWith tracer (TraceStartOnDemandAny miniProtocolNum
(protocolDirEnum miniProtocolDir))
go monitorCtx'

Expand All @@ -492,31 +524,51 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
-- muxer threads

-- Data has arrived on a channel for a mini-protocol for which we have
-- an on-demand-start protocol thread. So we start it now.
EventStartOnDemand ptclState@MiniProtocolState {
miniProtocolInfo = MiniProtocolInfo {
miniProtocolNum,
miniProtocolDir
},
miniProtocolStatusVar
}
ptclAction -> do
traceWith tracer (TraceStartOnDemand miniProtocolNum
(protocolDirEnum miniProtocolDir))
atomically $ writeTVar miniProtocolStatusVar StatusRunning
JobPool.forkJob jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction
let ptclKey = protocolKey ptclState
monitorCtx' = monitorCtx { mcOnDemandProtocols =
Map.delete ptclKey
mcOnDemandProtocols
-- an on-demand-start protocol thread. So we start it now along with all
-- StartOnDemandAny protocols.
EventStartOnDemand ptclState ptclAction ->
let ptclKey = protocolKey ptclState in
assert (Map.null (mcOnDemandAnyProtocols `Map.intersection` mcOnDemandProtocols)) $ do
doStartOnDemand ptclState ptclAction

-- Also start any StartOnDemandAny protocols
mapM_ (uncurry doStartOnDemand) mcOnDemandAnyProtocols

let monitorCtx' = MonitorCtx { mcOnDemandProtocols =
Map.delete ptclKey mcOnDemandProtocols
, mcOnDemandAnyProtocols = Map.empty
}

go monitorCtx'

-- Data has arrived on a channel for a mini-protocol for which we have
-- an on-demand-start-any protocol thread. So we start them all now.
EventStartOnDemandAny -> do
mapM_ (uncurry doStartOnDemand) mcOnDemandAnyProtocols

go $ monitorCtx { mcOnDemandAnyProtocols = Map.empty }

doStartOnDemand :: MiniProtocolState mode m
-> MiniProtocolAction m
-> m ()
doStartOnDemand ptclState@MiniProtocolState {
miniProtocolInfo = MiniProtocolInfo {
miniProtocolNum,
miniProtocolDir
},
miniProtocolStatusVar
}
ptclAction = do
traceWith tracer (TraceStartedOnDemand miniProtocolNum
(protocolDirEnum miniProtocolDir))
atomically $ modifyTVar miniProtocolStatusVar (\a -> assert (a /= StatusRunning) StatusRunning)
JobPool.forkJob jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction

checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue q = do
buf <- readTVar q
Expand All @@ -536,6 +588,7 @@ data MonitorEvent mode m =
| EventControlCmd (ControlCmd mode m)
| EventStartOnDemand (MiniProtocolState mode m)
(MiniProtocolAction m)
| EventStartOnDemandAny

-- | The mux forks off a number of threads and its main thread waits and
-- monitors them all. This type covers the different thread and their possible
Expand Down Expand Up @@ -680,8 +733,9 @@ runMiniProtocol Mux { muxMiniProtocols, muxControlCmdQueue , muxStatus}
unless (status == StatusIdle) $
throwSTM (ProtocolAlreadyRunning ptclNum ptclDir' status)
let !status' = case startMode of
StartOnDemand -> StatusStartOnDemand
StartEagerly -> StatusRunning
StartOnDemand -> StatusStartOnDemand
StartOnDemandAny -> StatusStartOnDemandAny
StartEagerly -> StatusRunning
writeTVar miniProtocolStatusVar status'

-- Tell the mux control to start the thread
Expand Down
2 changes: 2 additions & 0 deletions network-mux/src/Network/Mux/Trace.hs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ data Trace =
| TraceSDUWriteTimeoutException
| TraceStartEagerly MiniProtocolNum MiniProtocolDir
| TraceStartOnDemand MiniProtocolNum MiniProtocolDir
| TraceStartOnDemandAny MiniProtocolNum MiniProtocolDir
| TraceStartedOnDemand MiniProtocolNum MiniProtocolDir
| TraceTerminating MiniProtocolNum MiniProtocolDir
| TraceStopping
Expand Down Expand Up @@ -184,6 +185,7 @@ instance Show Trace where
show TraceSDUWriteTimeoutException = "Timed out writing SDU"
show (TraceStartEagerly mid dir) = printf "Eagerly started (%s) in %s" (show mid) (show dir)
show (TraceStartOnDemand mid dir) = printf "Preparing to start (%s) in %s" (show mid) (show dir)
show (TraceStartOnDemandAny mid dir) = printf "Preparing to start on any (%s) in %s" (show mid) (show dir)
show (TraceStartedOnDemand mid dir) = printf "Started on demand (%s) in %s" (show mid) (show dir)
show (TraceTerminating mid dir) = printf "Terminating (%s) in %s" (show mid) (show dir)
show TraceStopping = "Mux stopping"
Expand Down
5 changes: 4 additions & 1 deletion network-mux/src/Network/Mux/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@ data MiniProtocolState mode m = MiniProtocolState {
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
}

data MiniProtocolStatus = StatusIdle | StatusStartOnDemand | StatusRunning
data MiniProtocolStatus = StatusIdle
| StatusStartOnDemand
| StatusRunning
| StatusStartOnDemandAny
deriving (Eq, Show)

data SDUHeader = SDUHeader {
Expand Down
Loading

0 comments on commit f4d05d1

Please sign in to comment.