Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New start strategy for miniprotocols #5048

Merged
merged 4 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions network-mux/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### Breaking changes

* Use the correct tracing message for `StartOnDemand`.
* Implement `StartOnDemandAny`, which will start the miniprotocol as soon as
any `StartOnDemand` protocol starts.

### Non-breaking changes

## 0.6.0.0 -- 2025-01-02
Expand Down
122 changes: 88 additions & 34 deletions network-mux/src/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import Data.Monoid.Synchronisation (FirstToFinish (..))
import Control.Applicative
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Concurrent.JobPool qualified as JobPool
import Control.Exception (SomeAsyncException (..))
import Control.Exception (SomeAsyncException (..), assert)
import Control.Monad
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
Expand Down Expand Up @@ -342,7 +342,10 @@ data StartOnDemandOrEagerly =
-- mini-protocol. Must be used only when initial message is sent by the
-- remote side.
| StartOnDemand
deriving Eq
-- | Like `StartOnDemand`, but start a mini-protocol if data is received for
-- any mini-protocol set to `StartOnDemand`.
| StartOnDemandAny
karknu marked this conversation as resolved.
Show resolved Hide resolved
deriving (Eq, Show)

data MiniProtocolAction m where
MiniProtocolAction :: forall m a.
Expand All @@ -355,12 +358,16 @@ data MiniProtocolAction m where

type MiniProtocolKey = (MiniProtocolNum, MiniProtocolDir)

newtype MonitorCtx m mode = MonitorCtx {
data MonitorCtx m mode = MonitorCtx {
-- | Mini-Protocols started on demand and waiting to be scheduled.
--
mcOnDemandProtocols :: Map MiniProtocolKey
(MiniProtocolState mode m, MiniProtocolAction m)

mcOnDemandProtocols :: !(Map MiniProtocolKey
(MiniProtocolState mode m, MiniProtocolAction m))
-- | Mini-Protocols started on demand any and waiting to be scheduled.
-- Disjoint from `mcOnDemandProtocols`.
--
, mcOnDemandAnyProtocols :: !(Map MiniProtocolKey
(MiniProtocolState mode m, MiniProtocolAction m))
}

-- | The monitoring loop does two jobs:
Expand All @@ -383,10 +390,11 @@ monitor :: forall mode m.
-> StrictTVar m Status
-> m ()
monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
go (MonitorCtx Map.empty)
go (MonitorCtx Map.empty Map.empty)
where
go :: MonitorCtx m mode -> m ()
go !monitorCtx@MonitorCtx { mcOnDemandProtocols } = do
go monitorCtx@MonitorCtx { mcOnDemandProtocols
, mcOnDemandAnyProtocols } = do
result <- atomically $ runFirstToFinish $
-- wait for a mini-protocol thread to terminate
FirstToFinish (EventJobResult <$> JobPool.waitForJob jobpool)
Expand All @@ -403,7 +411,13 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
return (EventStartOnDemand ptclState ptclAction)
)
mcOnDemandProtocols

<> foldMap
(\(ptclState, _ptclAction) ->
FirstToFinish $ do
checkNonEmptyQueue (miniProtocolIngressQueue ptclState)
return EventStartOnDemandAny
)
mcOnDemandAnyProtocols
case result of
-- Protocols that runs to completion are not automatically restarted.
EventJobResult (MiniProtocolShutdown pnum pmode) -> do
Expand Down Expand Up @@ -473,7 +487,25 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
(ptclState, ptclAction)
mcOnDemandProtocols
}
traceWith tracer (TraceStartedOnDemand miniProtocolNum
traceWith tracer (TraceStartOnDemand miniProtocolNum
(protocolDirEnum miniProtocolDir))
go monitorCtx'

EventControlCmd (CmdStartProtocolThread
StartOnDemandAny
ptclState@MiniProtocolState {
miniProtocolInfo = MiniProtocolInfo {
miniProtocolNum,
miniProtocolDir
}
}
ptclAction) -> do
let monitorCtx' = monitorCtx { mcOnDemandAnyProtocols =
Map.insert (protocolKey ptclState)
(ptclState, ptclAction)
mcOnDemandAnyProtocols
}
traceWith tracer (TraceStartOnDemandAny miniProtocolNum
(protocolDirEnum miniProtocolDir))
go monitorCtx'

Expand All @@ -492,31 +524,51 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
-- muxer threads

