Inbound governor performance improvement #5104

Open
wants to merge 21 commits into
base: main
24 changes: 13 additions & 11 deletions docs/network-spec/connection-manager.tex
Expand Up @@ -2100,15 +2100,18 @@ \subsubsection{\RemoteIdle}
connection is used (\warm{} or \hot{}) or not (\cold{}) by the outbound side.

\subsubsection{\RemoteWarm}
A connection enters \RemoteWarm{} state once any of the mini-protocols starts
to operate. Once all hot mini-protocols start, the state will transition to
\RemoteHot{}. Note that this is slightly different than the notion of a \warm{}
peer, for which all \established{} and \warm{} mini-protocols are active, but
\hot{} ones are idle.
A connection dwells in \RemoteWarm{} if only warm or established responder
protocols are running. Note also that an established protocol is one that may run
in both hot and warm states, but established protocols alone cannot maintain the hot
state once all proper hot protocols have terminated. In other words, the connection
must be demoted in that case.

\subsubsection{\RemoteHot}
A connection enters \RemoteHot{} transition once all hot protocols started, if
any of them terminates the connection will be put in \RemoteWarm{}.
A connection enters \RemoteHot{} state once any hot responder protocol has started.
In particular, if a hot responder is the first to start, the state cycles through \RemoteWarm{}
first. Once all hot responders terminate, the connection will be put in \RemoteWarm{} regardless
of whether there are any warm or established responders left. If no other protocols are
running at that point, the connection is then demoted further to \RemoteIdle{}.

\subsection{Transitions}

Expand Down Expand Up @@ -2166,11 +2169,10 @@ \subsubsection{\MuxTerminated}
termination of the connection, as it can detect this by itself.

\subsubsection{\PromotedToHotRemote}
The inbound governor detects when all \hot{} mini-protocols started. In such
The inbound governor detects when any \hot{} mini-protocol has started. In such
case a \RemoteWarm{} connection is put in \RemoteHot{} state.

\subsubsection{\DemotedToWarmRemote}
Dually to \PromotedToHotRemote{} state transition, as soon as any of the \hot{}
mini-protocols terminates, the connection will transition to \RemoteWarm{}
Dually to \PromotedToHotRemote{} state transition, as soon as all of the \hot{}
mini-protocols terminate, the connection will transition to \RemoteWarm{}
state.
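
The revised promotion and demotion rules above can be sketched as a small pure model. This is only an illustration under stated assumptions, not code from the PR or from the inbound governor: the `Running` record, `steadyState` and `path` are hypothetical names, and the model reduces the connection to two counters (running hot responders and running warm or established responders).

```haskell
module Main (main) where

-- Hypothetical model of the remote states described in the spec text.
-- The constructor order (idle < warm < hot) is what lets 'path' walk
-- between states one step at a time.
data RemoteState = RemoteIdle | RemoteWarm | RemoteHot
  deriving (Eq, Ord, Enum, Show)

-- | Assumed bookkeeping: how many responder mini-protocols are running.
data Running = Running
  { hotCount   :: Int  -- ^ running hot responders
  , otherCount :: Int  -- ^ running warm or established responders
  } deriving (Eq, Show)

-- | State the connection settles in for the given counters:
-- hot as soon as any hot responder runs, warm if only warm or
-- established responders run, idle otherwise.
steadyState :: Running -> RemoteState
steadyState r
  | hotCount r > 0   = RemoteHot
  | otherCount r > 0 = RemoteWarm
  | otherwise        = RemoteIdle

-- | States visited when moving from one state to another. Promotion to
-- hot and demotion to idle always pass through warm.
path :: RemoteState -> RemoteState -> [RemoteState]
path from to
  | from < to = [succ from .. to]
  | from > to = reverse [to .. pred from]
  | otherwise = []

main :: IO ()
main = do
  -- A hot responder is the first to start on an idle connection.
  print (path RemoteIdle (steadyState (Running 1 0)))
  -- The last hot responder terminates while nothing else is running.
  print (path RemoteHot (steadyState (Running 0 0)))
  -- The last hot responder terminates, an established responder remains.
  print (path RemoteHot (steadyState (Running 0 1)))
```

Running `main` prints `[RemoteWarm,RemoteHot]`, `[RemoteWarm,RemoteIdle]` and `[RemoteWarm]`, matching the behaviour described for \PromotedToHotRemote{} and \DemotedToWarmRemote{}.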

18 changes: 10 additions & 8 deletions network-mux/CHANGELOG.md
Expand Up @@ -3,22 +3,20 @@
## next release

### Breaking changes
* Bearer writeMany function for vector IO
* An optional read buffer for Bearer
* Polling of the egress queue

* run, miniProtocolJob, monitor now accept Tracers record
instead of `Tracer m Trace` type.

### Non-breaking changes
* Define msHeaderLength instead of using '8'
* Benchmark for Socket Bearer
* Use ByteString.Builder for the ingress queues
* Signal the kernal that we require at least the full SDU's worth of data

## 0.8.0.0 -- 205-05-13
## 0.8.0.0 -- 2025-05-13

### Breaking changes

* `MakeBearer` accepts optional `ReadBuffer`
* added fields `egressInterval`, `writeMany`, `batchSize` to `Bearer`
* writeMany provides vector IO, egressInterval supports polling of egress queue
for tuning latency vs. network efficiency
* `socketAsBearer` additionally takes `ReadBuffer`, egress
interval `DiffTime` for egress polling, and batchSize
* changed `IngressQueue` type synonym
Expand All @@ -27,6 +25,10 @@
### Non-breaking changes

* added `makeSocketBearer'`, `ReadBuffer`, `withReadBufferIO`
* Define msHeaderLength instead of using '8'
* Benchmark for Socket Bearer
* Use ByteString.Builder for the ingress queues
* Signal the kernel that we require at least the full SDU's worth of data

## 0.7.0.0 -- 2025-02-25

Expand Down
7 changes: 4 additions & 3 deletions network-mux/demo/mux-demo.hs
Expand Up @@ -68,6 +68,8 @@ putStrLn_ = BSC.putStrLn . BSC.pack
debugTracer :: Show a => Tracer IO a
debugTracer = showTracing (Tracer putStrLn_)

nullTracers :: (Applicative m) => Tracers m
nullTracers = Tracers nullTracer nullTracer
--
-- Protocols
--
Expand Down Expand Up @@ -133,7 +135,7 @@ serverWorker bearer = do
putStrLn $ "Result: " ++ show result
Mx.stop mux

Mx.run nullTracer mux bearer
Mx.run nullTracers mux bearer
where
ptcls :: [MiniProtocolInfo ResponderMode]
ptcls = [ MiniProtocolInfo {
Expand Down Expand Up @@ -193,7 +195,7 @@ clientWorker bearer n msg = do
putStrLn $ "Result: " ++ show result
Mx.stop mux

Mx.run nullTracer mux bearer
Mx.run nullTracers mux bearer
where
ptcls :: [MiniProtocolInfo Mx.InitiatorMode]
ptcls = [ MiniProtocolInfo {
Expand All @@ -208,4 +210,3 @@ echoClient :: Int -> Int -> ByteString
-> ReqRespClient ByteString ByteString IO Int
echoClient !n 0 _ = SendMsgDone (pure n)
echoClient !n m rawmsg = SendMsgReq rawmsg (pure . echoClient (n+1) (m-1))

42 changes: 24 additions & 18 deletions network-mux/src/Network/Mux.hs
Expand Up @@ -45,6 +45,7 @@ module Network.Mux
, traceBearerState
, BearerState (..)
, Trace (..)
, Tracers (..)
, WithBearer (..)
) where

Expand Down Expand Up @@ -212,11 +213,11 @@ run :: forall m (mode :: Mode).
, MonadTimer m
, MonadMask m
)
=> Tracer m Trace
=> Tracers m
-> Mux mode m
-> Bearer m
-> m ()
run tracer
run tracers@Tracers { muxTracer = tracer }
Mux { muxMiniProtocols,
muxControlCmdQueue,
muxStatus
Expand All @@ -238,7 +239,7 @@ run tracer
-- Wait for someone to shut us down by calling muxStop or an error.
-- Outstanding jobs are shut down Upon completion of withJobPool.
withTimeoutSerial $ \timeout ->
monitor tracer
monitor tracers
timeout
jobpool
egressQueue
Expand All @@ -250,6 +251,7 @@ run tracer
-- deadlock of mini-protocol completion action.
`catch` \(SomeAsyncException e) -> do
atomically $ writeTVar muxStatus (Failed $ toException e)
traceWith tracer $ TraceState Dead
throwIO e
where
muxerJob egressQueue =
Expand All @@ -272,12 +274,15 @@ miniProtocolJob
, MonadThread m
, MonadThrow (STM m)
)
=> Tracer m Trace
=> Tracers m
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> JobPool.Job Group m JobResult
miniProtocolJob tracer egressQueue
miniProtocolJob Tracers {
muxTracer = tracer,
channelTracer }
egressQueue
MiniProtocolState {
miniProtocolInfo =
MiniProtocolInfo {
Expand All @@ -300,7 +305,7 @@ miniProtocolJob tracer egressQueue
labelThisThread (case miniProtocolNum of
MiniProtocolNum a -> "prtcl-" ++ show a)
w <- newTVarIO BL.empty
let chan = muxChannel tracer egressQueue (Wanton w)
let chan = muxChannel channelTracer egressQueue (Wanton w)
miniProtocolNum miniProtocolDirEnum
miniProtocolIngressQueue
(result, remainder) <- miniProtocolAction chan
Expand Down Expand Up @@ -390,14 +395,16 @@ monitor :: forall mode m.
, Alternative (STM m)
, MonadThrow (STM m)
)
=> Tracer m Trace
=> Tracers m
-> TimeoutFn m
-> JobPool.JobPool Group m JobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m Status
-> m ()
monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
monitor tracers@Tracers {
muxTracer = tracer }
timeout jobpool egressQueue cmdQueue muxStatus =
go (MonitorCtx Map.empty Map.empty)
where
go :: MonitorCtx m mode -> m ()
Expand Down Expand Up @@ -433,9 +440,9 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
go monitorCtx

EventJobResult (MiniProtocolException pnum pmode e) -> do
traceWith tracer (TraceState Dead)
traceWith tracer (TraceExceptionExit pnum pmode e)
atomically $ writeTVar muxStatus $ Failed e
traceWith tracer (TraceExceptionExit pnum pmode e)
traceWith tracer (TraceState Dead)
throwIO e

-- These two cover internal and protocol errors. The muxer exception is
Expand All @@ -447,11 +454,10 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
-- the source of the failure, e.g. specific mini-protocol. If we're
-- propagating exceptions, we don't need to log them.
EventJobResult (MuxerException e) -> do
traceWith tracer (TraceState Dead)
atomically $ writeTVar muxStatus $ Failed e
traceWith tracer (TraceState Dead)
throwIO e
EventJobResult (DemuxerException e) -> do
traceWith tracer (TraceState Dead)
r <- atomically $ do
size <- JobPool.readGroupSize jobpool MiniProtocolJob
case size of
Expand All @@ -460,6 +466,7 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
>> return True
_ -> writeTVar muxStatus (Failed e)
>> return False
traceWith tracer (TraceState Dead)
unless r (throwIO e)

EventControlCmd (CmdStartProtocolThread
Expand All @@ -478,14 +485,14 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
Nothing ->
JobPool.forkJob jobpool $
miniProtocolJob
tracer
tracers
egressQueue
ptclState
ptclAction
Just cap ->
JobPool.forkJobOn cap jobpool $
miniProtocolJob
tracer
tracers
egressQueue
ptclState
ptclAction
Expand Down Expand Up @@ -585,14 +592,14 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
Nothing ->
JobPool.forkJob jobpool $
miniProtocolJob
tracer
tracers
egressQueue
ptclState
ptclAction
Just cap ->
JobPool.forkJobOn cap jobpool $
miniProtocolJob
tracer
tracers
egressQueue
ptclState
ptclAction
Expand Down Expand Up @@ -654,7 +661,7 @@ muxChannel
-> IngressQueue m
-> ByteChannel m
muxChannel tracer egressQueue want@(Wanton w) mc md q =
Channel { send, recv}
Channel { send, recv }
where
-- Limit for the message buffer between send and mux thread.
perMiniProtocolBufferSize :: Int64
Expand Down Expand Up @@ -797,4 +804,3 @@ runMiniProtocol Mux { muxMiniProtocols, muxControlCmdQueue , muxStatus}
<|> return (Left $ toException (Shutdown Nothing st))
Failed e -> readTMVar completionVar
<|> return (Left $ toException (Shutdown (Just e) st))

55 changes: 46 additions & 9 deletions network-mux/src/Network/Mux/Trace.hs
Expand Up @@ -10,6 +10,7 @@ module Network.Mux.Trace
( Error (..)
, handleIOException
, Trace (..)
, Tracers (..)
, BearerState (..)
, WithBearer (..)
, TraceLabelPeer (..)
Expand All @@ -22,6 +23,7 @@ import Text.Printf
import Control.Exception hiding (throwIO)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Tracer (Tracer)
import Data.Bifunctor (Bifunctor (..))
import Data.Word
import GHC.Generics (Generic (..))
Expand Down Expand Up @@ -118,9 +120,19 @@ data BearerState = Mature
-- closed.
deriving (Eq, Show)

-- TODO: The Trace type mixes tags which are output by
-- separate components but share the type. It would make more sense
-- to break this up into separate types. Care must be
-- exercised to ensure that a particular tracer goes
-- into the component that outputs the desired tags. For instance,
-- the low level bearer tags are not output by the tracer which
-- is passed to Mux via 'Tracers'.

-- | Enumeration of Mux events that can be traced.
--
data Trace =
-- low level bearer trace tags (these are not traced by the tracer
-- which is passed to Mux)
TraceRecvHeaderStart
| TraceRecvHeaderEnd SDUHeader
| TraceRecvDeltaQObservation SDUHeader Time
Expand All @@ -131,27 +143,37 @@ data Trace =
| TraceSendStart SDUHeader
| TraceSendEnd
| TraceState BearerState
| TraceCleanExit MiniProtocolNum MiniProtocolDir
| TraceExceptionExit MiniProtocolNum MiniProtocolDir SomeException
| TraceChannelRecvStart MiniProtocolNum
| TraceChannelRecvEnd MiniProtocolNum Int
| TraceChannelSendStart MiniProtocolNum Int
| TraceChannelSendEnd MiniProtocolNum
| TraceSDUReadTimeoutException
| TraceSDUWriteTimeoutException
| TraceTCPInfo StructTCPInfo Word16
-- low level handshake bearer tags (not traced by tracer in Mux)
| TraceHandshakeStart
| TraceHandshakeClientEnd DiffTime
| TraceHandshakeServerEnd
| forall e. Exception e => TraceHandshakeClientError e DiffTime
| forall e. Exception e => TraceHandshakeServerError e
| TraceSDUReadTimeoutException
| TraceSDUWriteTimeoutException
-- mid level channel tags traced independently by each mini protocol
-- job in Mux, for each complete message, by the 'channelTracer'
-- within 'Tracers'
| TraceChannelRecvStart MiniProtocolNum
| TraceChannelRecvEnd MiniProtocolNum Int
| TraceChannelSendStart MiniProtocolNum Int
| TraceChannelSendEnd MiniProtocolNum
-- high level Mux tags traced by the main Mux/Connection handler
-- thread forked by CM. These may be monitored by the inbound
-- governor information channel tracer. These should be traced
-- by muxTracer of 'Tracers' and their ordering
-- is significant at call sites or bad things will happen.
-- You have been warned.
| TraceCleanExit MiniProtocolNum MiniProtocolDir
| TraceExceptionExit MiniProtocolNum MiniProtocolDir SomeException
| TraceStartEagerly MiniProtocolNum MiniProtocolDir
| TraceStartOnDemand MiniProtocolNum MiniProtocolDir
| TraceStartOnDemandAny MiniProtocolNum MiniProtocolDir
| TraceStartedOnDemand MiniProtocolNum MiniProtocolDir
| TraceTerminating MiniProtocolNum MiniProtocolDir
| TraceStopping
| TraceStopped
| TraceTCPInfo StructTCPInfo Word16

instance Show Trace where
show TraceRecvHeaderStart = printf "Bearer Receive Header Start"
Expand Down Expand Up @@ -208,3 +230,18 @@ instance Show Trace where
show (TraceTCPInfo _ len) = printf "TCPInfo len %d" len
#endif

-- | Bundle of tracers passed to mux
-- Consult the 'Trace' type to determine which
-- tags are required/expected to be served by these tracers.
-- In principle, the channelTracer can be == muxTracer
-- but performance likely degrades in typical conditions
-- unnecessarily.
--
data Tracers m = Tracers {
channelTracer :: Tracer m Trace,
-- ^ a low level tracer for events emitted by a bearer. It emits events as frequently
-- as receiving individual `SDU`s from the network.
muxTracer :: Tracer m Trace
-- ^ mux events which are emitted less frequently. It emits events which allow one
-- to observe the current state of a mini-protocol.
}
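
How a caller might fill in the `Tracers` bundle can be sketched as follows. This is a self-contained illustration, not the library code: the `Tracer` newtype and `nullTracer` below are simplified stand-ins for the ones in `Control.Tracer`, the `Trace` type is cut down to three constructors with `Int` in place of `MiniProtocolNum`, and `collecting` is a hypothetical helper. It shows the high-frequency channel events being discarded while the less frequent mux events are kept.

```haskell
module Main (main) where

import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Simplified stand-in for Control.Tracer.Tracer (assumption: the real
-- type lives in the contra-tracer package and is not imported here).
newtype Tracer m a = Tracer { traceWith :: a -> m () }

-- | Tracer that drops every event.
nullTracer :: Applicative m => Tracer m a
nullTracer = Tracer (\_ -> pure ())

-- Cut-down version of the 'Trace' type, for illustration only.
data Trace
  = TraceChannelRecvStart Int  -- channel tag
  | TraceCleanExit Int         -- mux tag
  | TraceStopped               -- mux tag
  deriving (Eq, Show)

-- Same shape as the record added in this PR.
data Tracers m = Tracers
  { channelTracer :: Tracer m Trace
  , muxTracer     :: Tracer m Trace
  }

-- | Hypothetical helper: record events in an IORef, newest first.
collecting :: IORef [Trace] -> Tracer IO Trace
collecting ref = Tracer (\ev -> modifyIORef' ref (ev :))

main :: IO ()
main = do
  ref <- newIORef []
  let tracers = Tracers
        { channelTracer = nullTracer      -- drop per-message events
        , muxTracer     = collecting ref  -- keep mini-protocol state events
        }
  traceWith (channelTracer tracers) (TraceChannelRecvStart 2)
  traceWith (muxTracer tracers) (TraceCleanExit 2)
  traceWith (muxTracer tracers) TraceStopped
  events <- reverse <$> readIORef ref
  print events
```

Running `main` prints `[TraceCleanExit 2,TraceStopped]`: the channel event is discarded and the mux events arrive in emission order. Keeping the two tracers separate lets a consumer that only watches mux-level events avoid paying for every channel-level message.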