diff --git a/cardano-db-sync/src/Cardano/DbSync/Fix/ConsumedBy.hs b/cardano-db-sync/src/Cardano/DbSync/Fix/ConsumedBy.hs index 5111108df..9a721fd3f 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Fix/ConsumedBy.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Fix/ConsumedBy.hs @@ -1,6 +1,6 @@ {-# LANGUAGE OverloadedStrings #-} -module Cardano.DbSync.Fix.ConsumedBy (fixConsumedBy) where +module Cardano.DbSync.Fix.ConsumedBy (FixEntry, fixConsumedBy, fixEntriesConsumed) where import Cardano.BM.Trace (Trace, logWarning) import qualified Cardano.Chain.Block as Byron hiding (blockHash) @@ -12,44 +12,45 @@ import Cardano.DbSync.Era.Byron.Util (blockPayload, unTxHash) import Cardano.DbSync.Era.Util import Cardano.DbSync.Error import Cardano.DbSync.Types -import Cardano.Prelude hiding (length) +import Cardano.Prelude hiding (length, (.)) import Database.Persist.SqlBackend.Internal import Ouroboros.Consensus.Byron.Ledger (ByronBlock (..)) import Ouroboros.Consensus.Cardano.Block (HardForkBlock (..)) -fixConsumedBy :: SqlBackend -> Trace IO Text -> Integer -> CardanoBlock -> IO (Integer, Bool) -fixConsumedBy backend tracer lastSize cblk = case cblk of - BlockByron blk -> (\(n, bl) -> (n + lastSize, bl)) <$> fixBlock backend tracer blk - _ -> pure (lastSize, True) +type FixEntry = (DB.TxOutId, DB.TxId) -fixBlock :: SqlBackend -> Trace IO Text -> ByronBlock -> IO (Integer, Bool) +-- | Nothing when the syncing must stop. +fixConsumedBy :: SqlBackend -> Trace IO Text -> CardanoBlock -> IO (Maybe [FixEntry]) +fixConsumedBy backend tracer cblk = case cblk of + BlockByron blk -> fixBlock backend tracer blk + _ -> pure Nothing + +fixBlock :: SqlBackend -> Trace IO Text -> ByronBlock -> IO (Maybe [FixEntry]) fixBlock backend tracer bblk = case byronBlockRaw bblk of - Byron.ABOBBoundary _ -> pure (0, False) + Byron.ABOBBoundary _ -> pure $ Just [] Byron.ABOBBlock blk -> do - runReaderT (fix 0 (blockPayload blk)) backend - where - fix totalSize [] = pure (totalSize, False) - fix totalSize (tx : txs) = do - mn <- runExceptT $ fixTx tx - case mn of - Right n -> fix (totalSize + n) txs - Left err -> do - liftIO $ - logWarning tracer $ - mconcat - [ "While fixing tx " - , textShow tx - , ", encountered error " - , textShow err - ] - pure (totalSize, True) + mEntries <- runReaderT (runExceptT $ mapM fixTx (blockPayload blk)) backend + case mEntries of + Right newEntries -> pure $ Just $ concat newEntries + Left err -> do + liftIO $ + logWarning tracer $ + mconcat + [ "While fixing block " + , textShow bblk + , ", encountered error " + , textShow err + ] + pure Nothing -fixTx :: MonadIO m => Byron.TxAux -> ExceptT SyncNodeError (ReaderT SqlBackend m) Integer +fixTx :: MonadIO m => Byron.TxAux -> ExceptT SyncNodeError (ReaderT SqlBackend m) [FixEntry] fixTx tx = do txId <- liftLookupFail "resolving tx" $ DB.queryTxId txHash resolvedInputs <- mapM resolveTxInputs (toList $ Byron.txInputs (Byron.taTx tx)) - lift $ DB.updateListTxOutConsumedByTxId (prepUpdate txId <$> resolvedInputs) - pure $ fromIntegral $ length resolvedInputs + pure (prepUpdate txId <$> resolvedInputs) where txHash = unTxHash $ Crypto.serializeCborHash (Byron.taTx tx) prepUpdate txId (_, _, txOutId, _) = (txOutId, txId) + +fixEntriesConsumed :: SqlBackend -> Trace IO Text -> [FixEntry] -> IO () +fixEntriesConsumed backend tracer = DB.runDbIohkLogging backend tracer . DB.updateListTxOutConsumedByTxId diff --git a/cardano-db-sync/src/Cardano/DbSync/Sync.hs b/cardano-db-sync/src/Cardano/DbSync/Sync.hs index e11a090a3..c5484c898 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Sync.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Sync.hs @@ -473,26 +473,40 @@ chainSyncClientFixConsumed backend tracer wrongTotalSize = Client.ChainSyncClien { Client.recvMsgIntersectFound = \_blk _tip -> Client.ChainSyncClient $ pure $ - Client.SendMsgRequestNext (pure ()) (clientStNext 0) + Client.SendMsgRequestNext (pure ()) (clientStNext (0, (0, []))) , Client.recvMsgIntersectNotFound = \_tip -> panic "Failed to find intersection with genesis." } - clientStNext :: Integer -> Client.ClientStNext CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO Integer - clientStNext lastSize = + clientStNext :: (Integer, (Integer, [[FixEntry]])) -> Client.ClientStNext CardanoBlock (Point CardanoBlock) (Tip CardanoBlock) IO Integer + clientStNext (sizeFixedTotal, (sizeFixEntries, fixEntries)) = Client.ClientStNext { Client.recvMsgRollForward = \blk _tip -> Client.ChainSyncClient $ do - (lastSize', ended) <- fixConsumedBy backend tracer lastSize blk - logSize lastSize lastSize' - if ended - then pure $ Client.SendMsgDone lastSize' - else pure $ Client.SendMsgRequestNext (pure ()) (clientStNext lastSize') + mNewEntries <- fixConsumedBy backend tracer blk + case mNewEntries of + Nothing -> do + fixAccumulatedEntries fixEntries + pure $ Client.SendMsgDone (sizeFixedTotal + sizeFixEntries) + Just newEntries -> do + let sizeNewEntries = fromIntegral (length newEntries) + (sizeNewFixed, sizeUnfixed, unfixedEntries) <- + fixAccumulatedEntriesMaybe (sizeFixEntries + sizeNewEntries, newEntries : fixEntries) + let sizeNewFixedTotal = sizeFixedTotal + sizeNewFixed + logSize sizeFixedTotal sizeNewFixedTotal + pure $ Client.SendMsgRequestNext (pure ()) (clientStNext (sizeNewFixedTotal, (sizeUnfixed, unfixedEntries))) , Client.recvMsgRollBackward = \_point _tip -> Client.ChainSyncClient $ pure $ - Client.SendMsgRequestNext (pure ()) (clientStNext lastSize) + Client.SendMsgRequestNext (pure ()) (clientStNext (sizeFixedTotal, (sizeFixEntries, fixEntries))) } + fixAccumulatedEntries = fixEntriesConsumed backend tracer . concat . reverse + + fixAccumulatedEntriesMaybe :: (Integer, [[FixEntry]]) -> IO (Integer, Integer, [[FixEntry]]) + fixAccumulatedEntriesMaybe (n, entries) + | n >= 10_000 = fixAccumulatedEntries entries >> pure (n, 0, []) + | otherwise = pure (0, n, entries) + logSize :: Integer -> Integer -> IO () logSize lastSize newSize = do when (newSize `div` 200_000 > lastSize `div` 200_000) $