-- Data has arrived on a channel for a mini-protocol for which we have
-- an on-demand-start protocol thread. So we start it now.
EventStartOnDemand ptclState@MiniProtocolState {
miniProtocolInfo = MiniProtocolInfo {
miniProtocolNum,
miniProtocolDir
},
miniProtocolStatusVar
}
ptclAction -> do
traceWith tracer (TraceStartOnDemand miniProtocolNum
(protocolDirEnum miniProtocolDir))
atomically $ writeTVar miniProtocolStatusVar StatusRunning
JobPool.forkJob jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction
let ptclKey = protocolKey ptclState
monitorCtx' = monitorCtx { mcOnDemandProtocols =
Map.delete ptclKey
mcOnDemandProtocols
-- an on-demand-start protocol thread. So we start it now along with all
-- StartOnDemandAny protocols.
EventStartOnDemand ptclState ptclAction ->
let ptclKey = protocolKey ptclState in
assert (Map.null (mcOnDemandAnyProtocols `Map.intersection` mcOnDemandProtocols)) $ do
doStartOnDemand ptclState ptclAction

-- Also start any StartOnDemandAny protocols
mapM_ (uncurry doStartOnDemand) mcOnDemandAnyProtocols

let monitorCtx' = MonitorCtx { mcOnDemandProtocols =
Map.delete ptclKey mcOnDemandProtocols
, mcOnDemandAnyProtocols = Map.empty
}

go monitorCtx'

-- Data has arrived on a channel for a mini-protocol for which we have
-- an on-demand-start-any protocol thread. So we start them all now.
EventStartOnDemandAny -> do
mapM_ (uncurry doStartOnDemand) mcOnDemandAnyProtocols

go $ monitorCtx { mcOnDemandAnyProtocols = Map.empty }

doStartOnDemand :: MiniProtocolState mode m
-> MiniProtocolAction m
-> m ()
doStartOnDemand ptclState@MiniProtocolState {
miniProtocolInfo = MiniProtocolInfo {
miniProtocolNum,
miniProtocolDir
},
miniProtocolStatusVar
}
ptclAction = do
traceWith tracer (TraceStartedOnDemand miniProtocolNum
(protocolDirEnum miniProtocolDir))
atomically $ modifyTVar miniProtocolStatusVar (\a -> assert (a /= StatusRunning) StatusRunning)
JobPool.forkJob jobpool $
miniProtocolJob
tracer
egressQueue
ptclState
ptclAction

checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue q = do
buf <- readTVar q
Expand All @@ -536,6 +588,7 @@ data MonitorEvent mode m =
| EventControlCmd (ControlCmd mode m)
| EventStartOnDemand (MiniProtocolState mode m)
(MiniProtocolAction m)
| EventStartOnDemandAny

-- | The mux forks off a number of threads and its main thread waits and
-- monitors them all. This type covers the different thread and their possible
Expand Down Expand Up @@ -680,8 +733,9 @@ runMiniProtocol Mux { muxMiniProtocols, muxControlCmdQueue , muxStatus}
unless (status == StatusIdle) $
throwSTM (ProtocolAlreadyRunning ptclNum ptclDir' status)
let !status' = case startMode of
StartOnDemand -> StatusStartOnDemand
StartEagerly -> StatusRunning
StartOnDemand -> StatusStartOnDemand
StartOnDemandAny -> StatusStartOnDemandAny
StartEagerly -> StatusRunning
writeTVar miniProtocolStatusVar status'

-- Tell the mux control to start the thread
Expand Down
2 changes: 2 additions & 0 deletions network-mux/src/Network/Mux/Trace.hs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ data Trace =
| TraceSDUWriteTimeoutException
| TraceStartEagerly MiniProtocolNum MiniProtocolDir
| TraceStartOnDemand MiniProtocolNum MiniProtocolDir
| TraceStartOnDemandAny MiniProtocolNum MiniProtocolDir
| TraceStartedOnDemand MiniProtocolNum MiniProtocolDir
| TraceTerminating MiniProtocolNum MiniProtocolDir
| TraceStopping
Expand Down Expand Up @@ -184,6 +185,7 @@ instance Show Trace where
show TraceSDUWriteTimeoutException = "Timed out writing SDU"
show (TraceStartEagerly mid dir) = printf "Eagerly started (%s) in %s" (show mid) (show dir)
show (TraceStartOnDemand mid dir) = printf "Preparing to start (%s) in %s" (show mid) (show dir)
show (TraceStartOnDemandAny mid dir) = printf "Preparing to start on any (%s) in %s" (show mid) (show dir)
show (TraceStartedOnDemand mid dir) = printf "Started on demand (%s) in %s" (show mid) (show dir)
show (TraceTerminating mid dir) = printf "Terminating (%s) in %s" (show mid) (show dir)
show TraceStopping = "Mux stopping"
Expand Down
5 changes: 4 additions & 1 deletion network-mux/src/Network/Mux/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@ data MiniProtocolState mode m = MiniProtocolState {
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
}

data MiniProtocolStatus = StatusIdle | StatusStartOnDemand | StatusRunning
data MiniProtocolStatus = StatusIdle
| StatusStartOnDemand
| StatusRunning
| StatusStartOnDemandAny
deriving (Eq, Show)

data SDUHeader = SDUHeader {
Expand Down
Loading