diff --git a/network-mux/CHANGELOG.md b/network-mux/CHANGELOG.md index 35a164700a..0735238671 100644 --- a/network-mux/CHANGELOG.md +++ b/network-mux/CHANGELOG.md @@ -4,6 +4,10 @@ ### Breaking changes +* Use the correct tracing message for `StartOnDemand`. +* Implement `StartOnDemandAny`, which will start the miniprotocol as soon as + any `StartOnDemand` protocol starts. + ### Non-breaking changes ## 0.6.0.0 -- 2025-01-02 diff --git a/network-mux/src/Network/Mux.hs b/network-mux/src/Network/Mux.hs index 34f66cb1ef..0aa0569af3 100644 --- a/network-mux/src/Network/Mux.hs +++ b/network-mux/src/Network/Mux.hs @@ -58,7 +58,7 @@ import Data.Monoid.Synchronisation (FirstToFinish (..)) import Control.Applicative import Control.Concurrent.Class.MonadSTM.Strict import Control.Concurrent.JobPool qualified as JobPool -import Control.Exception (SomeAsyncException (..)) +import Control.Exception (SomeAsyncException (..), assert) import Control.Monad import Control.Monad.Class.MonadAsync import Control.Monad.Class.MonadFork @@ -342,7 +342,10 @@ data StartOnDemandOrEagerly = -- mini-protocol. Must be used only when initial message is sent by the -- remote side. | StartOnDemand - deriving Eq + -- | Like `StartOnDemand`, but start a mini-protocol if data is received for + -- any mini-protocol set to `StartOnDemand`. + | StartOnDemandAny + deriving (Eq, Show) data MiniProtocolAction m where MiniProtocolAction :: forall m a. @@ -355,12 +358,16 @@ data MiniProtocolAction m where type MiniProtocolKey = (MiniProtocolNum, MiniProtocolDir) -newtype MonitorCtx m mode = MonitorCtx { +data MonitorCtx m mode = MonitorCtx { -- | Mini-Protocols started on demand and waiting to be scheduled. -- - mcOnDemandProtocols :: Map MiniProtocolKey - (MiniProtocolState mode m, MiniProtocolAction m) - + mcOnDemandProtocols :: !(Map MiniProtocolKey + (MiniProtocolState mode m, MiniProtocolAction m)) + -- | Mini-Protocols started on demand any and waiting to be scheduled. + -- Disjoint from `mcOnDemandProtocols`. + -- + , mcOnDemandAnyProtocols :: !(Map MiniProtocolKey + (MiniProtocolState mode m, MiniProtocolAction m)) } -- | The monitoring loop does two jobs: @@ -383,10 +390,11 @@ monitor :: forall mode m. -> StrictTVar m Status -> m () monitor tracer timeout jobpool egressQueue cmdQueue muxStatus = - go (MonitorCtx Map.empty) + go (MonitorCtx Map.empty Map.empty) where go :: MonitorCtx m mode -> m () - go !monitorCtx@MonitorCtx { mcOnDemandProtocols } = do + go monitorCtx@MonitorCtx { mcOnDemandProtocols + , mcOnDemandAnyProtocols } = do result <- atomically $ runFirstToFinish $ -- wait for a mini-protocol thread to terminate FirstToFinish (EventJobResult <$> JobPool.waitForJob jobpool) @@ -403,7 +411,13 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus = return (EventStartOnDemand ptclState ptclAction) ) mcOnDemandProtocols - + <> foldMap + (\(ptclState, _ptclAction) -> + FirstToFinish $ do + checkNonEmptyQueue (miniProtocolIngressQueue ptclState) + return EventStartOnDemandAny + ) + mcOnDemandAnyProtocols case result of -- Protocols that runs to completion are not automatically restarted. EventJobResult (MiniProtocolShutdown pnum pmode) -> do @@ -473,7 +487,25 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus = (ptclState, ptclAction) mcOnDemandProtocols } - traceWith tracer (TraceStartedOnDemand miniProtocolNum + traceWith tracer (TraceStartOnDemand miniProtocolNum + (protocolDirEnum miniProtocolDir)) + go monitorCtx' + + EventControlCmd (CmdStartProtocolThread + StartOnDemandAny + ptclState@MiniProtocolState { + miniProtocolInfo = MiniProtocolInfo { + miniProtocolNum, + miniProtocolDir + } + } + ptclAction) -> do + let monitorCtx' = monitorCtx { mcOnDemandAnyProtocols = + Map.insert (protocolKey ptclState) + (ptclState, ptclAction) + mcOnDemandAnyProtocols + } + traceWith tracer (TraceStartOnDemandAny miniProtocolNum (protocolDirEnum miniProtocolDir)) go monitorCtx' @@ -492,31 +524,51 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus = -- muxer threads -- Data has arrived on a channel for a mini-protocol for which we have - -- an on-demand-start protocol thread. So we start it now. - EventStartOnDemand ptclState@MiniProtocolState { - miniProtocolInfo = MiniProtocolInfo { - miniProtocolNum, - miniProtocolDir - }, - miniProtocolStatusVar - } - ptclAction -> do - traceWith tracer (TraceStartOnDemand miniProtocolNum - (protocolDirEnum miniProtocolDir)) - atomically $ writeTVar miniProtocolStatusVar StatusRunning - JobPool.forkJob jobpool $ - miniProtocolJob - tracer - egressQueue - ptclState - ptclAction - let ptclKey = protocolKey ptclState - monitorCtx' = monitorCtx { mcOnDemandProtocols = - Map.delete ptclKey - mcOnDemandProtocols + -- an on-demand-start protocol thread. So we start it now along with all + -- StartOnDemandAny protocols. + EventStartOnDemand ptclState ptclAction -> + let ptclKey = protocolKey ptclState in + assert (Map.null (mcOnDemandAnyProtocols `Map.intersection` mcOnDemandProtocols)) $ do + doStartOnDemand ptclState ptclAction + + -- Also start any StartOnDemandAny protocols + mapM_ (uncurry doStartOnDemand) mcOnDemandAnyProtocols + + let monitorCtx' = MonitorCtx { mcOnDemandProtocols = + Map.delete ptclKey mcOnDemandProtocols + , mcOnDemandAnyProtocols = Map.empty } + go monitorCtx' + -- Data has arrived on a channel for a mini-protocol for which we have + -- an on-demand-start-any protocol thread. So we start them all now. + EventStartOnDemandAny -> do + mapM_ (uncurry doStartOnDemand) mcOnDemandAnyProtocols + + go $ monitorCtx { mcOnDemandAnyProtocols = Map.empty } + + doStartOnDemand :: MiniProtocolState mode m + -> MiniProtocolAction m + -> m () + doStartOnDemand ptclState@MiniProtocolState { + miniProtocolInfo = MiniProtocolInfo { + miniProtocolNum, + miniProtocolDir + }, + miniProtocolStatusVar + } + ptclAction = do + traceWith tracer (TraceStartedOnDemand miniProtocolNum + (protocolDirEnum miniProtocolDir)) + atomically $ modifyTVar miniProtocolStatusVar (\a -> assert (a /= StatusRunning) StatusRunning) + JobPool.forkJob jobpool $ + miniProtocolJob + tracer + egressQueue + ptclState + ptclAction + checkNonEmptyQueue :: IngressQueue m -> STM m () checkNonEmptyQueue q = do buf <- readTVar q @@ -536,6 +588,7 @@ data MonitorEvent mode m = | EventControlCmd (ControlCmd mode m) | EventStartOnDemand (MiniProtocolState mode m) (MiniProtocolAction m) + | EventStartOnDemandAny -- | The mux forks off a number of threads and its main thread waits and -- monitors them all. This type covers the different thread and their possible @@ -680,8 +733,9 @@ runMiniProtocol Mux { muxMiniProtocols, muxControlCmdQueue , muxStatus} unless (status == StatusIdle) $ throwSTM (ProtocolAlreadyRunning ptclNum ptclDir' status) let !status' = case startMode of - StartOnDemand -> StatusStartOnDemand - StartEagerly -> StatusRunning + StartOnDemand -> StatusStartOnDemand + StartOnDemandAny -> StatusStartOnDemandAny + StartEagerly -> StatusRunning writeTVar miniProtocolStatusVar status' -- Tell the mux control to start the thread diff --git a/network-mux/src/Network/Mux/Trace.hs b/network-mux/src/Network/Mux/Trace.hs index 39a554d94a..5ab351d07b 100644 --- a/network-mux/src/Network/Mux/Trace.hs +++ b/network-mux/src/Network/Mux/Trace.hs @@ -145,6 +145,7 @@ data Trace = | TraceSDUWriteTimeoutException | TraceStartEagerly MiniProtocolNum MiniProtocolDir | TraceStartOnDemand MiniProtocolNum MiniProtocolDir + | TraceStartOnDemandAny MiniProtocolNum MiniProtocolDir | TraceStartedOnDemand MiniProtocolNum MiniProtocolDir | TraceTerminating MiniProtocolNum MiniProtocolDir | TraceStopping @@ -184,6 +185,7 @@ instance Show Trace where show TraceSDUWriteTimeoutException = "Timed out writing SDU" show (TraceStartEagerly mid dir) = printf "Eagerly started (%s) in %s" (show mid) (show dir) show (TraceStartOnDemand mid dir) = printf "Preparing to start (%s) in %s" (show mid) (show dir) + show (TraceStartOnDemandAny mid dir) = printf "Preparing to start on any (%s) in %s" (show mid) (show dir) show (TraceStartedOnDemand mid dir) = printf "Started on demand (%s) in %s" (show mid) (show dir) show (TraceTerminating mid dir) = printf "Terminating (%s) in %s" (show mid) (show dir) show TraceStopping = "Mux stopping" diff --git a/network-mux/src/Network/Mux/Types.hs b/network-mux/src/Network/Mux/Types.hs index a39ec82497..b3db6c5922 100644 --- a/network-mux/src/Network/Mux/Types.hs +++ b/network-mux/src/Network/Mux/Types.hs @@ -189,7 +189,10 @@ data MiniProtocolState mode m = MiniProtocolState { miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus } -data MiniProtocolStatus = StatusIdle | StatusStartOnDemand | StatusRunning +data MiniProtocolStatus = StatusIdle + | StatusStartOnDemand + | StatusRunning + | StatusStartOnDemandAny deriving (Eq, Show) data SDUHeader = SDUHeader { diff --git a/network-mux/test/Test/Mux.hs b/network-mux/test/Test/Mux.hs index f8049b8cb2..14da04f5b1 100644 --- a/network-mux/test/Test/Mux.hs +++ b/network-mux/test/Test/Mux.hs @@ -1240,15 +1240,29 @@ instance Arbitrary DiffTime where . NonNegative . toRational +-- | An arbitrary instance for `StartOnDemand` & `StartOnDemandAny`. +-- +newtype DummyStart = DummyStart { + unDummyStart :: Mx.StartOnDemandOrEagerly + } deriving (Eq, Show) + +instance Arbitrary DummyStart where + -- Only used for responder side so we don't generate StartEagerly + arbitrary = fmap DummyStart (elements [Mx.StartOnDemand, Mx.StartOnDemandAny]) + + shrink (DummyStart Mx.StartOnDemandAny) = [DummyStart Mx.StartOnDemand] + shrink _ = [] + data DummyApp = DummyApp { daNum :: !Mx.MiniProtocolNum , daAction :: !DummyAppResult + , daStart :: !DummyStart , daRunTime :: !DiffTime , daStartAfter :: !DiffTime } deriving (Eq, Show) instance Arbitrary DummyApp where - arbitrary = DummyApp <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary + arbitrary = DummyApp <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary data DummyApps = DummyResponderApps [DummyApp] @@ -1263,14 +1277,15 @@ instance Arbitrary DummyApps where apps <- mapM genApp $ nub nums mode <- arbitrary case mode of - Mx.InitiatorMode -> return $ DummyInitiatorApps apps + Mx.InitiatorMode -> return $ DummyInitiatorApps $ + map (\a -> a { daStart = DummyStart Mx.StartEagerly }) apps Mx.ResponderMode -> frequency [ (3, return $ DummyResponderApps apps) , (1, return $ DummyResponderAppsKillMux apps) ] Mx.InitiatorResponderMode -> return $ DummyInitiatorResponderApps apps where - genApp num = DummyApp num <$> arbitrary <*> arbitrary <*> arbitrary + genApp num = DummyApp num <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary shrink (DummyResponderApps apps) = [ DummyResponderApps apps' | apps' <- filter (not . null) $ shrinkList (const []) apps @@ -1316,7 +1331,7 @@ instance Arbitrary DummyRestartingApps where Mx.InitiatorResponderMode -> return $ DummyRestartingInitiatorResponderApps apps where genApp num = do - app <- DummyApp num DummyAppSucceed <$> arbitrary <*> arbitrary + app <- DummyApp num DummyAppSucceed <$> arbitrary <*> arbitrary <*> arbitrary restarts <- choose (0, 5) return (app, restarts) @@ -1381,17 +1396,25 @@ prop_mux_start_mX apps runTime = do (-1) nullTracer QueueChannel { writeQueue = mux_r, readQueue = mux_w } - prop_mux_start_m bearer (triggerApp peerBearer) checkRes apps runTime + prop_mux_start_m bearer (triggerApp peerBearer) checkRes apps runTime anyStartAfter where - checkRes :: Mx.StartOnDemandOrEagerly - -> DiffTime + anyStartAfter :: DiffTime + anyStartAfter = + case apps of + DummyResponderApps as -> minimum (map daStartAfter as) + DummyResponderAppsKillMux as -> minimum (map daStartAfter as) + DummyInitiatorApps as -> minimum (map daStartAfter as) + DummyInitiatorResponderApps as -> minimum (map daStartAfter as) + + checkRes :: DiffTime -> ((STM m (Either SomeException ())), DummyApp) -> m (Property, Either SomeException ()) - checkRes startStrat minRunTime (get,da) = do - let totTime = case startStrat of - Mx.StartOnDemand -> daRunTime da + daStartAfter da - Mx.StartEagerly -> daRunTime da + checkRes minRunTime (get,da) = do + let totTime = case unDummyStart (daStart da) of + Mx.StartOnDemand -> daRunTime da + daStartAfter da + Mx.StartOnDemandAny -> daRunTime da + anyStartAfter + Mx.StartEagerly -> daRunTime da r <- atomically get case daAction da of DummyAppSucceed -> @@ -1506,7 +1529,8 @@ prop_mux_restart_m (DummyRestartingResponderApps rapps) = do Right (app, 0) -> do runRestartingApps mux $ M.delete (daNum app) ops Right (app, restarts) -> do - op <- Mx.runMiniProtocol mux (daNum app) Mx.ResponderDirectionOnly Mx.StartOnDemand + op <- Mx.runMiniProtocol mux (daNum app) Mx.ResponderDirectionOnly + (unDummyStart $ daStart app) (dummyRestartingAppToChannel (app, restarts - 1)) runRestartingApps mux $ M.insert (daNum app) op ops @@ -1541,7 +1565,7 @@ prop_mux_restart_m (DummyRestartingInitiatorResponderApps rapps) = do mux (daNum $ fst app) Mx.ResponderDirection - Mx.StartOnDemand + (unDummyStart $ daStart $ fst app) (dummyRestartingAppToChannel (fst app, (Mx.ResponderDirection, snd app))) | app <- rapps ] @@ -1571,12 +1595,13 @@ prop_mux_restart_m (DummyRestartingInitiatorResponderApps rapps) = do let opKey = (dir, daNum app) strat = case dir of Mx.InitiatorDirection -> Mx.StartEagerly - Mx.ResponderDirection -> Mx.StartOnDemand + Mx.ResponderDirection -> unDummyStart $ daStart app op <- Mx.runMiniProtocol mux (daNum app) dir strat (dummyRestartingAppToChannel (app, (dir, restarts - 1))) runRestartingApps mux $ M.insert opKey op ops +-- | Verifying starting and stopping of miniprotocols. Both normal exits and by exception. prop_mux_start_m :: forall m. ( Alternative (STM m) , MonadAsync m @@ -1589,16 +1614,25 @@ prop_mux_start_m :: forall m. , MonadTimer m ) => Mx.Bearer m + -- ^ Mux bearer -> (DummyApp -> m ()) - -> ( Mx.StartOnDemandOrEagerly - -> DiffTime + -- ^ trigger action that starts the app + -> ( DiffTime + -- ^ How long did the test run. -> ((STM m (Either SomeException ())), DummyApp) + -- ^ Result for running the app, along with the app -> m (Property, Either SomeException ()) ) + -- ^ Verify that the app succeded/failed as expected when + -- the test stopped -> DummyApps + -- ^ List of apps to test + -> DiffTime + -- ^ Maximum run time -> DiffTime + -- ^ Time at which StartOnDemandAny should run -> m Property -prop_mux_start_m bearer _ checkRes (DummyInitiatorApps apps) runTime = do +prop_mux_start_m bearer _ checkRes (DummyInitiatorApps apps) runTime _ = do let minis = map (appToInfo Mx.InitiatorDirectionOnly) apps minRunTime = minimum $ runTime : (map daRunTime $ filter (\app -> daAction app == DummyAppFail) apps) @@ -1613,15 +1647,18 @@ prop_mux_start_m bearer _ checkRes (DummyInitiatorApps apps) runTime = do (dummyAppToChannel app) | app <- apps ] - rc <- mapM (checkRes Mx.StartEagerly minRunTime) $ zip getRes apps + rc <- mapM (checkRes minRunTime) $ zip getRes apps wait killer void $ waitCatch mux_aid return (conjoin $ map fst rc) -prop_mux_start_m bearer trigger checkRes (DummyResponderApps apps) runTime = do +prop_mux_start_m bearer trigger checkRes (DummyResponderApps apps) runTime anyStartAfter = do let minis = map (appToInfo Mx.ResponderDirectionOnly) apps - minRunTime = minimum $ runTime : (map (\a -> daRunTime a + daStartAfter a) $ filter (\app -> daAction app == DummyAppFail) apps) + minRunTime = minimum $ runTime : (map (\a -> case unDummyStart (daStart a) of + Mx.StartOnDemandAny -> daRunTime a + anyStartAfter + _ -> daRunTime a + daStartAfter a + ) $ filter (\app -> daAction app == DummyAppFail) apps) mux <- Mx.new minis mux_aid <- async $ Mx.run verboseTracer mux bearer @@ -1629,21 +1666,25 @@ prop_mux_start_m bearer trigger checkRes (DummyResponderApps apps) runTime = do mux (daNum app) Mx.ResponderDirectionOnly - Mx.StartOnDemand + (unDummyStart $ daStart app) (dummyAppToChannel app) | app <- apps ] - triggers <- mapM (async . trigger) $ filter (\app -> daStartAfter app <= minRunTime) apps + triggers <- mapM (async . trigger) $ + filter (\app -> case unDummyStart (daStart app) of + Mx.StartOnDemandAny -> anyStartAfter <= minRunTime + _ -> daStartAfter app <= minRunTime + ) apps killer <- async $ (threadDelay runTime) >> Mx.stop mux - rc <- mapM (checkRes Mx.StartOnDemand minRunTime) $ zip getRes apps + rc <- mapM (checkRes minRunTime) $ zip getRes apps wait killer mapM_ cancel triggers void $ waitCatch mux_aid return (conjoin $ map fst rc) -prop_mux_start_m bearer _trigger _checkRes (DummyResponderAppsKillMux apps) runTime = do +prop_mux_start_m bearer _trigger _checkRes (DummyResponderAppsKillMux apps) runTime _ = do -- Start a mini-protocol on demand, but kill mux before the application is -- triggered. This test assures that mini-protocol completion action does -- not deadlocks. @@ -1655,7 +1696,7 @@ prop_mux_start_m bearer _trigger _checkRes (DummyResponderAppsKillMux apps) runT mux (daNum app) Mx.ResponderDirectionOnly - Mx.StartOnDemand + (unDummyStart $ daStart app) (dummyAppToChannel app) | app <- apps ] @@ -1667,7 +1708,7 @@ prop_mux_start_m bearer _trigger _checkRes (DummyResponderAppsKillMux apps) runT return (property True) -prop_mux_start_m bearer trigger checkRes (DummyInitiatorResponderApps apps) runTime = do +prop_mux_start_m bearer trigger checkRes (DummyInitiatorResponderApps apps) runTime anyStartAfter = do let initMinis = map (appToInfo Mx.InitiatorDirection) apps respMinis = map (appToInfo Mx.ResponderDirection) apps minRunTime = minimum $ runTime : (map (\a -> daRunTime a) $ filter (\app -> daAction app == DummyAppFail) apps) @@ -1686,15 +1727,21 @@ prop_mux_start_m bearer trigger checkRes (DummyInitiatorResponderApps apps) runT mux (daNum app) Mx.ResponderDirection - Mx.StartOnDemand + (unDummyStart $ daStart app) (dummyAppToChannel app) | app <- apps ] - triggers <- mapM (async . trigger) $ filter (\app -> daStartAfter app <= minRunTime) apps + triggers <- mapM (async . trigger) $ + filter (\app -> case unDummyStart (daStart app) of + Mx.StartOnDemandAny -> anyStartAfter <= minRunTime + _ -> daStartAfter app <= minRunTime + ) apps killer <- async $ (threadDelay runTime) >> Mx.stop mux - !rcInit <- mapM (checkRes Mx.StartEagerly minRunTime) $ zip getInitRes apps - !rcResp <- mapM (checkRes Mx.StartOnDemand minRunTime) $ zip getRespRes apps + !rcInit <- mapM (checkRes minRunTime) $ + zip getInitRes $ + map (\a -> a { daStart = DummyStart Mx.StartEagerly }) apps + !rcResp <- mapM (checkRes minRunTime) $ zip getRespRes apps wait killer mapM_ cancel triggers void $ waitCatch mux_aid diff --git a/ouroboros-network-framework/CHANGELOG.md b/ouroboros-network-framework/CHANGELOG.md index 085f55db2b..da228340d2 100644 --- a/ouroboros-network-framework/CHANGELOG.md +++ b/ouroboros-network-framework/CHANGELOG.md @@ -4,6 +4,8 @@ ### Breaking changes +* Add `miniProtocolStart` to `MiniProtocol` to control starting strategy. + ### Non-breaking changes ## 0.15.0.0 -- 2025-01-02 diff --git a/ouroboros-network-framework/demo/connection-manager.hs b/ouroboros-network-framework/demo/connection-manager.hs index af64c266c9..ead658a93f 100644 --- a/ouroboros-network-framework/demo/connection-manager.hs +++ b/ouroboros-network-framework/demo/connection-manager.hs @@ -302,6 +302,7 @@ withBidirectionalConnectionManager snocket makeBearer socket [ let miniProtocolNum = Mux.MiniProtocolNum 1 in MiniProtocol { miniProtocolNum, + miniProtocolStart = StartOnDemand, miniProtocolLimits = Mux.MiniProtocolLimits maxBound, miniProtocolRun = reqRespInitiatorAndResponder @@ -313,6 +314,7 @@ withBidirectionalConnectionManager snocket makeBearer socket [ let miniProtocolNum = Mux.MiniProtocolNum 2 in MiniProtocol { miniProtocolNum, + miniProtocolStart = StartOnDemand, miniProtocolLimits = Mux.MiniProtocolLimits maxBound, miniProtocolRun = reqRespInitiatorAndResponder @@ -324,6 +326,7 @@ withBidirectionalConnectionManager snocket makeBearer socket [ let miniProtocolNum = Mux.MiniProtocolNum 3 in MiniProtocol { miniProtocolNum, + miniProtocolStart = StartOnDemandAny, miniProtocolLimits = Mux.MiniProtocolLimits maxBound, miniProtocolRun = reqRespInitiatorAndResponder diff --git a/ouroboros-network-framework/demo/ping-pong.hs b/ouroboros-network-framework/demo/ping-pong.hs index e782bafde0..fbb6b2234f 100644 --- a/ouroboros-network-framework/demo/ping-pong.hs +++ b/ouroboros-network-framework/demo/ping-pong.hs @@ -107,6 +107,7 @@ demoProtocol0 pingPong = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = pingPong } @@ -198,11 +199,13 @@ demoProtocol1 pingPong pingPong' = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = pingPong } , MiniProtocol { miniProtocolNum = MiniProtocolNum 3, + miniProtocolStart = StartOnDemandAny, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = pingPong' } diff --git a/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Socket.hs b/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Socket.hs index 7b267b2520..12ce4120fc 100644 --- a/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Socket.hs +++ b/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Socket.hs @@ -116,6 +116,7 @@ testProtocols2 reqResp = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 4, + miniProtocolStart = StartOnDemand, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = defaultMiniProtocolLimit }, diff --git a/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Subscription.hs b/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Subscription.hs index 2411510da0..f67e7bd174 100644 --- a/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Subscription.hs +++ b/ouroboros-network-framework/io-tests/Test/Ouroboros/Network/Subscription.hs @@ -84,6 +84,7 @@ testProtocols1 chainSync = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemand, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = defaultMiniProtocolLimit }, @@ -100,6 +101,7 @@ testProtocols2 reqResp = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 4, + miniProtocolStart = StartOnDemand, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = defaultMiniProtocolLimit }, diff --git a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Socket.hs b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Socket.hs index be1b414504..260eaddc8f 100644 --- a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Socket.hs +++ b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Socket.hs @@ -118,6 +118,7 @@ testProtocols2 reqResp = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 4, + miniProtocolStart = StartOnDemand, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = defaultMiniProtocolLimit }, diff --git a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Subscription.hs b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Subscription.hs index fbd0e71d56..2d097bc354 100644 --- a/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Subscription.hs +++ b/ouroboros-network-framework/sim-tests/Test/Ouroboros/Network/Subscription.hs @@ -84,6 +84,7 @@ testProtocols1 chainSync = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemand, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = defaultMiniProtocolLimit }, @@ -100,6 +101,7 @@ testProtocols2 reqResp = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 4, + miniProtocolStart = StartOnDemand, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = defaultMiniProtocolLimit }, diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs index 93a95045cf..424aa05493 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor.hs @@ -277,7 +277,7 @@ with <- foldM (\acc mpd@MiniProtocolData { mpdMiniProtocol } -> - runResponder csMux mpd Mux.StartOnDemand >>= \case + runResponder csMux mpd >>= \case -- synchronous exceptions when starting -- a mini-protocol are non-recoverable; we -- close the connection and allow the server @@ -378,7 +378,7 @@ with return (Just tConnId, state') Right _ -> - runResponder tMux mpd Mux.StartOnDemand >>= \case + runResponder tMux mpd >>= \case Right completionAction -> do traceWith tracer (TrResponderRestarted tConnId num) let state' = updateMiniProtocol tConnId num completionAction state @@ -579,14 +579,12 @@ runResponder :: forall (mode :: Mux.Mode) initiatorCtx peerAddr m a b. ) => Mux.Mux mode m -> MiniProtocolData mode initiatorCtx peerAddr m a b - -> Mux.StartOnDemandOrEagerly -> m (Either SomeException (STM m (Either SomeException b))) runResponder mux MiniProtocolData { mpdMiniProtocol = miniProtocol, mpdResponderContext = responderContext - } - startStrategy = + } = -- do not catch asynchronous exceptions, which are non recoverable tryJust (\e -> case fromException e of Just (SomeAsyncException _) -> Nothing @@ -596,14 +594,14 @@ runResponder mux Mux.runMiniProtocol mux (miniProtocolNum miniProtocol) Mux.ResponderDirectionOnly - startStrategy + (miniProtocolStart miniProtocol) (runMiniProtocolCb responder responderContext) InitiatorAndResponderProtocol _ responder -> Mux.runMiniProtocol mux (miniProtocolNum miniProtocol) Mux.ResponderDirection - startStrategy + (miniProtocolStart miniProtocol) (runMiniProtocolCb responder responderContext) diff --git a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/Event.hs b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/Event.hs index 10986f9dcf..271325ed15 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/Event.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/InboundGovernor/Event.hs @@ -238,9 +238,10 @@ firstPeerPromotedToWarm ResponderDir -> FirstToFinish $ miniProtocolStatus >>= \case - StatusIdle -> retry - StatusStartOnDemand -> retry - StatusRunning -> return $ AwakeRemote connId + StatusIdle -> retry + StatusStartOnDemand -> retry + StatusStartOnDemandAny -> retry + StatusRunning -> return $ AwakeRemote connId -- | Detect when a first warm peer is promoted to hot (any hot mini-protocols @@ -288,9 +289,10 @@ firstPeerPromotedToHot fn miniProtocolStatus = FirstToFinish $ miniProtocolStatus >>= \case - StatusIdle -> retry - StatusStartOnDemand -> retry - StatusRunning -> return () + StatusIdle -> retry + StatusStartOnDemand -> retry + StatusStartOnDemandAny -> retry + StatusRunning -> return () -- | Detect when all hot mini-protocols terminates, which triggers the @@ -333,9 +335,10 @@ firstPeerDemotedToWarm fn miniProtocolStatus = LastToFinishM $ miniProtocolStatus >>= \case - StatusIdle -> return () - StatusStartOnDemand -> return () - StatusRunning -> retry + StatusIdle -> return () + StatusStartOnDemand -> return () + StatusStartOnDemandAny -> return () + StatusRunning -> retry -- | Await for first peer demoted to cold, i.e. detect the @@ -375,9 +378,10 @@ firstPeerDemotedToCold ResponderDir -> LastToFinishM $ do miniProtocolStatus >>= \case - StatusIdle -> return () - StatusStartOnDemand -> return () - StatusRunning -> retry + StatusIdle -> return () + StatusStartOnDemand -> return () + StatusStartOnDemandAny -> return () + StatusRunning -> retry ) (Mux.miniProtocolStateMap csMux) diff --git a/ouroboros-network-framework/src/Ouroboros/Network/Mux.hs b/ouroboros-network-framework/src/Ouroboros/Network/Mux.hs index 1cebc28e3e..17f11ed7cc 100644 --- a/ouroboros-network-framework/src/Ouroboros/Network/Mux.hs +++ b/ouroboros-network-framework/src/Ouroboros/Network/Mux.hs @@ -52,6 +52,7 @@ module Ouroboros.Network.Mux -- | from "Network.Mux" , Mux.HasInitiator , Mux.HasResponder + , Mux.StartOnDemandOrEagerly (..) ) where import Control.Monad.Class.MonadAsync @@ -234,6 +235,7 @@ type OuroborosBundleWithMinimalCtx (mode :: Mux.Mode) peerAddr bytes m a b = data MiniProtocol (mode :: Mux.Mode) initiatorCtx responderCtx bytes m a b = MiniProtocol { miniProtocolNum :: !MiniProtocolNum, + miniProtocolStart :: !Mux.StartOnDemandOrEagerly, miniProtocolLimits :: !MiniProtocolLimits, miniProtocolRun :: !(RunMiniProtocol mode initiatorCtx responderCtx bytes m a b) } diff --git a/ouroboros-network-framework/testlib/Test/Ouroboros/Network/ConnectionManager/Experiments.hs b/ouroboros-network-framework/testlib/Test/Ouroboros/Network/ConnectionManager/Experiments.hs index a6a49ee14e..1353758ea3 100644 --- a/ouroboros-network-framework/testlib/Test/Ouroboros/Network/ConnectionManager/Experiments.hs +++ b/ouroboros-network-framework/testlib/Test/Ouroboros/Network/ConnectionManager/Experiments.hs @@ -335,6 +335,7 @@ withInitiatorOnlyConnectionManager name timeouts trTracer tracer stdGen snocket mkProto miniProtocolNum nextRequest = [MiniProtocol { miniProtocolNum, + miniProtocolStart = StartOnDemand, miniProtocolLimits = Mx.MiniProtocolLimits maxBound, miniProtocolRun = reqRespInitiator miniProtocolNum nextRequest @@ -550,6 +551,7 @@ withBidirectionalConnectionManager name timeouts mkProto miniProtocolNum nextRequest = [MiniProtocol { miniProtocolNum, + miniProtocolStart = Mx.StartOnDemand, miniProtocolLimits = Mx.MiniProtocolLimits maxBound, miniProtocolRun = reqRespInitiatorAndResponder miniProtocolNum diff --git a/ouroboros-network/CHANGELOG.md b/ouroboros-network/CHANGELOG.md index b91e3c2d2a..2b4aa9ed21 100644 --- a/ouroboros-network/CHANGELOG.md +++ b/ouroboros-network/CHANGELOG.md @@ -4,6 +4,10 @@ ### Breaking changes +* Use `miniProtocolStart` for setting start strategy. + KeepAlive is started with `StartOnDemandAny`, other miniprotocols are + started with `StartOnDemand`. + ### Non-breaking changes ## 0.19.0.0 -- 2025-01-02 diff --git a/ouroboros-network/demo/chain-sync.hs b/ouroboros-network/demo/chain-sync.hs index 042701b820..6494da1de7 100644 --- a/ouroboros-network/demo/chain-sync.hs +++ b/ouroboros-network/demo/chain-sync.hs @@ -221,6 +221,7 @@ demoProtocol2 chainSync = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = chainSync } @@ -329,11 +330,13 @@ demoProtocol3 chainSync blockFetch = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = chainSync } , MiniProtocol { miniProtocolNum = MiniProtocolNum 3, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = blockFetch } diff --git a/ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs b/ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs index 5866ab49e3..48c512a97a 100644 --- a/ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs +++ b/ouroboros-network/io-tests/Test/Ouroboros/Network/Pipe.hs @@ -88,6 +88,7 @@ demoProtocols chainSync = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemand, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = defaultMiniProtocolLimit }, diff --git a/ouroboros-network/io-tests/Test/Ouroboros/Network/Socket.hs b/ouroboros-network/io-tests/Test/Ouroboros/Network/Socket.hs index c48bd1a070..9b61003c55 100644 --- a/ouroboros-network/io-tests/Test/Ouroboros/Network/Socket.hs +++ b/ouroboros-network/io-tests/Test/Ouroboros/Network/Socket.hs @@ -81,6 +81,7 @@ testProtocols1 chainSync = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemandAny, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = defaultMiniProtocolLimit }, diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs index a16be96ed2..1e061558ba 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Mux.hs @@ -73,6 +73,7 @@ testProtocols chainSync = OuroborosApplication [ MiniProtocol { miniProtocolNum = MiniProtocolNum 2, + miniProtocolStart = StartOnDemand, miniProtocolLimits = MiniProtocolLimits { maximumIngressQueue = 0xffff }, diff --git a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet/Node/MiniProtocols.hs b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet/Node/MiniProtocols.hs index fb57179fda..9d81b75843 100644 --- a/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet/Node/MiniProtocols.hs +++ b/ouroboros-network/sim-tests-lib/Test/Ouroboros/Network/Testnet/Node/MiniProtocols.hs @@ -291,6 +291,7 @@ applications debugTracer nodeKernel , miniProtocolLimits , miniProtocolRun } = MiniProtocol { miniProtocolNum + , miniProtocolStart = StartEagerly , miniProtocolLimits , miniProtocolRun = case miniProtocolRun of @@ -304,6 +305,7 @@ applications debugTracer nodeKernel { withHot = WithHot [ MiniProtocol { miniProtocolNum = chainSyncMiniProtocolNum + , miniProtocolStart = StartOnDemand , miniProtocolLimits = chainSyncLimits limits , miniProtocolRun = InitiatorAndResponderProtocol @@ -312,6 +314,7 @@ applications debugTracer nodeKernel } , MiniProtocol { miniProtocolNum = blockFetchMiniProtocolNum + , miniProtocolStart = StartOnDemand , miniProtocolLimits = blockFetchLimits limits , miniProtocolRun = InitiatorAndResponderProtocol @@ -322,6 +325,7 @@ applications debugTracer nodeKernel , withWarm = WithWarm [ MiniProtocol { miniProtocolNum = MiniProtocolNum 9 + , miniProtocolStart = StartOnDemand , miniProtocolLimits = pingPongLimits limits , miniProtocolRun = InitiatorAndResponderProtocol @@ -332,6 +336,7 @@ applications debugTracer nodeKernel , withEstablished = WithEstablished $ [ MiniProtocol { miniProtocolNum = keepAliveMiniProtocolNum + , miniProtocolStart = StartOnDemandAny , miniProtocolLimits = keepAliveLimits limits , miniProtocolRun = InitiatorAndResponderProtocol @@ -341,6 +346,7 @@ applications debugTracer nodeKernel ] ++ if aaOwnPeerSharing /= PSTypes.PeerSharingDisabled then [ MiniProtocol { miniProtocolNum = peerSharingMiniProtocolNum + , miniProtocolStart = StartOnDemand , miniProtocolLimits = peerSharingLimits limits , miniProtocolRun = InitiatorAndResponderProtocol diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs index 2d9de752c1..c526fa7a21 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion/NonP2P.hs @@ -141,6 +141,7 @@ mkResponderApp bundle = } = MiniProtocol { miniProtocolNum , miniProtocolLimits + , miniProtocolStart = StartEagerly , miniProtocolRun = ResponderProtocolOnly responder } diff --git a/ouroboros-network/src/Ouroboros/Network/NodeToClient.hs b/ouroboros-network/src/Ouroboros/Network/NodeToClient.hs index 3e546d183f..8b59281817 100644 --- a/ouroboros-network/src/Ouroboros/Network/NodeToClient.hs +++ b/ouroboros-network/src/Ouroboros/Network/NodeToClient.hs @@ -183,21 +183,25 @@ nodeToClientProtocols protocols _version = where localChainSyncMiniProtocol localChainSyncProtocol = MiniProtocol { miniProtocolNum = MiniProtocolNum 5, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = localChainSyncProtocol } localTxSubmissionMiniProtocol localTxSubmissionProtocol = MiniProtocol { miniProtocolNum = MiniProtocolNum 6, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = localTxSubmissionProtocol } localStateQueryMiniProtocol localStateQueryProtocol = MiniProtocol { miniProtocolNum = MiniProtocolNum 7, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = localStateQueryProtocol } localTxMonitorMiniProtocol localTxMonitorProtocol = MiniProtocol { miniProtocolNum = MiniProtocolNum 9, + miniProtocolStart = StartOnDemand, miniProtocolLimits = maximumMiniProtocolLimits, miniProtocolRun = localTxMonitorProtocol } diff --git a/ouroboros-network/src/Ouroboros/Network/NodeToNode.hs b/ouroboros-network/src/Ouroboros/Network/NodeToNode.hs index 69e0a7a34f..21afd728ee 100644 --- a/ouroboros-network/src/Ouroboros/Network/NodeToNode.hs +++ b/ouroboros-network/src/Ouroboros/Network/NodeToNode.hs @@ -257,16 +257,19 @@ nodeToNodeProtocols miniProtocolParameters protocols _version ownPeerSharing = } -> [ MiniProtocol { miniProtocolNum = chainSyncMiniProtocolNum, + miniProtocolStart = StartOnDemand, miniProtocolLimits = chainSyncProtocolLimits miniProtocolParameters, miniProtocolRun = chainSyncProtocol } , MiniProtocol { miniProtocolNum = blockFetchMiniProtocolNum, + miniProtocolStart = StartOnDemand, miniProtocolLimits = blockFetchProtocolLimits miniProtocolParameters, miniProtocolRun = blockFetchProtocol } , MiniProtocol { miniProtocolNum = txSubmissionMiniProtocolNum, + miniProtocolStart = StartOnDemand, miniProtocolLimits = txSubmissionProtocolLimits miniProtocolParameters, miniProtocolRun = txSubmissionProtocol } @@ -282,11 +285,13 @@ nodeToNodeProtocols miniProtocolParameters protocols _version ownPeerSharing = | ownPeerSharing /= PeerSharingDisabled -> [ MiniProtocol { miniProtocolNum = keepAliveMiniProtocolNum, + miniProtocolStart = StartOnDemandAny, miniProtocolLimits = keepAliveProtocolLimits miniProtocolParameters, miniProtocolRun = keepAliveProtocol } , MiniProtocol { miniProtocolNum = peerSharingMiniProtocolNum, + miniProtocolStart = StartOnDemand, miniProtocolLimits = peerSharingProtocolLimits miniProtocolParameters, miniProtocolRun = peerSharingProtocol } @@ -295,6 +300,7 @@ nodeToNodeProtocols miniProtocolParameters protocols _version ownPeerSharing = | otherwise -> [ MiniProtocol { miniProtocolNum = keepAliveMiniProtocolNum, + miniProtocolStart = StartOnDemandAny, miniProtocolLimits = keepAliveProtocolLimits miniProtocolParameters, miniProtocolRun = keepAliveProtocol }