Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an --until option to stop syncing at a certain chain point. #187

Merged
merged 6 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Kupo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import Kupo.App
, newFetchTipClient
, newProducer
, readOnlyConsumer
, rollForwardAll
, rollForwardUntil
, withFetchBlockClient
)
import Kupo.App.ChainSync
Expand Down Expand Up @@ -116,6 +118,7 @@ import Kupo.Data.Configuration
, DeferIndexesInstallation (..)
, NodeTipHasBeenReachedException (..)
, isReadOnlyReplica
, untilPredicate
)
import Kupo.Data.FetchBlock
( FetchBlockClient
Expand Down Expand Up @@ -197,6 +200,7 @@ kupoWith tr withProducer withFetchBlock =
, longestRollback
, deferIndexes
, chainProducer
, until
}
} <- ask

Expand Down Expand Up @@ -264,6 +268,7 @@ kupoWith tr withProducer withFetchBlock =
mailbox
patterns
db
(maybe rollForwardAll (rollForwardUntil . untilPredicate) until)
)

-- Database garbage-collector
Expand Down
109 changes: 72 additions & 37 deletions src/Kupo/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ module Kupo.App
, consumer
, readOnlyConsumer

-- * Rollforward variants
, rollForwardAll
, rollForwardUntil

-- * Gardener
, gardener

Expand Down Expand Up @@ -138,6 +142,7 @@ import Kupo.Data.Pattern
, matchBlock
)

import qualified Data.List.NonEmpty as NE
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Kupo.App.ChainSync.Hydra as Hydra
Expand Down Expand Up @@ -324,78 +329,93 @@ newFetchTipClient = \case
(Node.newFetchTipClient response)
atomically $ takeTMVar response

type RollForward m block =
Tracer IO TraceConsumer
-> InputManagement
-> (Tip -> Maybe Point -> DBTransaction m ())
-> Database m
-> Set Pattern
-> NonEmpty (Tip, block)
-> m ()

-- | Consumer process that is reading messages from the 'Mailbox'. Messages are
-- enqueued by another process (the producer).
consumer
:: forall m block.
( MonadSTM m
, MonadLog m
, Monad (DBTransaction m)
, IsBlock block
)
=> Tracer IO TraceConsumer
-> InputManagement
-> (Tip -> Maybe Point -> DBTransaction m ())
-> Mailbox m (Tip, block) (Tip, Point)
-> TVar m (Set Pattern)
-> Database m
-> RollForward m block
-> m Void
consumer tr inputManagement notifyTip mailbox patternsVar Database{..} =
consumer tr inputManagement notifyTip mailbox patternsVar database@Database{..} rollForward =
forever $ do
logWith tr ConsumerWaitingForNextBatch
atomically ((,) <$> flushMailbox mailbox <*> readTVar patternsVar) >>= \case
(Left blks, patterns) ->
rollForward blks patterns
rollForward tr inputManagement notifyTip database patterns blks
(Right pt, _) ->
rollBackward pt
where
rollForward :: (NonEmpty (Tip, block) -> Set Pattern -> m ())
rollForward blks patterns = do
let (lastKnownTip, lastKnownBlk) = last blks
let lastKnownPoint = getPoint lastKnownBlk
let lastKnownSlot = getPointSlotNo lastKnownPoint
let Match{consumed, produced, datums, scripts, policies} =
foldMap (matchBlock codecs patterns . snd) blks
isNonEmptyBlock <- runTransaction $ do
insertCheckpoints (foldr ((:) . getPoint . snd) [] blks)
insertInputs produced
insertPolicies policies
nSpentInputs <- onSpentInputs lastKnownTip lastKnownSlot consumed
-- NOTE: In case where the user has entered a relatively restrictive
-- pattern (e.g. one specific address), we do a best-effort at not
-- storing all the garbage of the world and only store scripts and
-- binary_data from the block if there's at least one transaction
-- that is relevant to that configuration.
-- Note that this isn't done from within 'matchBlock' because we
-- only know if we've spent inputs after running the above database
-- operation.
let isNonEmptyBlock = nSpentInputs > 0 || not (null produced)
when isNonEmptyBlock $ do
insertBinaryData datums
insertScripts scripts
notifyTip lastKnownTip (Just lastKnownPoint)
return isNonEmptyBlock
logWith tr $ ConsumerRollForward
{ slotNo = lastKnownSlot
, inputs = length produced
, binaryData = if isNonEmptyBlock then length datums else 0
, scripts = if isNonEmptyBlock then length scripts else 0
}

rollBackward :: (Tip, Point) -> m ()
rollBackward (tip, pt) = do
logWith tr (ConsumerRollBackward { point = getPointSlotNo pt })
runTransaction $ do
lastKnownSlot <- rollbackTo (getPointSlotNo pt)
notifyTip tip lastKnownSlot

