Skip to content

Commit

Permalink
Full rework of the BlockFetch logic for bulk sync mode (#1179)
Browse files Browse the repository at this point in the history
Integrates a new implementation of the BulkSync mode, where blocks are
downloaded from alternative peers as soon as the node has no more blocks
to validate while there are longstanding requests in flight.

This PR depends on the new implementation of the BulkSync mode
(IntersectMBO/ouroboros-network#4919).
`cabal.project` is made to point to a back-port of the BulkSync
implementation on `ouroboros-network-0.16.1.1`.

### CSJ Changes

CSJ is involved because the new BulkSync mode requires to change the
dynamo if it is also serving blocks, and it is not sending them promptly
enough. The dynamo choice has an influence in the blocks that are chosen
to be downloaded by BlockFetch.

For this sake, b93c379 gives the ability to order the ChainSync clients,
so the dynamo role can be rotated among them whenever BlockFetch
requests it.

b1c0bf8 provides the implementation of the rotation operation.

### BlockFetch tests

c4bfa37 allows to specify in tests in which order to start the peers,
which has an effect on what peer is chosen as initial dynamo.

c594c09 in turn adds a new BlockFetch test to show that syncing isn't
slowed down by peers that don't send blocks.

### Integration of BlockFetch changes

The collection of ChainSync client handles now needs to be passed
between BlockFetch and ChainSync so dynamo rotations can be requested by
BlockFetch.

The parameter `bfcMaxConcurrencyBulkSync` has been removed since blocks
are not coordinated to be downloaded concurrently.

These changes are in 6926278.

### ChainSel changes

Now BlockFetch requires the ability to detect if ChainSel has run out of
blocks to validate. This motivates 73187ba, which implements a mechanism
to measure if ChainSel is waiting for more blocks (starves), and
determines for how long.

The above change is not sufficient to measure starvation. The queue to
send blocks for validation used to allow only for one block to sit in
the queue. This would interfere with the ability to measure starvation
since BlockFetch would block waiting for the queue to become empty, and
the queue would quickly become empty after taking just 1 block. For
download delays to be amortized, a larger queue capacity was needed.
This is the reason why a fix similar to
IntersectMBO/ouroboros-network#2721 is part of
0d3fc28.

### Miscellaneous fixes

#### CSJ jump size adjustment

When syncing from mainnet, we discovered that CSJ wouldn't sync the
blocks from the Byron era. This was because the jump size was set to the
length of the genesis window of the Shelley era, which is much larger
than Byron's. When the jump size is larger than the genesis window, the
dynamo will block on the forecast horizon before offering a jump that
allows the chain selection to advance. In this case, CSJ and chain
selection will deadlock.

For this reason we set the default jump size to the size of Byron's
genesis window in 028883a. This didn't show an impact on syncing time in
our measures. Future work (as part of deploying Genesis) might involve
allowing the jump size to vary between different eras.

#### GDD rate limit

GDD evaluation showed an overhead of 10% if run after every header
arrives via ChainSync. Therefore, in b7fa122 we limited how often it
could run, so multiple header arrivals could be handled by a single GDD
evaluation.

#### Candidate fragment comparison in the ChainSync client

We stumbled upon a test case where the candidate fragments of the dynamo
and an objector were no longer than the current selection (both peers
were adversarial). This was problematic because BlockFetch would refuse
to download blocks from these candidates, and ChainSync in turn would
wait for the selection to advance in order to download more headers.

The fix in e27a73c is to have the ChainSync client disconnect a peer
which is about to block on the forecast horizon if its candidate isn't
better than the selection.

#### Candidate fragment truncations

At the moment, it is possible for a candidate fragment to be truncated
by CSJ when a jumper jumps to a point that is not younger than the tip
of its current candidate fragment. We encountered tests where the jump
point could be so old that it would fall behind the immutable tip, and
GDD would ignore the peer when computing the Limit on Eagerness. This in
turn would cause the selection to advance into potentially adversarial
chains.

The fix in dc5f6f7 is to have GDD never drop candidates. When the
candidate does not intersect the current selection, the LoE is not
advanced. This is a situation guaranteed to be unblocked by the
ChainSync client since it will either disconnect the peer or bring the
candidate to intersect with the current selection.
  • Loading branch information
dnadales authored Jan 8, 2025
2 parents a020b7b + 6333df0 commit 911cfb0
Show file tree
Hide file tree
Showing 44 changed files with 1,413 additions and 517 deletions.
10 changes: 10 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,13 @@ if(os(windows))

-- https://github.com/ulidtko/cabal-doctest/issues/85
constraints: Cabal < 3.13

source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: bb0a7d0ff41e265a8ec47bc94377cb4d65e0b498
--sha256: sha256-P7m+nsjtogNQsdpXQnaH1kWxYibEWa0UC6iNGg0+bH4=
subdir:
ouroboros-network
ouroboros-network-api
ouroboros-network-protocols
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Breaking

- Adapted to Genesis-related changes in `ouroboros-consensus` ([#1179](https://github.com/IntersectMBO/ouroboros-consensus/pull/1179)).
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.Node.Genesis (
-- * 'GenesisConfig'
GenesisConfig (..)
, GenesisConfigFlags (..)
, LoEAndGDDConfig (..)
, defaultGenesisConfigFlags
, disableGenesisConfig
, enableGenesisConfigDefault
, mkGenesisConfig
-- * NodeKernel helpers
, GenesisNodeKernelArgs (..)
, LoEAndGDDNodeKernelArgs (..)
, mkGenesisNodeKernelArgs
, setGetLoEFragment
) where

import Control.Monad (join)
import Data.Maybe (fromMaybe)
import Data.Traversable (for)
import GHC.Generics (Generic)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(CSJConfig (..), CSJEnabledConfig (..),
Expand All @@ -34,57 +42,143 @@ import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.BlockFetch
(GenesisBlockFetchConfiguration (..))

-- | Whether to en-/disable the Limit on Eagerness and the Genesis Density
-- Disconnector.
data LoEAndGDDConfig a =
LoEAndGDDEnabled !a
| LoEAndGDDDisabled
deriving stock (Show, Functor, Foldable, Traversable)
deriving stock (Eq, Generic, Show, Functor, Foldable, Traversable)

-- | Aggregating the various configs for Genesis-related subcomponents.
data GenesisConfig = GenesisConfig {
gcChainSyncLoPBucketConfig :: !ChainSyncLoPBucketConfig
--
-- Usually, 'enableGenesisConfigDefault' or 'disableGenesisConfig' can be used.
-- See the haddocks of the types of the individual fields for details.
data GenesisConfig = GenesisConfig
{ gcBlockFetchConfig :: !GenesisBlockFetchConfiguration
, gcChainSyncLoPBucketConfig :: !ChainSyncLoPBucketConfig
, gcCSJConfig :: !CSJConfig
, gcLoEAndGDDConfig :: !(LoEAndGDDConfig ())
, gcLoEAndGDDConfig :: !(LoEAndGDDConfig LoEAndGDDParams)
, gcHistoricityCutoff :: !(Maybe HistoricityCutoff)
} deriving stock (Eq, Generic, Show)

-- | Genesis configuration flags and low-level args, as parsed from config file or CLI
data GenesisConfigFlags = GenesisConfigFlags
{ gcfEnableCSJ :: Bool
, gcfEnableLoEAndGDD :: Bool
, gcfEnableLoP :: Bool
, gcfBlockFetchGracePeriod :: Maybe Integer
, gcfBucketCapacity :: Maybe Integer
, gcfBucketRate :: Maybe Integer
, gcfCSJJumpSize :: Maybe Integer
, gcfGDDRateLimit :: Maybe DiffTime
} deriving stock (Eq, Generic, Show)

defaultGenesisConfigFlags :: GenesisConfigFlags
defaultGenesisConfigFlags = GenesisConfigFlags
{ gcfEnableCSJ = True
, gcfEnableLoEAndGDD = True
, gcfEnableLoP = True
, gcfBlockFetchGracePeriod = Nothing
, gcfBucketCapacity = Nothing
, gcfBucketRate = Nothing
, gcfCSJJumpSize = Nothing
, gcfGDDRateLimit = Nothing
}

-- TODO justification/derivation from other parameters
enableGenesisConfigDefault :: GenesisConfig
enableGenesisConfigDefault = GenesisConfig {
gcChainSyncLoPBucketConfig = ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig {
csbcCapacity = 100_000 -- number of tokens
, csbcRate = 500 -- tokens per second leaking, 1/2ms
}
, gcCSJConfig = CSJEnabled CSJEnabledConfig {
csjcJumpSize = 3 * 2160 * 20 -- mainnet forecast range
}
, gcLoEAndGDDConfig = LoEAndGDDEnabled ()
-- Duration in seconds of one Cardano mainnet Shelley stability window
-- (3k/f slots times one second per slot) plus one extra hour as a
-- safety margin.
, gcHistoricityCutoff = Just $ HistoricityCutoff $ 3 * 2160 * 20 + 3600
}
enableGenesisConfigDefault = mkGenesisConfig $ Just defaultGenesisConfigFlags

-- | Disable all Genesis components, yielding Praos behavior.
disableGenesisConfig :: GenesisConfig
disableGenesisConfig = GenesisConfig {
gcChainSyncLoPBucketConfig = ChainSyncLoPBucketDisabled
disableGenesisConfig = mkGenesisConfig Nothing

mkGenesisConfig :: Maybe GenesisConfigFlags -> GenesisConfig
mkGenesisConfig Nothing = -- disable Genesis
GenesisConfig
{ gcBlockFetchConfig = GenesisBlockFetchConfiguration
{ gbfcGracePeriod = 0 -- no grace period when Genesis is disabled
}
, gcChainSyncLoPBucketConfig = ChainSyncLoPBucketDisabled
, gcCSJConfig = CSJDisabled
, gcLoEAndGDDConfig = LoEAndGDDDisabled
, gcHistoricityCutoff = Nothing
}
mkGenesisConfig (Just GenesisConfigFlags{..}) =
GenesisConfig
{ gcBlockFetchConfig = GenesisBlockFetchConfiguration
{ gbfcGracePeriod
}
, gcChainSyncLoPBucketConfig = if gcfEnableLoP
then ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig
{ csbcCapacity
, csbcRate
}
else ChainSyncLoPBucketDisabled
, gcCSJConfig = if gcfEnableCSJ
then CSJEnabled CSJEnabledConfig
{ csjcJumpSize
}
else CSJDisabled
, gcLoEAndGDDConfig = if gcfEnableLoEAndGDD
then LoEAndGDDEnabled LoEAndGDDParams{lgpGDDRateLimit}
else LoEAndGDDDisabled
, -- Duration in seconds of one Cardano mainnet Shelley stability window
-- (3k/f slots times one second per slot) plus one extra hour as a
-- safety margin.
gcHistoricityCutoff = Just $ HistoricityCutoff $ 3 * 2160 * 20 + 3600
}
where
-- The minimum amount of time during which the Genesis BlockFetch logic will
-- download blocks from a specific peer (even if it is not performing well
-- during that period).
defaultBlockFetchGracePeriod = 10 -- seconds

-- LoP parameters. Empirically, it takes less than 1ms to validate a header,
-- so leaking one token per 2ms is conservative. The capacity of 100_000
-- tokens corresponds to 200s, which is definitely enough to handle long GC
-- pauses; we could even make this more conservative.
defaultCapacity = 100_000 -- number of tokens
defaultRate = 500 -- tokens per second leaking, 1/2ms

-- The larger Shelley forecast range (3 * 2160 * 20) works in more recent
-- ranges of slots, but causes syncing to block in Byron. A future
-- improvement would be to make this era-dynamic, such that we can use the
-- larger (and hence more efficient) larger CSJ jump size in Shelley-based
-- eras.
defaultCSJJumpSize = 2 * 2160 -- Byron forecast range

-- Limiting the performance impact of the GDD.
defaultGDDRateLimit = 1.0 -- seconds

gbfcGracePeriod = fromInteger $ fromMaybe defaultBlockFetchGracePeriod gcfBlockFetchGracePeriod
csbcCapacity = fromInteger $ fromMaybe defaultCapacity gcfBucketCapacity
csbcRate = fromInteger $ fromMaybe defaultRate gcfBucketRate
csjcJumpSize = fromInteger $ fromMaybe defaultCSJJumpSize gcfCSJJumpSize
lgpGDDRateLimit = fromMaybe defaultGDDRateLimit gcfGDDRateLimit

newtype LoEAndGDDParams = LoEAndGDDParams
{ -- | How often to evaluate GDD. 0 means as soon as possible.
-- Otherwise, no faster than once every T seconds, where T is the
-- value of the field.
lgpGDDRateLimit :: DiffTime
} deriving stock (Eq, Generic, Show)

-- | Genesis-related arguments needed by the NodeKernel initialization logic.
data GenesisNodeKernelArgs m blk = GenesisNodeKernelArgs {
gnkaLoEAndGDDArgs :: !(LoEAndGDDConfig (LoEAndGDDNodeKernelArgs m blk))
}

data LoEAndGDDNodeKernelArgs m blk = LoEAndGDDNodeKernelArgs {
-- | A TVar containing an action that returns the 'ChainDB.GetLoEFragment'
-- action. We use this extra indirection to update this action after we
-- opened the ChainDB (which happens before we initialize the NodeKernel).
-- After that, this TVar will not be modified again.
gnkaGetLoEFragment :: !(LoEAndGDDConfig (StrictTVar m (ChainDB.GetLoEFragment m blk)))
lgnkaLoEFragmentTVar :: !(StrictTVar m (ChainDB.GetLoEFragment m blk))
, lgnkaGDDRateLimit :: DiffTime
}

-- | Create the initial 'GenesisNodeKernelArgs" (with a temporary
-- 'ChainDB.GetLoEFragment' that will be replaced via 'setGetLoEFragment') and a
-- function to update the 'ChainDbArgs' accordingly.
Expand All @@ -95,20 +189,24 @@ mkGenesisNodeKernelArgs ::
, Complete ChainDbArgs m blk -> Complete ChainDbArgs m blk
)
mkGenesisNodeKernelArgs gcfg = do
gnkaGetLoEFragment <- for (gcLoEAndGDDConfig gcfg) $ \() ->
newTVarIO $ pure $
gnkaLoEAndGDDArgs <- for (gcLoEAndGDDConfig gcfg) $ \p -> do
loeFragmentTVar <- newTVarIO $ pure $
-- Use the most conservative LoE fragment until 'setGetLoEFragment'
-- is called.
ChainDB.LoEEnabled $ AF.Empty AF.AnchorGenesis
let updateChainDbArgs = case gnkaGetLoEFragment of
pure LoEAndGDDNodeKernelArgs
{ lgnkaLoEFragmentTVar = loeFragmentTVar
, lgnkaGDDRateLimit = lgpGDDRateLimit p
}
let updateChainDbArgs = case gnkaLoEAndGDDArgs of
LoEAndGDDDisabled -> id
LoEAndGDDEnabled varGetLoEFragment -> \cfg ->
LoEAndGDDEnabled lgnkArgs -> \cfg ->
cfg { ChainDB.cdbsArgs =
(ChainDB.cdbsArgs cfg) { ChainDB.cdbsLoE = getLoEFragment }
}
where
getLoEFragment = join $ readTVarIO varGetLoEFragment
pure (GenesisNodeKernelArgs {gnkaGetLoEFragment}, updateChainDbArgs)
getLoEFragment = join $ readTVarIO $ lgnkaLoEFragmentTVar lgnkArgs
pure (GenesisNodeKernelArgs{gnkaLoEAndGDDArgs}, updateChainDbArgs)

-- | Set 'gnkaGetLoEFragment' to the actual logic for determining the current
-- LoE fragment.
Expand All @@ -124,9 +222,10 @@ setGetLoEFragment readGsmState readLoEFragment varGetLoEFragment =
where
getLoEFragment :: ChainDB.GetLoEFragment m blk
getLoEFragment = atomically $ readGsmState >>= \case
-- When the Honest Availability Assumption cannot currently be guaranteed, we should not select
-- any blocks that would cause our immutable tip to advance, so we
-- return the most conservative LoE fragment.
-- When the Honest Availability Assumption cannot currently be
-- guaranteed, we should not select any blocks that would cause our
-- immutable tip to advance, so we return the most conservative LoE
-- fragment.
GSM.PreSyncing ->
pure $ ChainDB.LoEEnabled $ AF.Empty AF.AnchorGenesis
-- When we are syncing, return the current LoE fragment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ import Ouroboros.Consensus.MiniProtocol.BlockFetch.Server
(TraceBlockFetchServerEvent)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(TraceChainSyncClientEvent)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as CSJumping
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
(TraceChainSyncServerEvent)
import Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
(TraceLocalTxSubmissionServerEvent (..))
import Ouroboros.Consensus.Node.GSM (TraceGsmEvent)
import Ouroboros.Network.Block (Tip)
import Ouroboros.Network.BlockFetch (FetchDecision,
TraceFetchClientState, TraceLabelPeer)
import Ouroboros.Network.BlockFetch (TraceFetchClientState,
TraceLabelPeer)
import Ouroboros.Network.BlockFetch.Decision.Trace
(TraceDecisionEvent)
import Ouroboros.Network.KeepAlive (TraceKeepAliveClient)
import Ouroboros.Network.TxSubmission.Inbound
(TraceTxSubmissionInbound)
Expand All @@ -54,7 +57,7 @@ data Tracers' remotePeer localPeer blk f = Tracers
{ chainSyncClientTracer :: f (TraceLabelPeer remotePeer (TraceChainSyncClientEvent blk))
, chainSyncServerHeaderTracer :: f (TraceLabelPeer remotePeer (TraceChainSyncServerEvent blk))
, chainSyncServerBlockTracer :: f (TraceChainSyncServerEvent blk)
, blockFetchDecisionTracer :: f [TraceLabelPeer remotePeer (FetchDecision [Point (Header blk)])]
, blockFetchDecisionTracer :: f (TraceDecisionEvent remotePeer (Header blk))
, blockFetchClientTracer :: f (TraceLabelPeer remotePeer (TraceFetchClientState (Header blk)))
, blockFetchServerTracer :: f (TraceLabelPeer remotePeer (TraceBlockFetchServerEvent blk))
, txInboundTracer :: f (TraceLabelPeer remotePeer (TraceTxSubmissionInbound (GenTxId blk) (GenTx blk)))
Expand All @@ -69,6 +72,7 @@ data Tracers' remotePeer localPeer blk f = Tracers
, consensusErrorTracer :: f SomeException
, gsmTracer :: f (TraceGsmEvent (Tip blk))
, gddTracer :: f (TraceGDDEvent remotePeer blk)
, csjTracer :: f (CSJumping.TraceEvent remotePeer)
}

instance (forall a. Semigroup (f a))
Expand All @@ -92,6 +96,7 @@ instance (forall a. Semigroup (f a))
, consensusErrorTracer = f consensusErrorTracer
, gsmTracer = f gsmTracer
, gddTracer = f gddTracer
, csjTracer = f csjTracer
}
where
f :: forall a. Semigroup a
Expand Down Expand Up @@ -123,6 +128,7 @@ nullTracers = Tracers
, consensusErrorTracer = nullTracer
, gsmTracer = nullTracer
, gddTracer = nullTracer
, csjTracer = nullTracer
}

showTracers :: ( Show blk
Expand Down Expand Up @@ -157,6 +163,7 @@ showTracers tr = Tracers
, consensusErrorTracer = showTracing tr
, gsmTracer = showTracing tr
, gddTracer = showTracing tr
, csjTracer = showTracing tr
}

{-------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 911cfb0

Please sign in to comment.