rollForwardAll
:: forall m block.
( MonadSTM m
, MonadLog m
, Monad (DBTransaction m)
, IsBlock block
)
=> RollForward m block
rollForwardAll tr inputManagement notifyTip Database{..} patterns blks = do
let (lastKnownTip, lastKnownBlk) = last blks
let lastKnownPoint = getPoint lastKnownBlk
let lastKnownSlot = getPointSlotNo lastKnownPoint
let Match{consumed, produced, datums, scripts, policies} =
foldMap (matchBlock codecs patterns . snd) blks
isNonEmptyBlock <- runTransaction $ do
insertCheckpoints (foldr ((:) . getPoint . snd) [] blks)
insertInputs produced
insertPolicies policies
nSpentInputs <- onSpentInputs lastKnownTip lastKnownSlot consumed
-- NOTE: In case where the user has entered a relatively restrictive
-- pattern (e.g. one specific address), we do a best-effort at not
-- storing all the garbage of the world and only store scripts and
-- binary_data from the block if there's at least one transaction
-- that is relevant to that configuration.
-- Note that this isn't done from within 'matchBlock' because we
-- only know if we've spent inputs after running the above database
-- operation.
let isNonEmptyBlock = nSpentInputs > 0 || not (null produced)
when isNonEmptyBlock $ do
insertBinaryData datums
insertScripts scripts
notifyTip lastKnownTip (Just lastKnownPoint)
return isNonEmptyBlock
logWith tr $ ConsumerRollForward
{ slotNo = lastKnownSlot
, inputs = length produced
, binaryData = if isNonEmptyBlock then length datums else 0
, scripts = if isNonEmptyBlock then length scripts else 0
}
where
codecs = Codecs
{ toResult = resultToRow
, toBinaryData = binaryDataToRow
, toScript = scriptToRow
, toPolicy = policyToRow
}

onSpentInputs
:: Tip
-> SlotNo
Expand All @@ -417,6 +437,21 @@ consumer tr inputManagement notifyTip mailbox patternsVar Database{..} =
unstableWindow =
getLongestRollback longestRollback


rollForwardUntil
:: forall m block.
( MonadSTM m
, MonadLog m
, Monad (DBTransaction m)
, IsBlock block
)
=> (Point -> Bool)
-> RollForward m block
rollForwardUntil until tr inputManagement notifyTip database patterns blks = do
let blksBefore = NE.takeWhile (until . getPoint . snd) blks
whenJust (nonEmpty blksBefore) $
rollForwardAll tr inputManagement notifyTip database patterns

readOnlyConsumer
:: forall m.
( MonadSTM m
Expand Down
15 changes: 15 additions & 0 deletions src/Kupo/Data/Configuration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module Kupo.Data.Configuration
Configuration (..)
, DatabaseLocation (..)
, Since (..)
, Until (..)
, untilPredicate
, InputManagement (..)
, ChainProducer (..)
, LongestRollback (..)
Expand Down Expand Up @@ -57,6 +59,8 @@ import Kupo.Control.MonadTime
)
import Kupo.Data.Cardano
( Point
, SlotNo
, getPointSlotNo
)
import Kupo.Data.Pattern
( Pattern (..)
Expand Down Expand Up @@ -92,6 +96,8 @@ data Configuration = Configuration
-- ^ Port for the API HTTP Server
, since :: !(Maybe Since)
-- ^ Point from when a *new* synchronization should start
, until :: !(Maybe Until)
-- ^ Slot at which to stop indexing and just serve queries
, patterns :: !(Set Pattern)
-- ^ List of address patterns to look for when synchronizing
, inputManagement :: !InputManagement
Expand Down Expand Up @@ -146,6 +152,15 @@ data ChainProducer
data Since = SinceTip | SincePoint Point
deriving (Generic, Eq, Show)

-- | Captures the point up-to which synchronize.
data Until = UntilPoint Point | UntilSlot SlotNo
deriving (Generic, Eq, Show)

untilPredicate :: Until -> Point -> Bool
untilPredicate = \case
UntilPoint until -> (<= until)
UntilSlot until -> (<= until) . getPointSlotNo

-- | Database working directory. 'in-memory' runs the database in hot memory,
-- only suitable for non-permissive patterns or testing.
data DatabaseLocation
Expand Down
22 changes: 19 additions & 3 deletions src/Kupo/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import Control.Monad.Trans.Except
import Data.Char
( toUpper
)
import qualified Data.Text as T
import qualified Data.Text.Read as T
import Kupo.App
( TraceConsumer
, TraceGardener
Expand Down Expand Up @@ -67,6 +65,7 @@ import Kupo.Control.MonadTime
)
import Kupo.Data.Cardano
( pointFromText
, slotNoFromText
)
import Kupo.Data.Configuration
( ChainProducer (..)
Expand All @@ -75,6 +74,7 @@ import Kupo.Data.Configuration
, DeferIndexesInstallation (..)
, InputManagement (..)
, Since (..)
, Until (..)
)
import Kupo.Data.Pattern
( Pattern
Expand All @@ -98,11 +98,14 @@ import Options.Applicative.Types
import Safe
( readMay
)
import qualified Text.URI as URI
import Text.URI
( URI
)

import qualified Data.Text as T
import qualified Data.Text.Read as T
import qualified Text.URI as URI

data Command
= Run !Configuration !(Tracers IO MinSeverities)
| Copy !FilePath !FilePath !(Set Pattern)
Expand Down Expand Up @@ -140,6 +143,7 @@ parserInfo = info (helper <*> parser) $ mempty
<*> serverHostOption
<*> serverPortOption
<*> optional sinceOption
<*> optional untilOption
<*> fmap fromList (many patternOption)
<*> inputManagementOption
<*> pure 129600 -- TODO: should be pulled from genesis parameters
Expand Down Expand Up @@ -307,6 +311,18 @@ sinceOption = option (maybeReader rdr) $ mempty
rdr "tip" = pure SinceTip
rdr s = fmap SincePoint (pointFromText $ toText s)

-- | [--until=SLOT]
untilOption :: Parser Until
untilOption = option (maybeReader asSlot <|> maybeReader asPoint) $ mempty
<> long "until"
<> metavar "POINT|SLOT"
<> helpDoc (Just $ mconcat
[ "A point or slot (inclusive) to sync up-to. Useful for getting point in time snapshots."
])
where
asSlot = fmap UntilSlot . slotNoFromText . toText
asPoint = fmap UntilPoint . pointFromText . toText

-- | [--match=PATTERN]
patternOption :: Parser Pattern
patternOption = option (maybeReader (patternFromText . toText)) $ mempty
Expand Down
1 change: 1 addition & 0 deletions test/Test/Kupo/AppSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ spec = do
, serverHost
, serverPort
, since = Just (SincePoint GenesisPoint)
, until = Nothing
, patterns = fromList [MatchAny IncludingBootstrap]
, inputManagement
, longestRollback
Expand Down
21 changes: 21 additions & 0 deletions test/Test/KupoSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ import Control.Monad.Class.MonadThrow
)
import Kupo.Data.Configuration
( Since (..)
, Until (..)
)
import Kupo.Data.Health
( ConnectionStatus (..)
Expand Down Expand Up @@ -586,6 +587,23 @@ spec = skippableContext "End-to-end" $ do
Health{configuration} <- getHealth
configuration `shouldBe` (Just InstallIndexesIfNotExist)

endToEnd "Does not synchronize beyond a given point when asked (--until)" $ \(configure, runSpec, HttpClient{..}) -> do
let maxSlot = 11037873 -- Somewhat after `somePoint`, but close enough. Note that this slot must still exist (i.e. be active)
-- if we don't want `waitSlot` down below to be waiting forever!
(_, env) <- configure $ \defaultCfg -> defaultCfg
{ since = Just (SincePoint somePoint)
, until = Just (UntilSlot maxSlot)
, patterns = fromList [MatchAny IncludingBootstrap]
, deferIndexes = SkipNonEssentialIndexes
}
runSpec env 30 $ do
waitSlot (>= maxSlot)
points <- listCheckpoints
forM_ points $ \point -> getPointSlotNo point `shouldSatisfy` (<= maxSlot)
-- Ensures that even if we let time pass, we're not synchronizing beyond --until
threadDelay 1
points' <- listCheckpoints
forM_ points' $ \point -> getPointSlotNo point `shouldSatisfy` (<= maxSlot)

-- | Create an 'EndToEndContext' around each child specification item within that 'Spec' tree. The
-- spec items are 'skippable' and only executed if the appropriate environment variables are present.
Expand Down Expand Up @@ -614,6 +632,7 @@ skippableContext prefix skippableSpec = do
, serverHost = "127.0.0.1"
, serverPort = 0
, since = Nothing
, until = Nothing
, patterns = fromList []
, inputManagement = MarkSpentInputs
, longestRollback = 43200
Expand All @@ -635,6 +654,7 @@ skippableContext prefix skippableSpec = do
, serverHost = "127.0.0.1"
, serverPort = 0
, since = Nothing
, until = Nothing
, patterns = fromList []
, inputManagement = MarkSpentInputs
, longestRollback = 43200
Expand All @@ -656,6 +676,7 @@ skippableContext prefix skippableSpec = do
, serverHost = "127.0.0.1"
, serverPort = 0
, since = Nothing
, until = Nothing
, patterns = fromList []
, inputManagement = MarkSpentInputs
, longestRollback = 43200
Expand Down