From 47fce0de9e9e452d7c84c24ecbd659d5deee55ab Mon Sep 17 00:00:00 2001 From: Lars Kuhtz Date: Thu, 9 Jan 2025 01:57:16 -0800 Subject: [PATCH] WIP make minimal payload provider run as devnet --- node/src/ChainwebNode.hs | 26 +- src/Chainweb/Chainweb.hs | 223 ++++++++++------ src/Chainweb/Chainweb/ChainResources.hs | 279 ++++++++++++++++++--- src/Chainweb/Chainweb/CheckReachability.hs | 1 + src/Chainweb/Chainweb/CutResources.hs | 171 ++++--------- src/Chainweb/Chainweb/MempoolSyncClient.hs | 57 +++-- src/Chainweb/Chainweb/PeerResources.hs | 9 +- src/Chainweb/CutDB.hs | 15 +- src/Chainweb/CutDB/Sync.hs | 6 +- src/Chainweb/Miner/Coordinator.hs | 213 +++++++++++++--- src/Chainweb/Miner/Miners.hs | 31 ++- src/Chainweb/Miner/PayloadCache.hs | 16 +- src/Chainweb/PayloadProvider.hs | 16 +- src/Chainweb/PayloadProvider/Minimal.hs | 56 ++++- src/Chainweb/PayloadProvider/P2P.hs | 10 +- src/Chainweb/RestAPI.hs | 15 +- src/Chainweb/Sync/WebBlockHeaderStore.hs | 58 +++-- src/P2P/Node.hs | 163 ++++++++---- 18 files changed, 929 insertions(+), 436 deletions(-) diff --git a/node/src/ChainwebNode.hs b/node/src/ChainwebNode.hs index 6ded0f6f06..a6af7852dd 100644 --- a/node/src/ChainwebNode.hs +++ b/node/src/ChainwebNode.hs @@ -91,7 +91,7 @@ import Chainweb.Logging.Config import Chainweb.Logging.Miner import Chainweb.Mempool.Consensus (ReintroducedTxsLog) import Chainweb.Mempool.InMemTypes (MempoolStats(..)) -import Chainweb.Miner.Coordinator (MiningStats) +-- import Chainweb.Miner.Coordinator (MiningStats) import Chainweb.Pact.Backend.DbCache (DbCacheStats) import Chainweb.Pact.Service.PactQueue (PactQueueStats) import Chainweb.Pact.RestAPI.Server (PactCmdLog(..)) @@ -212,7 +212,7 @@ runMonitorLoop actionLabel logger = runForeverThrottled 10 -- 10 bursts in case of failure (10 * mega) -- allow restart every 10 seconds in case of failure -runCutMonitor :: Logger logger => logger -> CutDb tbl -> IO () +runCutMonitor :: Logger logger => logger -> CutDb -> IO () runCutMonitor logger db = 
L.withLoggerLabel ("component", "cut-monitor") logger $ \l -> runMonitorLoop "ChainwebNode.runCutMonitor" l $ do logFunctionJson l Info . cutToCutHashes Nothing @@ -240,21 +240,19 @@ instance ToJSON BlockUpdate where {-# INLINE toEncoding #-} {-# INLINE toJSON #-} -runBlockUpdateMonitor :: CanReadablePayloadCas tbl => Logger logger => logger -> CutDb tbl -> IO () +runBlockUpdateMonitor :: Logger logger => logger -> CutDb -> IO () runBlockUpdateMonitor logger db = L.withLoggerLabel ("component", "block-update-monitor") logger $ \l -> runMonitorLoop "ChainwebNode.runBlockUpdateMonitor" l $ do blockDiffStream db & S.mapM toUpdate & S.mapM_ (logFunctionJson l Info) where - payloadDb = view cutDbPayloadDb db - txCount :: BlockHeader -> IO Int - txCount bh = do - bp <- lookupPayloadDataWithHeight payloadDb (Just $ view blockHeight bh) (view blockPayloadHash bh) >>= \case - Nothing -> error "block payload not found" - Just x -> return x - return $ length $ view payloadDataTransactions bp + txCount bh = return (-1) + -- bp <- lookupPayloadDataWithHeight (Just $ view blockHeight bh) (view blockPayloadHash bh) >>= \case + -- Nothing -> error "block payload not found" + -- Just x -> return x + -- return $ length $ view payloadDataTransactions bp toUpdate :: Either BlockHeader BlockHeader -> IO BlockUpdate toUpdate (Right bh) = BlockUpdate @@ -289,7 +287,7 @@ runRtsMonitor logger = L.withLoggerLabel ("component", "rts-monitor") logger go logFunctionJson logger Info stats approximateThreadDelay 60_000_000 {- 1 minute -} -runQueueMonitor :: Logger logger => logger -> CutDb tbl -> IO () +runQueueMonitor :: Logger logger => logger -> CutDb -> IO () runQueueMonitor logger cutDb = L.withLoggerLabel ("component", "queue-monitor") logger go where go l = do @@ -403,8 +401,8 @@ withNodeLogger logCfg chainwebCfg v f = runManaged $ do $ mkTelemetryLogger @NewMinedBlock mgr teleLogConfig orphanedBlockBackend <- managed $ mkTelemetryLogger @OrphanedBlock mgr teleLogConfig - 
miningStatsBackend <- managed - $ mkTelemetryLogger @MiningStats mgr teleLogConfig +-- miningStatsBackend <- managed +-- $ mkTelemetryLogger @MiningStats mgr teleLogConfig requestLogBackend <- managed $ mkTelemetryLogger @RequestResponseLog mgr teleLogConfig queueStatsBackend <- managed @@ -441,7 +439,7 @@ withNodeLogger logCfg chainwebCfg v f = runManaged $ do , logHandler endpointBackend , logHandler newBlockBackend , logHandler orphanedBlockBackend - , logHandler miningStatsBackend + -- , logHandler miningStatsBackend , logHandler requestLogBackend , logHandler queueStatsBackend , logHandler reintroBackend diff --git a/src/Chainweb/Chainweb.hs b/src/Chainweb/Chainweb.hs index 50b2701981..5951f08af0 100644 --- a/src/Chainweb/Chainweb.hs +++ b/src/Chainweb/Chainweb.hs @@ -106,7 +106,7 @@ import Control.Concurrent.MVar (MVar, readMVar) import Control.DeepSeq import Control.Lens hiding ((.=), (<.>)) import Control.Monad -import Control.Monad.Catch (fromException) +import Control.Monad.Catch (fromException, MonadThrow (throwM)) import Data.Foldable import qualified Data.HashMap.Strict as HM @@ -185,6 +185,11 @@ import qualified Pact.Types.Command as P import Chainweb.PayloadProvider import Chainweb.PayloadProvider.Minimal import Chainweb.RestAPI.Utils (SomeServer) +import Control.Exception + +import Chainweb.Sync.WebBlockHeaderStore +import Chainweb.BlockHeader +import qualified Data.HashSet as HS -- -------------------------------------------------------------------------- -- -- Chainweb Resources @@ -192,7 +197,7 @@ import Chainweb.RestAPI.Utils (SomeServer) data Chainweb logger = Chainweb { _chainwebHostAddress :: !HostAddress , _chainwebChains :: !(HM.HashMap ChainId (ChainResources logger)) - , _chainwebCutResources :: !(CutResources logger) + , _chainwebCutResources :: !CutResources , _chainwebMiner :: !(Maybe (MinerResources logger)) , _chainwebCoordinator :: !(Maybe (MiningCoordination logger)) , _chainwebLogger :: !logger @@ -231,15 +236,15 @@ withChainweb 
-> (StartedChainweb logger -> IO ()) -> IO () withChainweb c logger rocksDb pactDbDir backupDir resetDb inner = - withPeerResources v (view configP2p confWithBootstraps) logger $ \logger' peer -> + withPeerResources v (_configP2p confWithBootstraps) logger $ \logger' peerRes -> withSocket serviceApiPort serviceApiHost $ \serviceSock -> do let conf' = confWithBootstraps - & set configP2p (_peerResConfig peer) + & set configP2p (_peerResConfig peerRes) & set (configServiceApi . serviceApiConfigPort) (fst serviceSock) withChainwebInternal conf' logger' - peer + peerRes serviceSock rocksDb pactDbDir @@ -294,7 +299,7 @@ withChainwebInternal -> Bool -> (StartedChainweb logger -> IO ()) -> IO () -withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir resetDb inner = do +withChainwebInternal conf logger peerRes serviceSock rocksDb pactDbDir backupDir resetDb inner = do unless (_configOnlySyncPact conf || _configReadOnlyReplay conf) $ initializePayloadDb v payloadDb @@ -330,7 +335,6 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re (\cid x -> do -- let mcfg = validatingMempoolConfig cid v (_configBlockGasLimit conf) (_configMinGasPrice conf) - -- FIXME: shouldn't this be done in a configuration validation? -- NOTE: the gas limit may be set based on block height in future, so this approach may not be valid. 
let maxGasLimit = fromIntegral <$> maxBlockGasLimit v maxBound @@ -343,25 +347,18 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re ] _ -> return () - -- withChainResources - -- v - -- cid - -- rocksDb - -- (chainLogger cid) - -- mcfg - -- payloadDb - -- pactDbDir - -- (pactConfig maxGasLimit) - -- txFailuresCounter - -- x + -- Initialize all chain resources, including payload providers withChainResources (chainLogger cid) v cid rocksDb - (_peerResManager peer) + (_peerResManager peerRes) pactDbDir (pactConfig maxGasLimit) + (_peerResConfig peerRes) + myInfo + peerDb defaultMinimalProviderConfig x ) @@ -373,6 +370,32 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re ) cidsList where + v = _configChainwebVersion conf + + cids :: HS.HashSet ChainId + cids = chainIds v + + cidsList :: [ChainId] + cidsList = toList cids + + mgr :: HTTP.Manager + mgr = _peerResManager peerRes + + payloadDb :: PayloadDb RocksDbTable + payloadDb = newPayloadDb rocksDb + + peer :: Peer + peer = _peerResPeer peerRes + + myInfo :: PeerInfo + myInfo = _peerInfo peer + + peerDb :: PeerDb + peerDb = _peerResDb peerRes + + p2pConfig :: P2pConfiguration + p2pConfig = _peerResConfig peerRes + pactConfig maxGasLimit = PactServiceConfig { _pactReorgLimit = _configReorgLimit conf , _pactPreInsertCheckTimeout = _configPreInsertCheckTimeout conf @@ -380,8 +403,8 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re , _pactResetDb = resetDb , _pactAllowReadsInLocal = _configAllowReadsInLocal conf , _pactUnlimitedInitialRewind = - isJust (_cutDbParamsInitialHeightLimit cutConfig) || - isJust (_cutDbParamsInitialCutFile cutConfig) + isJust (_cutDbParamsInitialHeightLimit cutDbParams) || + isJust (_cutDbParamsInitialCutFile cutDbParams) , _pactNewBlockGasLimit = maybe id min maxGasLimit (_configBlockGasLimit conf) , _pactLogGas = _configLogGas conf , _pactModuleCacheLimit = _configModuleCacheLimit conf @@ 
-396,41 +419,63 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re , _pactMiner = Nothing } + -- FIXME: make this configurable + cutDbParams :: CutDbParams + cutDbParams = (defaultCutDbParams v $ _cutFetchTimeout cutConf) + { _cutDbParamsLogLevel = Info + , _cutDbParamsTelemetryLevel = Info + , _cutDbParamsInitialHeightLimit = _cutInitialBlockHeightLimit cutConf + , _cutDbParamsFastForwardHeightLimit = _cutFastForwardBlockHeightLimit cutConf + , _cutDbParamsReadOnly = _configOnlySyncPact conf || _configReadOnlyReplay conf + } + where + cutConf = _configCuts conf + + -- Logger + + backupLogger :: logger + backupLogger = addLabel ("component", "backup") logger + pruningLogger :: T.Text -> logger pruningLogger l = addLabel ("sub-component", l) $ setComponent "database-pruning" logger - cidsList :: [ChainId] - cidsList = toList cids - - payloadDb :: PayloadDb RocksDbTable - payloadDb = newPayloadDb rocksDb - - chainLogger :: ChainId -> logger - chainLogger cid = addLabel ("chain", toText cid) logger + chainLogger :: HasChainId c => c -> logger + chainLogger cid = addLabel ("chain", toText (_chainId cid)) logger initLogger :: logger initLogger = setComponent "init" logger + providerLogger :: HasChainId p => HasPayloadProviderType p => p -> logger + providerLogger p = addLabel ("provider", toText (_payloadProviderType p)) + $ chainLogger p + logg :: LogFunctionText logg = logFunctionText initLogger + chainLogg :: HasChainId c => c -> LogFunctionText + chainLogg = logFunctionText . chainLogger + + providerLogg :: HasChainId p => HasPayloadProviderType p => p -> LogFunctionText + providerLogg = logFunctionText . providerLogger + -- Initialize global resources + -- TODO: Can this be moved to a top-level function or broken down a bit to + -- avoid excessive indentation? 
+ global :: HM.HashMap ChainId (ChainResources logger) -> IO () global cs = do let !webchain = mkWebBlockHeaderDb v (HM.map _chainResBlockHeaderDb cs) - -- FIXME FIXME FIXME -- !pact = mkWebPactExecutionService (HM.map _chainResPact cs) - providers = error "Chainweb.Chainweb.withChainwebInternal.global: provider initialization not yet implemented" + !providers = payloadProvidersForAllChains cs !cutLogger = setComponent "cut" logger - !mgr = _peerResManager peer logg Debug "start initializing cut resources" logFunctionJson logger Info InitializingCutResources - withCutResources cutConfig peer cutLogger rocksDb webchain providers mgr $ \cuts -> do + withCutResources cutLogger cutDbParams p2pConfig myInfo peerDb rocksDb webchain providers mgr $ \cuts -> do logg Debug "finished initializing cut resources" let !mLogger = setComponent "miner" logger @@ -512,12 +557,17 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re else do initialCut <- _cut mCutDb + -- synchronize payload providers. this also initializes + -- mining. 
+ synchronizeProviders webchain providers initialCut + -- FIXME: synchronize all payload providers -- logg Info "start synchronizing Pact DBs to initial cut" -- logFunctionJson logger Info InitialSyncInProgress -- synchronizePactDb pactSyncChains initialCut -- logg Info "finished synchronizing Pact DBs to initial cut" + -- withPactData cs cuts $ \pactData -> do do logg Debug "start initializing miner resources" @@ -540,7 +590,7 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re , _chainwebMiner = m , _chainwebCoordinator = mc , _chainwebLogger = logger - , _chainwebPeer = peer + , _chainwebPeer = peerRes , _chainwebPayloadProviders = providers , _chainwebManager = mgr -- , _chainwebPactData = pactData @@ -558,36 +608,24 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re } } - -- withPactData - -- :: HM.HashMap ChainId (ChainResources logger) - -- -> CutResources logger - -- -> ([(ChainId, PactServerData logger tbl)] -> IO b) - -- -> IO b - -- withPactData cs cuts m = do - -- let l = sortBy (compare `on` fst) (HM.toList cs) - -- m $ l <&> fmap (\cr -> PactServerData - -- { _pactServerDataCutDb = _cutResCutDb cuts - -- , _pactServerDataMempool = _chainResMempool cr - -- , _pactServerDataLogger = _chainResLogger cr - -- , _pactServerDataPact = _chainResPact cr - -- , _pactServerDataPayloadDb = _chainResPayloadDb cr - -- }) - - v = _configChainwebVersion conf - cids = chainIds v - backupLogger = addLabel ("component", "backup") logger - - -- FIXME: make this configurable - cutConfig :: CutDbParams - cutConfig = (defaultCutDbParams v $ _cutFetchTimeout cutConf) - { _cutDbParamsLogLevel = Info - , _cutDbParamsTelemetryLevel = Info - , _cutDbParamsInitialHeightLimit = _cutInitialBlockHeightLimit cutConf - , _cutDbParamsFastForwardHeightLimit = _cutFastForwardBlockHeightLimit cutConf - , _cutDbParamsReadOnly = _configOnlySyncPact conf || _configReadOnlyReplay conf - } + synchronizeProviders :: 
WebBlockHeaderDb -> PayloadProviders -> Cut -> IO () + synchronizeProviders wbh providers c = do + mapConcurrently_ syncOne (_cutHeaders c) where - cutConf = _configCuts conf + syncOne hdr = withPayloadProvider providers hdr $ \provider -> do + providerLogg provider Info $ + "sync payload provider to " + <> sshow (view blockHeight hdr) + <> ":" <> sshow (view blockHash hdr) + finfo <- forkInfoForHeader wbh hdr Nothing + providerLogg provider Debug $ "syncToBlock with fork info " <> sshow finfo + r <- syncToBlock provider Nothing finfo `catch` \(e :: SomeException) -> do + providerLogg provider Warn $ "syncToBlock for " <> sshow finfo <> " failed with :" <> sshow e + throwM e + unless (r == _forkInfoTargetState finfo) $ do + providerLogg provider Error $ "unexpectedResult" + error "Chainweb.Chainweb.synchronizeProviders: unexpected result state" + -- FIXME -- synchronizePactDb :: HM.HashMap ChainId (ChainResources logger) -> Cut -> IO () -- synchronizePactDb cs targetCut = do @@ -604,6 +642,21 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re -- void $ _pactSyncToBlock pact bh -- logCr Debug "pact db synchronized" + -- withPactData + -- :: HM.HashMap ChainId (ChainResources logger) + -- -> CutResources + -- -> ([(ChainId, PactServerData logger tbl)] -> IO b) + -- -> IO b + -- withPactData cs cuts m = do + -- let l = sortBy (compare `on` fst) (HM.toList cs) + -- m $ l <&> fmap (\cr -> PactServerData + -- { _pactServerDataCutDb = _cutResCutDb cuts + -- , _pactServerDataMempool = _chainResMempool cr + -- , _pactServerDataLogger = _chainResLogger cr + -- , _pactServerDataPact = _chainResPact cr + -- , _pactServerDataPayloadDb = _chainResPayloadDb cr + -- }) + -- -------------------------------------------------------------------------- -- -- Throttling @@ -662,6 +715,8 @@ runChainweb -> IO () runChainweb cw nowServing = do logg Debug "start chainweb node" + + -- Create OpenAPI Validation Middleware mkValidationMiddleware <- interleaveIO 
$ OpenAPIValidation.mkValidationMiddleware (_chainwebLogger cw) (_chainwebVersion cw) (_chainwebManager cw) p2pValidationMiddleware <- @@ -679,7 +734,7 @@ runChainweb cw nowServing = do concurrentlies_ - -- 1. Start serving Rest API + -- 1. Start serving P2P Rest API [ (if tls then serve else servePlain) $ httpLog . throttle (_chainwebPutPeerThrottler cw) @@ -691,7 +746,7 @@ runChainweb cw nowServing = do -- 2. Start Clients (with a delay of 500ms) , threadDelay 500000 >> clients - -- 3. Start serving local API + -- 3. Start serving local service API , threadDelay 500000 >> do serveServiceApi $ serviceHttpLog @@ -708,7 +763,8 @@ runChainweb cw nowServing = do mpClients <- mempoolSyncClients concurrentlies_ $ concat [ miner - , cutNetworks mgr (_chainwebCutResources cw) + , cutNetworks (_chainwebCutResources cw) + , runP2pNodesOfAllChains chainVals , mpClients ] @@ -728,16 +784,12 @@ runChainweb cw nowServing = do peerDb = _peerResDb (_chainwebPeer cw) + -- FIXME export the SomeServer instead of DBs? + -- I.e. the handler would be created in the chain resource. + -- chainDbsToServe :: [(ChainId, BlockHeaderDb)] chainDbsToServe = proj _chainResBlockHeaderDb - a = mapM (_providerResP2pApi . _chainResPayloadProvider) <$> chains - - payloadsToServe :: [(ChainId, SomeServer)] - payloadsToServe = catMaybes - $ mapM (fmap snd . _providerResP2pApi . 
_chainResPayloadProvider) - <$> chains - mempoolsToServe :: [(ChainId, Mempool.MempoolBackend Pact4.UnparsedTransaction)] -- mempoolsToServe = proj _chainResMempool mempoolsToServe = [] @@ -746,9 +798,14 @@ runChainweb cw nowServing = do -- pactDbsToServe = _chainwebPactData cw pactDbsToServe = [] - memP2pToServe :: [(NetworkId, PeerDb)] + memP2pPeersToServe :: [(NetworkId, PeerDb)] -- memP2pToServe = (\(i, _) -> (MempoolNetwork i, peerDb)) <$> chains - memP2pToServe = [] + memP2pPeersToServe = [] + + -- TODO use the peerDbs from the respective chains + -- (even though those are currently all the same) + payloadP2pPeersToServe :: [(NetworkId, PeerDb)] + payloadP2pPeersToServe = (\(i, _) -> (ChainNetwork i, peerDb)) <$> chains loggServerError msg (Just r) e = "HTTP server error (" <> msg <> "): " <> sshow e <> ". Request: " <> sshow r @@ -797,8 +854,9 @@ runChainweb cw nowServing = do { _chainwebServerCutDb = Just cutDb , _chainwebServerBlockHeaderDbs = chainDbsToServe , _chainwebServerMempools = mempoolsToServe - , _chainwebServerPayloads = payloadsToServe - , _chainwebServerPeerDbs = (CutNetwork, cutPeerDb) : memP2pToServe + , _chainwebServerPayloads = payloadsToServeOnP2pApi chains + , _chainwebServerPeerDbs = (CutNetwork, cutPeerDb) + : memP2pPeersToServe <> payloadP2pPeersToServe } mw) (monitorConnectionsClosedByClient clientClosedConnectionsCounter) @@ -816,8 +874,8 @@ runChainweb cw nowServing = do { _chainwebServerCutDb = Just cutDb , _chainwebServerBlockHeaderDbs = chainDbsToServe , _chainwebServerMempools = mempoolsToServe - , _chainwebServerPayloads = payloadsToServe - , _chainwebServerPeerDbs = (CutNetwork, cutPeerDb) : memP2pToServe + , _chainwebServerPayloads = payloadsToServeOnP2pApi chains + , _chainwebServerPeerDbs = (CutNetwork, cutPeerDb) : memP2pPeersToServe } mw) (monitorConnectionsClosedByClient clientClosedConnectionsCounter) @@ -874,8 +932,8 @@ runChainweb cw nowServing = do { _chainwebServerCutDb = Just cutDb , _chainwebServerBlockHeaderDbs 
= chainDbsToServe , _chainwebServerMempools = mempoolsToServe - , _chainwebServerPayloads = payloadsToServe - , _chainwebServerPeerDbs = (CutNetwork, cutPeerDb) : memP2pToServe + , _chainwebServerPayloads = payloadsToServeOnServiceApi chains + , _chainwebServerPeerDbs = (CutNetwork, cutPeerDb) : memP2pPeersToServe } (_chainwebCoordinator cw) (HeaderStream . _configHeaderStream $ _chainwebConfig cw) @@ -894,7 +952,7 @@ runChainweb cw nowServing = do cutDb = _cutResCutDb $ _chainwebCutResources cw cutPeerDb :: PeerDb - cutPeerDb = _peerResDb $ _cutResPeer $ _chainwebCutResources cw + cutPeerDb = _cutResPeerDb $ _chainwebCutResources cw miner :: [IO ()] miner = maybe [] (\m -> [ runMiner (_chainwebVersion cw) m ]) $ _chainwebMiner cw @@ -924,3 +982,4 @@ runChainweb cw nowServing = do enabled conf = do logg Info "Mempool p2p sync enabled" return $ map (runMempoolSyncClient mgr conf (_chainwebPeer cw)) chainVals + diff --git a/src/Chainweb/Chainweb/ChainResources.hs b/src/Chainweb/Chainweb/ChainResources.hs index 5d4b16e914..5c4fdcc6e4 100644 --- a/src/Chainweb/Chainweb/ChainResources.hs +++ b/src/Chainweb/Chainweb/ChainResources.hs @@ -1,16 +1,19 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE ImportQualifiedPost #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeAbstractions #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE TypeAbstractions #-} -{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE ViewPatterns #-} -- | -- Module: Chainweb.Chainweb.ChainResources @@ -25,63 +28,188 @@ module Chainweb.Chainweb.ChainResources ( ChainResources(..) 
, chainResBlockHeaderDb , chainResLogger +, chainResPayloadProvider , withChainResources +, payloadsToServeOnP2pApi +, payloadsToServeOnServiceApi +, payloadProvidersForAllChains +, runP2pNodesOfAllChains -- * Payload Provider , ProviderResources(..) , withPayloadProviderResources +, providerResPayloadProvider +, providerResServiceApi +, providerResP2pApiResources + +-- * Payload Provider P2P Resources +, PayloadP2pResources(..) +, payloadP2pResources +, runPayloadP2pNodes + +-- * Payload Provider Service API Resources +, PayloadServiceApiResources(..) +, payloadServiceApiResources ) where import Chainweb.BlockHeaderDB +import Chainweb.BlockPayloadHash import Chainweb.ChainId +import Chainweb.Chainweb.Configuration (ServiceApiConfig(_serviceApiPayloadBatchLimit)) import Chainweb.Logger import Chainweb.Pact.Types import Chainweb.PayloadProvider import Chainweb.PayloadProvider.Minimal -import Chainweb.PayloadProvider.P2P.RestAPI (payloadApi) +import Chainweb.PayloadProvider.P2P.RestAPI import Chainweb.PayloadProvider.P2P.RestAPI.Server +import Chainweb.RestAPI.NetworkID import Chainweb.RestAPI.Utils +import Chainweb.Storage.Table import Chainweb.Storage.Table.RocksDB +import Chainweb.Utils import Chainweb.Version import Control.Lens hiding ((.=), (<.>)) import Data.Maybe +import Data.PQueue (PQueue) +import Data.Text qualified as T import Network.HTTP.Client qualified as HTTP +import P2P.Node +import P2P.Node.Configuration +import P2P.Node.PeerDB (PeerDb) +import P2P.Peer (PeerInfo) +import P2P.Session +import P2P.TaskQueue import Prelude hiding (log) +import Data.HashMap.Strict qualified as HM +import Data.Foldable + +-- -------------------------------------------------------------------------- -- +-- Payload P2P Network Resources +-- +-- The following is the default implementation that works for the current +-- providers. It is not necessarily true that all payload providers use this. + +-- | Payload P2P network resources. 
+-- +-- This includes both client and server components. +-- +data PayloadP2pResources = PayloadP2pResources + { _payloadResPeerDb :: !PeerDb + -- ^ The respective Peer resources for the payload P2P network + , _payloadResP2pNode :: !P2pNode + -- ^ The P2P Network for fetching payloads + -- + -- The network doesn't restrict the API network endpoints that are used + -- in the client sessions. + , _payloadResP2pApi :: !SomeApi + -- ^ API endpoints that are included in the node P2P API + , _payloadResP2pServer :: !SomeServer + -- ^ API endpoints that are served by the node P2P API + } + +instance HasChainwebVersion PayloadP2pResources where + _chainwebVersion = _chainwebVersion . _payloadResPeerDb + +payloadP2pResources + :: forall (v :: ChainwebVersionT) (c :: ChainIdT) (p :: PayloadProviderType) logger tbl + . Logger logger + => ReadableTable tbl RankedBlockPayloadHash (PayloadType p) + => KnownChainwebVersionSymbol v + => KnownChainIdSymbol c + => IsPayloadProvider p + => logger + -> P2pConfiguration + -> PeerInfo + -> PeerDb + -> tbl + -- Payload Table + -> PQueue (Task ClientEnv (PayloadType p)) + -- ^ task queue for scheduling tasks with the task server + -> HTTP.Manager + -> IO PayloadP2pResources +payloadP2pResources logger p2pConfig myInfo peerDb tbl queue mgr = do + p2pNode <- mkP2pNode False "payload" (session 10 queue) + -- FIXME add a node for regularly synchronizing peers + return $ PayloadP2pResources + { _payloadResPeerDb = peerDb + , _payloadResP2pNode = p2pNode + , _payloadResP2pApi = SomeApi (payloadApi @v @c @p) + , _payloadResP2pServer = somePayloadServer @_ @v @c @p 20 tbl + } + where + p2pLogger = addLabel ("sub-component", "p2p") logger + + mkP2pNode :: Bool -> T.Text -> P2pSession -> IO P2pNode + mkP2pNode doPeerSync label s = p2pCreateNode $ P2pNodeParameters + { _p2pNodeParamsMyPeerInfo = myInfo + , _p2pNodeParamsSession = s + , _p2pNodeParamsSessionTimeout = _p2pConfigSessionTimeout p2pConfig + , _p2pNodeParamsMaxSessionCount = 
_p2pConfigMaxSessionCount p2pConfig + , _p2pNodeParamsIsPrivate = _p2pConfigPrivate p2pConfig + , _p2pNodeParamsDoPeerSync = doPeerSync + , _p2pNodeParamsManager = mgr + , _p2pNodeParamsPeerDb = peerDb + , _p2pNodeParamsLogFunction = logFunction (addLabel ("session", label) p2pLogger) + , _p2pNodeParamsNetworkId = CutNetwork + } + +-- | IO actions for running Payload P2p Nodes +-- +runPayloadP2pNodes :: PayloadP2pResources -> [IO ()] +runPayloadP2pNodes r = [ p2pRunNode (_payloadResP2pNode r) ] + +-- -------------------------------------------------------------------------- -- +-- Payload Service API Resources + +data PayloadServiceApiResources = PayloadServiceApiResources + { _payloadResServiceApi :: !SomeApi + , _payloadResServiceServer :: !SomeServer + } + +payloadServiceApiResources + :: forall (v :: ChainwebVersionT) (c :: ChainIdT) (p :: PayloadProviderType) tbl + . IsPayloadProvider p + => KnownChainwebVersionSymbol v + => KnownChainIdSymbol c + => ReadableTable tbl RankedBlockPayloadHash (PayloadType p) + => ServiceApiConfig + -> tbl + -> PayloadServiceApiResources +payloadServiceApiResources config pdb = PayloadServiceApiResources + { _payloadResServiceApi = SomeApi (payloadApi @v @c @p) + , _payloadResServiceServer = somePayloadServer @_ @v @c @p batchLimit pdb + } + where + batchLimit = int $ _serviceApiPayloadBatchLimit config --- import Control.Concurrent.MVar --- import Chainweb.Mempool.Consensus qualified as MPCon --- import Chainweb.Mempool.InMem qualified as Mempool --- import Chainweb.Mempool.InMemTypes qualified as Mempool --- import Chainweb.Mempool.Mempool (MempoolBackend) --- import Chainweb.Pact.Service.PactInProcApi --- import Chainweb.Payload.PayloadStore --- import Chainweb.Pact4.Transaction qualified as Pact4 --- import Chainweb.WebPactExecutionService --- import Chainweb.Counter --- import Data.Proxy -- -------------------------------------------------------------------------- -- -- Payload Provider Resources -data ProviderResources 
logger = ProviderResources +-- | Payload Provider Resources +-- +data ProviderResources = ProviderResources { _providerResPayloadProvider :: !SomePayloadProvider - , _providerResServiceApi :: !(Maybe (SomeApi, SomeServer)) - -- ^ API endpoints that are included in and served by the node service - -- API - , _providerResP2pApi :: !(Maybe (SomeApi, SomeServer)) - -- ^ API endpoints that are included in and served by the node P2P API + , _providerResServiceApi :: !(Maybe PayloadServiceApiResources) + , _providerResP2pApiResources :: !(Maybe PayloadP2pResources) } makeLenses ''ProviderResources -instance HasChainwebVersion (ProviderResources logger) where +instance HasChainwebVersion ProviderResources where _chainwebVersion = _chainwebVersion . _providerResPayloadProvider {-# INLINE _chainwebVersion #-} -instance HasChainId (ProviderResources logger) where +instance HasChainId ProviderResources where _chainId = _chainId . _providerResPayloadProvider {-# INLINE _chainId #-} + -- FIXME + -- initialize payload store + -- payloadStore <- newWebPayloadStore mgr pact payloadDb (logFunction logger) + -- Where is this done? The queue is used by the P2p Session and + -- the Payload Provider. 
+ withPayloadProviderResources :: Logger logger => HasChainwebVersion v @@ -89,25 +217,37 @@ withPayloadProviderResources => logger -> v -> c + -> P2pConfiguration + -> PeerInfo + -> PeerDb -> RocksDb -> HTTP.Manager -> MinimalProviderConfig - -> (ProviderResources logger -> IO a) + -> (ProviderResources -> IO a) -> IO a -withPayloadProviderResources logger v c rdb mgr mpConfig inner = do - SomeChainwebVersionT @v' _ <- return $ someChainwebVersionVal (_chainwebVersion v) - SomeChainIdT @c' _ <- return $ someChainIdVal (_chainId c) +withPayloadProviderResources logger v c p2pConfig myInfo peerDb rdb mgr mpConfig inner = do + SomeChainwebVersionT @v' _ <- return $ someChainwebVersionVal v + SomeChainIdT @c' _ <- return $ someChainIdVal c case provider of MinimalProvider -> do + + -- FIXME this should be better abstracted. + -- Should we put the api and server into the payload provider + -- itself? Would it be an issue if the payload provider would keep a + -- reference to it? + -- + -- It would allow the server to be integrated more closely with the + -- provider. 
+ p <- newMinimalPayloadProvider logger v c rdb mgr mpConfig let pdb = view minimalPayloadDb p + let queue = view minimalPayloadQueue p + p2pRes <- + payloadP2pResources @v' @c' @'MinimalProvider logger p2pConfig myInfo peerDb pdb queue mgr inner ProviderResources { _providerResPayloadProvider = SomePayloadProvider p , _providerResServiceApi = Nothing - , _providerResP2pApi = Just - ( SomeApi (payloadApi @v' @c' @'MinimalProvider) - , somePayloadServer @_ @v' @c' @'MinimalProvider 20 pdb - ) + , _providerResP2pApiResources = Just p2pRes } PactProvider -> error "Chainweb.PayloadProvider.P2P.RestAPI.somePayloadApi: providerResources not implemented for Pact" @@ -125,11 +265,21 @@ withPayloadProviderResources logger v c rdb mgr mpConfig inner = do data ChainResources logger = ChainResources { _chainResBlockHeaderDb :: !BlockHeaderDb , _chainResLogger :: !logger - , _chainResPayloadProvider :: !(ProviderResources logger) + , _chainResPayloadProvider :: !ProviderResources } makeLenses ''ChainResources +_chainResP2pApiResources + :: ChainResources logger + -> Maybe PayloadP2pResources +_chainResP2pApiResources = _providerResP2pApiResources . _chainResPayloadProvider + +_chainResServiceApiResources + :: ChainResources logger + -> Maybe PayloadServiceApiResources +_chainResServiceApiResources = _providerResServiceApi . _chainResPayloadProvider + instance HasChainwebVersion (ChainResources logger) where _chainwebVersion = _chainwebVersion . 
_chainResBlockHeaderDb {-# INLINE _chainwebVersion #-} @@ -150,18 +300,69 @@ withChainResources -> FilePath -- ^ database directory for pact databases -> PactServiceConfig + -> P2pConfiguration + -> PeerInfo + -> PeerDb -> MinimalProviderConfig -- ^ FIXME create a a type that bundles different provider configs -> (ChainResources logger -> IO a) -> IO a -withChainResources logger v c rdb mgr pactDbDir pConf mConf inner = +withChainResources logger v c rdb mgr _pactDbDir _pConf p2pConf myInfo peerDb mConf inner = + + -- This uses the the CutNetwork for fetching block headers. withBlockHeaderDb rdb (_chainwebVersion v) (_chainId c) $ \cdb -> do - withPayloadProviderResources logger v c rdb mgr mConf $ \provider -> do - inner ChainResources - { _chainResBlockHeaderDb = cdb - , _chainResPayloadProvider = provider - , _chainResLogger = logger - } + + -- Payload Providers are using per chain payload networks for fetching + -- block headers. + withPayloadProviderResources + logger v c p2pConf myInfo peerDb rdb mgr mConf $ \provider -> do + + inner ChainResources + { _chainResBlockHeaderDb = cdb + , _chainResPayloadProvider = provider + , _chainResLogger = logger + } + +-- | Return P2P Payload Servers for all chains +-- +payloadsToServeOnP2pApi + :: [(ChainId, ChainResources logger)] + -> [(ChainId, SomeServer)] +payloadsToServeOnP2pApi chains = catMaybes + $ mapM (fmap _payloadResP2pServer . _chainResP2pApiResources) + <$> chains + +-- | Return Service API Payload Servers for all chains +-- +payloadsToServeOnServiceApi + :: [(ChainId, ChainResources logger)] + -> [(ChainId, SomeServer)] +payloadsToServeOnServiceApi chains = catMaybes + $ mapM (fmap _payloadResServiceServer . _chainResServiceApiResources) + <$> chains + +-- | Return the payload providers for all chains +-- +payloadProvidersForAllChains + :: HM.HashMap ChainId (ChainResources logger) + -> PayloadProviders +payloadProvidersForAllChains chains = PayloadProviders + $ (_providerResPayloadProvider . 
_chainResPayloadProvider) + <$> chains + +-- | Returns actions for running the P2P nodes for all chains. +-- +runP2pNodesOfAllChains + :: Foldable l + => l (ChainResources logger) + -> [IO ()] +runP2pNodesOfAllChains + = fmap p2pRunNode + . fmap _payloadResP2pNode + . catMaybes + . fmap _providerResP2pApiResources + . fmap _chainResPayloadProvider + . toList -- -------------------------------------------------------------------------- -- -- ATTIC (mostly pact related) diff --git a/src/Chainweb/Chainweb/CheckReachability.hs b/src/Chainweb/Chainweb/CheckReachability.hs index 1480ef00f5..6bc8e6deb2 100644 --- a/src/Chainweb/Chainweb/CheckReachability.hs +++ b/src/Chainweb/Chainweb/CheckReachability.hs @@ -51,6 +51,7 @@ import Chainweb.Utils import Chainweb.Version import P2P.Node.PeerDB +import P2P.Node.RestAPI.Client import P2P.Peer -- -------------------------------------------------------------------------- -- diff --git a/src/Chainweb/Chainweb/CutResources.hs b/src/Chainweb/Chainweb/CutResources.hs index 994d3982df..27c0241c8a 100644 --- a/src/Chainweb/Chainweb/CutResources.hs +++ b/src/Chainweb/Chainweb/CutResources.hs @@ -21,28 +21,19 @@ -- should be kept after intialization of the node is complete. -- module Chainweb.Chainweb.CutResources -( CutSyncResources(..) -, CutResources(..) -, cutsCutDb +( CutResources(..) 
, withCutResources , cutNetworks ) where -import Control.Lens hiding ((.=), (<.>)) -import Control.Monad import Control.Monad.Catch -import qualified Data.Text as T - import Prelude hiding (log) import qualified Network.HTTP.Client as HTTP -import System.LogLevel - -- internal modules -import Chainweb.Chainweb.PeerResources import Chainweb.CutDB import qualified Chainweb.CutDB.Sync as C import Chainweb.Logger @@ -54,156 +45,84 @@ import Chainweb.WebBlockHeaderDB import Chainweb.Storage.Table.RocksDB import P2P.Node +import P2P.Node.Configuration import P2P.Peer -import P2P.Session import P2P.TaskQueue import Chainweb.PayloadProvider +import qualified Data.Text as T +import P2P.Session (P2pSession) +import P2P.Node.PeerDB (PeerDb) -- -------------------------------------------------------------------------- -- -- Cuts Resources -data CutSyncResources logger = CutSyncResources - { _cutResSyncSession :: !P2pSession - , _cutResSyncLogger :: !logger - } - -data CutResources logger = CutResources - { _cutResCutConfig :: !CutDbParams - , _cutResPeer :: !(PeerResources logger) +data CutResources = CutResources + { _cutResPeerDb :: !PeerDb , _cutResCutDb :: !CutDb - , _cutResLogger :: !logger - , _cutResCutSync :: !(CutSyncResources logger) - , _cutResHeaderSync :: !(CutSyncResources logger) + , _cutResCutP2pNode :: !P2pNode + -- ^ P2P Network for pushing and synchronizing cuts. + , _cutResHeaderP2pNode :: !P2pNode + -- ^ P2P Network for fetching block headers on demand via a task queue. } -makeLensesFor - [ ("_cutResCutDb", "cutsCutDb") - ] ''CutResources - -instance HasChainwebVersion (CutResources logger) where +instance HasChainwebVersion CutResources where _chainwebVersion = _chainwebVersion . 
_cutResCutDb {-# INLINE _chainwebVersion #-} withCutResources :: Logger logger - => CutDbParams - -> PeerResources logger - -> logger + => logger + -> CutDbParams + -> P2pConfiguration + -> PeerInfo + -> PeerDb -> RocksDb -> WebBlockHeaderDb -> PayloadProviders -> HTTP.Manager - -> (CutResources logger -> IO a) + -> (CutResources -> IO a) -> IO a -withCutResources cutDbParams peer logger rdb webchain providers mgr f = do +withCutResources logger cutDbParams p2pConfig myInfo peerDb rdb webchain providers mgr f = do -- initialize blockheader store headerStore <- newWebBlockHeaderStore mgr webchain (logFunction logger) - -- FIXME - -- initialize payload store - -- payloadStore <- newWebPayloadStore mgr pact payloadDb (logFunction logger) - -- initialize cutHashes store let cutHashesStore = cutHashesTable rdb - withCutDb cutDbParams (logFunction logger) headerStore providers cutHashesStore $ \cutDb -> + withCutDb cutDbParams (logFunction logger) headerStore providers cutHashesStore $ \cutDb -> do + cutP2pNode <- mkP2pNode True "cut" $ + C.syncSession myInfo cutDb + headerP2pNode <- mkP2pNode False "header" $ + session 10 (_webBlockHeaderStoreQueue headerStore) f $ CutResources - { _cutResCutConfig = cutDbParams - , _cutResPeer = peer + { _cutResPeerDb = peerDb , _cutResCutDb = cutDb - , _cutResLogger = logger - , _cutResCutSync = CutSyncResources - { _cutResSyncSession = C.syncSession v (_peerInfo $ _peerResPeer peer) cutDb - , _cutResSyncLogger = addLabel ("sync", "cut") syncLogger - } - , _cutResHeaderSync = CutSyncResources - { _cutResSyncSession = session 10 (_webBlockHeaderStoreQueue headerStore) - , _cutResSyncLogger = addLabel ("sync", "header") syncLogger - } - - -- FIXME - -- , _cutResPayloadSync = CutSyncResources - -- { _cutResSyncSession = session 10 (_webBlockPayloadStoreQueue payloadStore) - -- , _cutResSyncLogger = addLabel ("sync", "payload") syncLogger - -- } - + , _cutResCutP2pNode = cutP2pNode + , _cutResHeaderP2pNode = headerP2pNode } where - v = 
_chainwebVersion webchain syncLogger = addLabel ("sub-component", "sync") logger + mkP2pNode :: Bool -> T.Text -> P2pSession -> IO P2pNode + mkP2pNode doPeerSync label s = p2pCreateNode $ P2pNodeParameters + { _p2pNodeParamsMyPeerInfo = myInfo + , _p2pNodeParamsSession = s + , _p2pNodeParamsSessionTimeout = _p2pConfigSessionTimeout p2pConfig + , _p2pNodeParamsMaxSessionCount = _p2pConfigMaxSessionCount p2pConfig + , _p2pNodeParamsIsPrivate = _p2pConfigPrivate p2pConfig + , _p2pNodeParamsDoPeerSync = doPeerSync + , _p2pNodeParamsManager = mgr + , _p2pNodeParamsPeerDb = peerDb + , _p2pNodeParamsLogFunction = logFunction (addLabel ("sync", label) syncLogger) + , _p2pNodeParamsNetworkId = CutNetwork + } + -- | The networks that are used by the cut DB. -- -cutNetworks - :: Logger logger - => HTTP.Manager - -> CutResources logger - -> [IO ()] -cutNetworks mgr cuts = - [ runCutNetworkCutSync mgr cuts - , runCutNetworkHeaderSync mgr cuts --- , runCutNetworkPayloadSync mgr cuts +cutNetworks :: CutResources -> [IO ()] +cutNetworks cuts = + [ p2pRunNode (_cutResCutP2pNode cuts) + , p2pRunNode (_cutResHeaderP2pNode cuts) ] --- | P2P Network for pushing Cuts --- -runCutNetworkCutSync - :: Logger logger - => HTTP.Manager - -> CutResources logger - -> IO () -runCutNetworkCutSync mgr c - = mkCutNetworkSync mgr True c "cut sync" $ _cutResCutSync c - --- | P2P Network for Block Headers --- -runCutNetworkHeaderSync - :: Logger logger - => HTTP.Manager - -> CutResources logger - -> IO () -runCutNetworkHeaderSync mgr c - = mkCutNetworkSync mgr False c "block header sync" $ _cutResHeaderSync c - --- | P2P Network for Block Payloads --- --- runCutNetworkPayloadSync --- :: Logger logger --- => HTTP.Manager --- -> CutResources logger --- -> IO () --- runCutNetworkPayloadSync mgr c --- = mkCutNetworkSync mgr False c "block payload sync" $ _cutResPayloadSync c - --- | P2P Network for Block Payloads --- --- This uses the 'CutNetwork' for syncing peers. 
The network doesn't restrict --- the API network endpoints that are used in the client sessions. -- -mkCutNetworkSync - :: Logger logger - => HTTP.Manager - -> Bool - -- ^ Do peer synchronization - -> CutResources logger - -> T.Text - -> CutSyncResources logger - -> IO () -mkCutNetworkSync mgr doPeerSync cuts label cutSync = bracket create destroy $ \n -> - p2pStartNode (_peerResConfig $ _cutResPeer cuts) n - where - v = _chainwebVersion cuts - peer = _peerResPeer $ _cutResPeer cuts - logger = _cutResSyncLogger cutSync - peerDb = _peerResDb $ _cutResPeer cuts - s = _cutResSyncSession cutSync - - create = do - !n <- p2pCreateNode v CutNetwork peer (logFunction logger) peerDb mgr doPeerSync s - logFunctionText logger Debug $ label <> ": initialized" - return n - - destroy n = do - p2pStopNode n - logFunctionText logger Info $ label <> ": stopped" diff --git a/src/Chainweb/Chainweb/MempoolSyncClient.hs b/src/Chainweb/Chainweb/MempoolSyncClient.hs index d73beae6f7..a5ee9be6bb 100644 --- a/src/Chainweb/Chainweb/MempoolSyncClient.hs +++ b/src/Chainweb/Chainweb/MempoolSyncClient.hs @@ -53,6 +53,15 @@ import qualified Servant.Client as Sv -- -------------------------------------------------------------------------- -- -- Mempool sync. +-- FIXME this should be moved into Pact. +-- +-- While Pact is still part of the chainweb-node process, the pact networks will +-- be run via the global p2p infrastructure. +-- +-- However, these networks should show up in the node initialization under +-- payloadProviderNetworks and should probably be bundled with the payload +-- networks from pact. + -- | Synchronize the local mempool over the P2P network. 
-- runMempoolSyncClient @@ -64,29 +73,31 @@ runMempoolSyncClient -> ChainResources logger -- ^ chain resources -> IO () -runMempoolSyncClient mgr memP2pConfig peerRes chain = bracket create destroy go - where - create = do - logg Debug "starting mempool p2p sync" - p2pCreateNode v netId peer (logFunction syncLogger) peerDb mgr True $ - mempoolSyncP2pSession chain (_mempoolP2pConfigPollInterval memP2pConfig) - go n = do - -- Run P2P client node - logg Debug "mempool sync p2p node initialized, starting session" - p2pStartNode p2pConfig n - - destroy n = p2pStopNode n `finally` logg Debug "mempool sync p2p node stopped" - - v = _chainwebVersion chain - peer = _peerResPeer peerRes - p2pConfig = _peerResConfig peerRes - & set p2pConfigMaxSessionCount (_mempoolP2pConfigMaxSessionCount memP2pConfig) - & set p2pConfigSessionTimeout (_mempoolP2pConfigSessionTimeout memP2pConfig) - peerDb = _peerResDb peerRes - netId = MempoolNetwork $ _chainId chain - - logg = logFunctionText syncLogger - syncLogger = setComponent "mempool-sync" $ _chainResLogger chain +runMempoolSyncClient mgr memP2pConfig peerRes chain = + error "Chainweb.Chainweb.MempoolSyncClient.mempoolSyncP2pSession: only supported for pact service which is currently disabled" +-- bracket create destroy go +-- where +-- create = do +-- logg Debug "starting mempool p2p sync" +-- p2pCreateNode v netId peer (logFunction syncLogger) peerDb mgr True $ +-- mempoolSyncP2pSession chain (_mempoolP2pConfigPollInterval memP2pConfig) +-- go n = do +-- -- Run P2P client node +-- logg Debug "mempool sync p2p node initialized, starting session" +-- p2pStartNode p2pConfig n +-- +-- destroy n = p2pStopNode n `finally` logg Debug "mempool sync p2p node stopped" +-- +-- v = _chainwebVersion chain +-- peer = _peerResPeer peerRes +-- p2pConfig = _peerResConfig peerRes +-- & set p2pConfigMaxSessionCount (_mempoolP2pConfigMaxSessionCount memP2pConfig) +-- & set p2pConfigSessionTimeout (_mempoolP2pConfigSessionTimeout memP2pConfig) +-- 
peerDb = _peerResDb peerRes +-- netId = MempoolNetwork $ _chainId chain +-- +-- logg = logFunctionText syncLogger +-- syncLogger = setComponent "mempool-sync" $ _chainResLogger chain -- | FIXME: -- diff --git a/src/Chainweb/Chainweb/PeerResources.hs b/src/Chainweb/Chainweb/PeerResources.hs index e823821fc4..4416e325a0 100644 --- a/src/Chainweb/Chainweb/PeerResources.hs +++ b/src/Chainweb/Chainweb/PeerResources.hs @@ -102,6 +102,10 @@ data PeerResources logger = PeerResources , _peerLogger :: !logger } +instance HasChainwebVersion (PeerResources logger) where + _chainwebVersion = _chainwebVersion . _peerResDb + {-# INLINE _chainwebVersion #-} + makeLenses ''PeerResources -- | Allocate Peer resources. All P2P networks of a chainweb node share a single @@ -238,7 +242,8 @@ withPeerSocket conf act = withSocket port interface $ \(p, s) -> -- Run PeerDb for a Chainweb Version startPeerDb_ :: ChainwebVersion -> P2pConfiguration -> IO PeerDb -startPeerDb_ v = startPeerDb v nids +startPeerDb_ v c = + startPeerDb v nids (_p2pConfigPrivate c) (_p2pConfigKnownPeers c) where nids = HS.singleton CutNetwork `HS.union` HS.map MempoolNetwork cids @@ -246,7 +251,7 @@ startPeerDb_ v = startPeerDb v nids cids = chainIds v withPeerDb_ :: ChainwebVersion -> P2pConfiguration -> (PeerDb -> IO a) -> IO a -withPeerDb_ v conf = bracket (startPeerDb_ v conf) (stopPeerDb conf) +withPeerDb_ v conf = bracket (startPeerDb_ v conf) stopPeerDb -- -------------------------------------------------------------------------- -- -- Connection Manager diff --git a/src/Chainweb/CutDB.hs b/src/Chainweb/CutDB.hs index cf1a674a43..a63ae532e2 100644 --- a/src/Chainweb/CutDB.hs +++ b/src/Chainweb/CutDB.hs @@ -783,10 +783,12 @@ cutHashesToBlockHeaderMap conf logfun headerStore providers hs = hsid = _cutId hs go = do - -- FIXME: pass candidate payloads to payload provider - -- This can be done via the evaluation ctx - -- plds <- emptyTable - -- casInsertBatch plds $ HM.elems $ _cutHashesPayloads hs + -- We collect 
candidate payloads locally in a table and provide them to + -- the payload provider by inserting them in the evaluation contexts for + -- the respective blocks + -- + plds <- emptyTable + tableInsertBatch plds $ HM.toList $ _cutHashesPayloads hs hdrs <- emptyTable casInsertBatch hdrs $ HM.elems $ _cutHashesHeaders hs @@ -796,7 +798,7 @@ cutHashesToBlockHeaderMap conf logfun headerStore providers hs = (headers :> missing) <- S.each (HM.toList $ _cutHashes hs) & S.map (fmap _bhwhHash) - & S.mapM (tryGetBlockHeader hdrs localPayload) + & S.mapM (tryGetBlockHeader hdrs plds localPayload) & S.partitionEithers & S.fold_ (\x (cid, h) -> HM.insert cid h x) mempty id & S.fold (\x (cid, h) -> HM.insert cid h x) mempty id @@ -819,10 +821,11 @@ cutHashesToBlockHeaderMap conf logfun headerStore providers hs = -- return (Left cv) -- e -> throwM e - tryGetBlockHeader hdrs localPayload cv@(cid, _) = do + tryGetBlockHeader hdrs plds localPayload cv@(cid, _) = do fmap Right $ forM cv $ getBlockHeader headerStore hdrs + plds providers localPayload cid diff --git a/src/Chainweb/CutDB/Sync.hs b/src/Chainweb/CutDB/Sync.hs index 18b2865a7e..68f37e17e4 100644 --- a/src/Chainweb/CutDB/Sync.hs +++ b/src/Chainweb/CutDB/Sync.hs @@ -84,11 +84,10 @@ catchupStepSize :: CutHeight catchupStepSize = 100 syncSession - :: ChainwebVersion - -> PeerInfo + :: PeerInfo -> CutDb -> P2pSession -syncSession v p db logg env pinf = do +syncSession p db logg env pinf = do race_ (S.mapM_ send $ S.map (cutToCutHashes (Just p)) $ cutStream db) (forever $ receive >> approximateThreadDelay 2000000 {- 2 seconds -}) @@ -102,6 +101,7 @@ syncSession v p db logg env pinf = do logg @T.Text Error "unexpectedly exited cut sync session" return False where + v = _chainwebVersion db cenv = CutClientEnv v env send c = do diff --git a/src/Chainweb/Miner/Coordinator.hs b/src/Chainweb/Miner/Coordinator.hs index 27423be02f..f63fc8d2c4 100644 --- a/src/Chainweb/Miner/Coordinator.hs +++ b/src/Chainweb/Miner/Coordinator.hs @@ -103,6 
+103,9 @@ import Streaming.Prelude qualified as S import System.LogLevel (LogLevel(..)) import System.Random (randomRIO) +import Chainweb.Ranked +import Control.Concurrent.Async +import qualified Data.List as L -- -------------------------------------------------------------------------- -- -- Utils @@ -283,6 +286,68 @@ _newPayloadRankedHash :: NewPayload -> RankedBlockHash _newPayloadRankedHash p = RankedBlockHash (_newPayloadParentHeight p) (_newPayloadParentHash p) +-- -------------------------------------------------------------------------- -- +-- Logging Tools + +class Brief a where + brief :: a -> T.Text + +toTextShort :: HasTextRepresentation a => a -> T.Text +toTextShort = T.take 6 . toText + +instance Brief CutHeight where brief = toText . int @_ @Natural +instance Brief BlockHeight where brief = toText . int @_ @Natural +instance Brief CutId where brief = toTextShort +instance Brief ChainId where brief = toText +instance Brief BlockHash where brief = toTextShort +instance Brief BlockPayloadHash where brief = toTextShort +instance Brief BlockHeader where brief = brief . view blockHash +instance Brief ParentHeader where brief = brief . 
_parentHeader + +instance (Brief a, Brief b) => Brief (a,b) where + brief (a,b) = "(" <> brief a <> "," <> brief b <> ")" + +instance Brief a => Brief (Ranked a) where + brief (Ranked r h) = sshow r <> ":" <> brief h + +instance (Brief a, Brief b) => Brief (Either a b) where + brief (Left a) = "left:" <> brief a + brief (Right b) = "right:" <> brief b + +instance Brief a => Brief (Maybe a) where + brief (Just a) = brief a + brief Nothing = "nothing" + +instance Brief a => Brief [a] where + brief l = "[" <> (T.intercalate "," $ brief <$> l) <> "]" + +instance Brief BlockHashWithHeight where + brief a = brief (_bhwhHeight a) <> ":" <> brief (_bhwhHash a) + +instance Brief CutHashes where + brief c = T.intercalate ":" + [ brief (_cutHashesId c) + , brief (_cutHashesHeight c) + , brief (L.sort $ HM.toList $ _cutHashes c) + ] + +instance Brief Cut where + brief = brief . cutToCutHashes Nothing + +instance Brief SolvedWork where + brief (SolvedWork hdr) = "SolvedWork" <> ":" <> brief hdr + +instance Brief NewPayload where + brief np = brief (_newPayloadChainId np) + <> ":" <> brief (_newPayloadRankedHash np) + +instance Brief WorkState where + brief (WorkNotReady rh) = "WorkNotReady" <> ":" <> brief rh + brief (WorkStale rh _) = "WorkStale" <> ":" <> brief rh + brief (WorkBlocked rh _) = "WorkBlocked" <> ":" <> brief rh + brief (WorkReady rh _ _ _) = "WorkReady" <> ":" <> brief rh + brief (WorkSolved rh _ _) = "WorkSolved" <> ":" <> brief rh + -- -------------------------------------------------------------------------- -- -- WorkState Transition Function @@ -413,6 +478,18 @@ newMiningState c = do cids :: [ChainId] cids = HS.toList (chainIds v) +updateStateVar :: LogFunction -> ChainId -> TVar WorkState -> WorkState -> IO () +updateStateVar lf cid var new = do + + -- Logging. 
This can race, but we don't care + cur <- readTVarIO var + lf @T.Text Debug $ "update work state" + <> "; chain: " <> toText cid + <> "; cur: " <> brief cur + <> "; new: " <> brief new + + atomically $ writeTVar var new + -- TODO: consider storing the mining state more efficiently: -- -- Do not recompute cut extensions more often than needed. @@ -423,12 +500,13 @@ newMiningState c = do -- | Update work state for all chains for a new cut. -- updateForCut - :: (ChainValue BlockHash -> IO BlockHeader) + :: LogFunction + -> (ChainValue BlockHash -> IO BlockHeader) -> PayloadCaches -> MiningState -> Cut -> IO () -updateForCut hdb caches ms c = do +updateForCut lf hdb caches ms c = do t <- BlockCreationTime <$> getCurrentTimeIntegral forM_ (M.toList $ _miningState ms) $ \(cid, var) -> forChain t cid var (caches ^?! ix cid) @@ -436,38 +514,62 @@ updateForCut hdb caches ms c = do forChain t cid var cache = do ps <- workParents hdb c cid cur <- readTVarIO var + + -- logging + -- cs <- sizeIO cache + -- cl <- getLatestIO cache (_workRankedHash cur) + -- ch <- payloadHashesIO cache + -- lf @T.Text Debug $ "updateForCut for chain: " <> brief cid + -- <> "; cur: " <> brief cur + -- <> "; cut: " <> brief (c ^?! 
ixg cid) + -- <> "; parent: " <> brief (_workParent <$> ps) + -- <> "; cache size: " <> sshow cs + -- <> "; cache depth: " <> sshow (_payloadCacheDepth cache) + -- <> "; cache latest: " <> brief cl + -- <> "; cache hashes: " <> brief ch + case onParents t ps cur of Nothing -> return () Just !new -- Check whether the parent header is still the same | _workRankedHash new == _workRankedHash cur -> - atomically $ writeTVar var new + updateStateVar lf cid var new -- if the parent header changed, check if a payload is available - | otherwise -> getLatestIO cache (_workRankedHash cur) >>= \case - Nothing -> atomically $ writeTVar var new + | otherwise -> getLatestIO cache (_workRankedHash new) >>= \case + Nothing -> updateStateVar lf cid var new Just pld -> case onPayload t pld new of - Nothing -> return () - Just !newnew -> atomically $ writeTVar var newnew + Nothing -> updateStateVar lf cid var new + Just !newnew -> updateStateVar lf cid var newnew -updateForPayload :: MiningState -> NewPayload -> IO () -updateForPayload ms pld = do +updateForPayload :: LogFunction -> MiningState -> NewPayload -> IO () +updateForPayload lf ms pld = do t <- BlockCreationTime <$> getCurrentTimeIntegral cur <- readTVarIO var + + -- lf @T.Text Debug $ "updateForPayload on chain: " <> toText cid + -- <> "; cur: " <> brief cur + -- <> "; new payload: " <> brief pld + case onPayload t pld cur of Nothing -> return () - Just !new -> atomically $ writeTVar var new + Just !new -> updateStateVar lf cid var new where cid = _chainId pld var = ms ^?! 
ixg cid -updateForSolved :: MiningState -> SolvedWork -> IO () -updateForSolved ms sw = do +updateForSolved :: LogFunction -> MiningState -> SolvedWork -> IO () +updateForSolved lf ms sw = do cur <- readTVarIO var + + -- lf @T.Text Debug $ "updateForSolved on chain: " <> toText cid + -- <> "; cur: " <> brief cur + -- <> "; sw: " <> brief sw + case onSolved sw cur of Nothing -> return () - Just !new -> atomically $ writeTVar var new + Just !new -> updateStateVar lf cid var new where cid = _chainId sw var = ms ^?! ixg cid @@ -477,7 +579,7 @@ awaitAnyReady s = msum $ awaitWorkReady <$> _miningState s where awaitWorkReady :: TVar WorkState -> STM WorkHeader awaitWorkReady var = readTVar var >>= \case - WorkReady _ _ _ w -> return $ w + WorkReady _ _ _ w -> return w _ -> retry -- -------------------------------------------------------------------------- -- @@ -530,14 +632,15 @@ runCoordination . Logger l => MiningCoordination l -> IO () -runCoordination mr = - runForever lf "miningCoordination" $ - eventStream cdb caches - & S.mapM_ - -- There is a race with solved events. Does it matter? - -- We could synchronize those by delivering those via an - -- STM variable, too. - (either (updateForCut f caches state) (updateForPayload state)) +runCoordination mr = do + + -- Initialize Work State for provider caches, without this isolated networks + -- fail to start mining. 
+ initializeState + + concurrentlies_ + $ updateWork + : (updateCache <$> HM.toList caches) where lf :: LogFunction lf = logFunction $ _coordLogger mr @@ -553,8 +656,44 @@ runCoordination mr = cdb = _coordCutDb mr + providers = view cutDbPayloadProviders $ _coordCutDb mr + + -- Update the payload cache with the latest payloads from the provider + -- + updateCache (cid, cache) = runForever lf label $ do + withPayloadProvider providers cid $ \provider -> do + payloadStream provider + & S.chain (\_ -> lf @T.Text Info $ "update cache on chain " <> toText cid) + & S.mapM_ (insertIO cache) + where + label = "miningCoordination.updateCache." <> toText cid + + -- Update the work state + -- + updateWork = runForever lf "miningCoordination" $ + eventStream cdb caches + & S.chain (\e -> lf @T.Text Info $ "coordination event: " <> brief e) + & S.mapM_ + -- There is a race with solved events. Does it matter? + -- We could synchronize those by delivering those via an + -- STM variable, too. + (either (updateForCut lf f caches state) (updateForPayload lf state)) + + -- FIXME: this is probably more aggressive than needed + initializeState = do + lf @T.Text Info $ "initialize mining state" + forConcurrently_ (HM.toList caches) $ \(cid, cache) -> do + lf @T.Text Info $ "initialize mining state for chain " <> brief cid + pld <- withPayloadProvider providers cid latestPayloadIO + insertIO cache pld + curRh <- _workRankedHash <$> readTVarIO (_miningState state ^?! ix cid) + l <- awaitLatestIO cache curRh + updateForPayload lf state l + curCut <- _cut $ cdb + updateForCut lf f caches state curCut + -- | Note that this stream is lossy. It always delivers the latest available --- item and any previous items that have not been consumed. +-- item and skips over any previous items that have not been consumed. -- -- We want this behavior, because we want to alway operate on the latest -- available cut and payload. 
@@ -644,6 +783,8 @@ randomWork logFun state = do go [] = do + logFun @T.Text Warn $ "randomWork: no work is ready. Awaiting work" + -- We shall check for the following conditions: -- -- 1. No chain is ready and we haven't received neither new cuts nor @@ -679,9 +820,12 @@ randomWork logFun state = do go ((cid, var):t) = readTVarIO var >>= \case WorkReady _ _ _ wh -> do - logFun @T.Text Debug $ "newWork: picked chain " <> toText cid + logFun @T.Text Debug $ "randomWork: picked chain " <> brief cid return wh - _ -> go t + e -> do + logFun @T.Text Info $ "randomWork: not ready for " <> brief cid + <> "; state: " <> brief e + go t awaitTimeout var = do @@ -745,11 +889,15 @@ solve solve mr solved@(SolvedWork hdr) = lookupIO cache cacheKey (view blockPayloadHash hdr) >>= \case - Nothing -> throwM NoAsscociatedPayload - -- FIXME Why not log this directly here? + Nothing -> do + ch <- payloadHashesIO cache + lf Error $ "solve: no payload for " <> brief hdr + <> "; cache key: " <> brief cacheKey + <> "; cache content: " <> brief ch + throwM NoAsscociatedPayload + -- FIXME Do we really need to restart the coordinator? Just np -> do - c <- _cut cdb now <- getCurrentTimeIntegral let pld = _newPayloadEncodedPayloadData np @@ -759,7 +907,7 @@ solve mr solved@(SolvedWork hdr) = -- Publish CutHashes to CutDb and log success Right (bh, Just ch) -> do - updateForSolved (_coordState mr) solved + updateForSolved lf (_coordState mr) solved publish cdb ch logMinedBlock lf bh np @@ -774,9 +922,12 @@ solve mr solved@(SolvedWork hdr) = lf Info $ orphandMsg now p bh msg throwM e where + cid = _chainId solved cdb = _coordCutDb mr - cache = (_coordPayloadCache mr) HM.! _chainId solved - cacheKey = _rankedBlockHash hdr + wdb = fmap _chainValueValue . casLookupM (view cutDbWebBlockHeaderDb (_coordCutDb mr)) + caches = _coordPayloadCache mr + cache = caches HM.! 
cid + cacheKey = RankedBlockHash (view blockHeight hdr - 1) (view blockParent hdr) lf :: LogFunction lf = logFunction $ _coordLogger mr diff --git a/src/Chainweb/Miner/Miners.hs b/src/Chainweb/Miner/Miners.hs index 42d1e1bbd6..c631ad2bd2 100644 --- a/src/Chainweb/Miner/Miners.hs +++ b/src/Chainweb/Miner/Miners.hs @@ -70,7 +70,9 @@ import Chainweb.Utils import Chainweb.Utils.Serialization import Chainweb.Version -import Data.LogMessage (LogFunction) +import Data.LogMessage (LogFunction, LogFunctionText) +import System.LogLevel +import Control.Concurrent.STM -------------------------------------------------------------------------------- -- Local Mining @@ -134,18 +136,37 @@ mempoolNoopMiner lf chainRes = -- localPOW :: Logger logger - => LogFunction + => LogFunctionText -> MiningCoordination logger -> CutDb -> IO () localPOW lf coord cdb = runForever lf "Chainweb.Miner.Miners.localPOW" $ do c <- _cut cdb + lf Debug "request new work for localPOW miner" wh <- work coord - race (awaitNewCutByChainId cdb (_workHeaderChainId wh) c) (go wh) >>= \case - Left _ -> return () + let cid = _workHeaderChainId wh + lf Debug $ "run localPOW miner on chain " <> toText cid + race (awaitNewCutByChainId cdb cid c) (go wh) >>= \case + Left _ -> do + lf Debug "abondond work due to chain update" + return () Right new -> do + lf Debug $ "solved work on chain " <> toText cid solve coord new - void $ awaitNewCut cdb c + + -- There is a potential race here, if the solved block got orphaned. + -- If work isn't updated quickly enough, it can happen that the + -- miner uses an old header. We resolve that by awaiting that the + -- chain is at least as high as the solved work. + -- This can still dead-lock if for some reason the solved work is + -- invalid. + awaitHeight (_chainId new) (view solvedWorkHeight new) where go :: WorkHeader -> IO SolvedWork go = mine @Blake2s_256 (Nonce 0) + + awaitHeight cid h = atomically $ do + c <- _cutStm cdb + let h' = view blockHeight $ c ^?! 
ixg cid + guard (h <= h') + diff --git a/src/Chainweb/Miner/PayloadCache.hs b/src/Chainweb/Miner/PayloadCache.hs index c3a4d33767..75cf934258 100644 --- a/src/Chainweb/Miner/PayloadCache.hs +++ b/src/Chainweb/Miner/PayloadCache.hs @@ -112,6 +112,14 @@ sizeSTM pc = fromIntegral . M.size <$> readTVar (_payloadCacheMap pc) sizeIO :: PayloadCache -> IO Natural sizeIO pc = fromIntegral . M.size <$> readTVarIO (_payloadCacheMap pc) +payloadRankedHashesSTM :: PayloadCache -> STM [BlockPayloadHash] +payloadRankedHashesSTM pc = fmap _newPayloadBlockPayloadHash . M.elems + <$> readTVar (_payloadCacheMap pc) + +payloadRankedHashesIO :: PayloadCache -> IO [BlockPayloadHash] +payloadRankedHashesIO pc = fmap _newPayloadBlockPayloadHash . M.elems + <$> readTVarIO (_payloadCacheMap pc) + payloadHashesSTM :: PayloadCache -> STM [BlockPayloadHash] payloadHashesSTM pc = fmap _newPayloadBlockPayloadHash . M.elems <$> readTVar (_payloadCacheMap pc) @@ -122,6 +130,8 @@ payloadHashesIO pc = fmap _newPayloadBlockPayloadHash . M.elems -- | Get the most recent payload for the given parent hash -- +-- FIXME: This can return a later parent header. Is that acceptable? +-- getLatestSTM :: PayloadCache -> RankedBlockHash @@ -131,6 +141,8 @@ getLatestSTM pc rh = -- | Get the most recent payload for the given parent hash -- +-- FIXME: This can return a later parent header. Is that acceptable? 
+-- getLatestIO :: PayloadCache -> RankedBlockHash @@ -255,7 +267,9 @@ prune -> BlockHeight -> M.Map (RankedBlockHash, Int) v -> M.Map (RankedBlockHash, Int) v -prune d h m = snd $ M.split pivot m +prune d h m + | h < fromIntegral d = m + | otherwise = snd $ M.split pivot m where pivot = (RankedBlockHash (h - fromIntegral d) nullBlockHash, minBound) diff --git a/src/Chainweb/PayloadProvider.hs b/src/Chainweb/PayloadProvider.hs index 6e6fe0272d..8416a7742e 100644 --- a/src/Chainweb/PayloadProvider.hs +++ b/src/Chainweb/PayloadProvider.hs @@ -42,7 +42,7 @@ module Chainweb.PayloadProvider , SomePayloadProvider(..) -- * Payload Providers for all Chains -, PayloadProviders +, PayloadProviders(..) , payloadProviders , withPayloadProvider ) where @@ -198,7 +198,7 @@ data EvaluationCtx = EvaluationCtx -- -- The BlockPayloadHash is first computed when the respective payload is -- created for mining and before it is included in a block. - , _evaluationCtxPayloaddData :: !(Maybe EncodedPayloadData) + , _evaluationCtxPayloadData :: !(Maybe EncodedPayloadData) -- ^ Optional external payload data. This may be -- the complete, self contained block payload or it may just contain -- complementary data that aids with the validation. 
@@ -241,14 +241,15 @@ data NewBlockCtx = NewBlockCtx blockHeaderToEvaluationCtx :: ParentHeader -> BlockPayloadHash + -> Maybe EncodedPayloadData -> EvaluationCtx -blockHeaderToEvaluationCtx (ParentHeader ph) pld = EvaluationCtx +blockHeaderToEvaluationCtx (ParentHeader ph) pld pldData = EvaluationCtx { _evaluationCtxParentCreationTime = view blockCreationTime ph , _evaluationCtxParentHash = view blockHash ph , _evaluationCtxParentHeight = parentHeight , _evaluationCtxMinerReward = blockMinerReward v height , _evaluationCtxPayloadHash = pld - , _evaluationCtxPayloaddData = Nothing + , _evaluationCtxPayloadData = pldData } where parentHeight = view blockHeight ph @@ -599,12 +600,15 @@ instance IxedGet PayloadProviders where withPayloadProvider :: HasCallStack + => HasChainId c => PayloadProviders - -> ChainId + -> c -> (forall p . PayloadProvider p => p -> a) -> a -withPayloadProvider (PayloadProviders ps) cid f = case HM.lookup cid ps of +withPayloadProvider (PayloadProviders ps) c f = case HM.lookup cid ps of Just (SomePayloadProvider p) -> f p Nothing -> error $ "PayloadProviders: unknown ChainId " <> sshow cid <> ". 
This is a bug" + where + cid = _chainId c diff --git a/src/Chainweb/PayloadProvider/Minimal.hs b/src/Chainweb/PayloadProvider/Minimal.hs index e4f9b3a6a3..8bb70ef63b 100644 --- a/src/Chainweb/PayloadProvider/Minimal.hs +++ b/src/Chainweb/PayloadProvider/Minimal.hs @@ -78,6 +78,7 @@ module Chainweb.PayloadProvider.Minimal , pMinimalProviderConfig , MinimalPayloadProvider , minimalPayloadDb +, minimalPayloadQueue , newMinimalPayloadProvider ) where @@ -106,11 +107,15 @@ import Control.Monad.Catch import Control.Monad.Except (throwError) import Data.ByteString qualified as B import Data.HashSet qualified as HS +import Data.PQueue (PQueue) +import Data.Text qualified as T import GHC.Generics (Generic) import Network.HTTP.Client qualified as HTTP import Numeric.Natural import P2P.TaskQueue import Servant.Client +import System.LogLevel +import Data.LogMessage (LogFunction, LogFunctionText) -- -------------------------------------------------------------------------- -- @@ -190,12 +195,15 @@ data MinimalPayloadProvider = MinimalPayloadProvider -- ^ FIXME: should this be moved into the Payload Store? -- -- For now we just prune after each successful syncToBlock. + , _minimalLogger :: LogFunction } - deriving (Generic) minimalPayloadDb :: Getter MinimalPayloadProvider (PDB.PayloadDb RocksDbTable) minimalPayloadDb = to (_payloadStoreTable . _minimalPayloadStore) +minimalPayloadQueue :: Getter MinimalPayloadProvider (PQueue (Task ClientEnv Payload)) +minimalPayloadQueue = to (_payloadStoreQueue . 
_minimalPayloadStore) + newMinimalPayloadProvider :: Logger logger => HasChainwebVersion v @@ -212,9 +220,10 @@ newMinimalPayloadProvider logger v c rdb mgr conf error "Chainweb.PayloadProvider.Minimal.PayloadDB.configuration: chain does not use minimal provider" | otherwise = do pdb <- PDB.initPayloadDb $ PDB.configuration v c rdb - store <- newPayloadStore mgr (logFunction logger) pdb payloadClient + store <- newPayloadStore mgr (logFunction pldStoreLogger) pdb payloadClient var <- newEmptyTMVarIO candidates <- emptyTable + logFunctionText providerLogger Info "minimal payload provider started" return MinimalPayloadProvider { _minimalChainwebVersion = _chainwebVersion v , _minimalChainId = _chainId c @@ -223,7 +232,12 @@ newMinimalPayloadProvider logger v c rdb mgr conf , _minimalMinerInfo = _mpcRedeemAccount conf , _minimalPayloadStore = store , _minimalCandidatePayloads = candidates + , _minimalLogger = logFunction providerLogger } + where + providerLogger = setComponent "payload-provider" + $ addLabel ("provider", "minimal") logger + pldStoreLogger = addLabel ("sub-component", "payloadStore") providerLogger payloadClient :: ChainwebVersion @@ -309,7 +323,7 @@ validatePayload p pld ctx = do checkEq PayloadInvalidHash (_evaluationCtxPayloadHash ctx) (view payloadHash pld) - case _evaluationCtxPayloaddData ctx of + case _evaluationCtxPayloadData ctx of Nothing -> return () Just x -> checkEq PayloadInvalidPayloadData x (encodedPayloadData pld) where @@ -329,6 +343,9 @@ encodedPayloadData = EncodedPayloadData . runPutS . 
encodePayload encodedPayloadDataSize :: EncodedPayloadData -> Natural encodedPayloadDataSize (EncodedPayloadData bs) = int $ B.length bs +decodePayloadData :: MonadThrow m => EncodedPayloadData -> m Payload +decodePayloadData (EncodedPayloadData bs) = runGetS decodePayload bs + -- -------------------------------------------------------------------------- -- -- Payload Provider API @@ -338,7 +355,7 @@ instance PayloadProvider MinimalPayloadProvider where latestPayloadSTM = minimalLatestPayloadStm latestPayloadIO = minimalLatestPayloadIO --- | Fetch a payload for a evaluation context and insert it into the candidate +-- | Fetch a payload for an evaluation context and insert it into the candidate -- table. -- getPayloadForContext @@ -347,6 +364,7 @@ getPayloadForContext -> EvaluationCtx -> IO Payload getPayloadForContext p h ctx = do + insertPayloadData (_evaluationCtxPayloadData ctx) pld <- Rest.getPayload (_minimalPayloadStore p) (_minimalCandidatePayloads p) @@ -357,21 +375,31 @@ getPayloadForContext p h ctx = do (_evaluationCtxRankedPayloadHash ctx) casInsert (_minimalCandidatePayloads p) pld return pld + where + insertPayloadData Nothing = return () + insertPayloadData (Just epld) = case decodePayloadData epld of + Right pld -> casInsert (_minimalCandidatePayloads p) pld + Left e -> do + lf Warn $ "failed to decode encoded payload from evaluation ctx: " <> sshow e + + lf :: LogFunctionText + lf = _minimalLogger p -- | Concurrently fetch all payloads in an evaluation context and insert them -- into the candidate table. -- -- This version blocks until all payloads are fetched (or a timeout occurs). -- --- Should the exposed a version that is fire-and-forget? +-- Should we also expose a version that is fire-and-forget? -- minimalPrefetchPayloads :: MinimalPayloadProvider -> Maybe Hints -> ForkInfo -> IO () -minimalPrefetchPayloads p h = - mapConcurrently_ (getPayloadForContext p h) . 
_forkInfoTrace +minimalPrefetchPayloads p h i = do + logg p Info "prefetch payloads" + mapConcurrently_ (getPayloadForContext p h) $ _forkInfoTrace i -- | -- @@ -400,18 +428,24 @@ minimalSyncToBlock -> ForkInfo -> IO ConsensusState minimalSyncToBlock p h i = do + logg p Info "syncToBlock called" validatePayloads p h i - -- TODO Check whether block production is requested + -- Produce new block case _forkInfoNewBlockCtx i of Nothing -> return () - Just ctx -> atomically - $ writeTMVar (_minimalPayloadVar p) - $ makeNewPayload p latestState ctx + Just ctx -> do + logg p Info $ "create new payload for sync state: " <> sshow latestState + atomically + $ writeTMVar (_minimalPayloadVar p) + $ makeNewPayload p latestState ctx return $ _forkInfoTargetState i where latestState = _consensusStateLatest $ _forkInfoTargetState i +logg :: MinimalPayloadProvider -> LogLevel -> T.Text -> IO () +logg p l t = _minimalLogger p l t + makeNewPayload :: MinimalPayloadProvider -> SyncState diff --git a/src/Chainweb/PayloadProvider/P2P.hs b/src/Chainweb/PayloadProvider/P2P.hs index 05c2a77579..eba9f67087 100644 --- a/src/Chainweb/PayloadProvider/P2P.hs +++ b/src/Chainweb/PayloadProvider/P2P.hs @@ -34,7 +34,7 @@ import Chainweb.Storage.Table import Chainweb.Utils import Chainweb.Version import Control.Monad.Catch -import Data.Aeson (ToJSON, object, (.=)) +import Data.Aeson (object, (.=)) import Data.Functor import Data.LogMessage import Data.PQueue @@ -143,6 +143,12 @@ instance -- payload store. (We can offer more complex payload providers to provide there -- own version if desired.) -- +-- FIXME: +-- +-- The queue needs to be hooked up to a p2p node. Where is that done? Given +-- that a single p2p node can serve many payload stores, the queue should +-- probably be passed as a parameter. 
+-- newPayloadStore :: Table tbl RankedBlockPayloadHash a => HTTP.Manager @@ -219,7 +225,7 @@ getPayload s candidateStore priority maybeOrigin v cid payloadHash = do t <- queryPayloadTask payloadHash pQueueInsert queue t awaitTask t - (Just !x) -> return x + Just !x -> return x where mgr = _payloadStoreMgr s tbl = _payloadStoreTable s diff --git a/src/Chainweb/RestAPI.hs b/src/Chainweb/RestAPI.hs index 1f30deee36..5cd65d51b1 100644 --- a/src/Chainweb/RestAPI.hs +++ b/src/Chainweb/RestAPI.hs @@ -49,14 +49,6 @@ module Chainweb.RestAPI , someServiceApiServer , serviceApiApplication , serveServiceApiSocket - --- * Chainweb API Client - --- ** BlockHeaderDb API Client -, module Chainweb.BlockHeaderDB.RestAPI.Client - --- ** P2P API Client -, module P2P.Node.RestAPI.Client ) where import Control.Monad (guard) @@ -80,11 +72,10 @@ import System.Clock import Chainweb.Backup import Chainweb.BlockHeaderDB -import Chainweb.BlockHeaderDB.RestAPI.Client import Chainweb.BlockHeaderDB.RestAPI.Server import Chainweb.ChainId import Chainweb.Chainweb.Configuration -import Chainweb.Chainweb.MinerResources (MiningCoordination) +import Chainweb.Miner.Coordinator (MiningCoordination) import Chainweb.CutDB import Chainweb.CutDB.RestAPI.Server import Chainweb.HostAddress @@ -92,8 +83,7 @@ import Chainweb.Logger (Logger) import Chainweb.Mempool.Mempool (MempoolBackend) import qualified Chainweb.Mempool.RestAPI.Server as Mempool import qualified Chainweb.Miner.RestAPI.Server as Mining -import qualified Chainweb.Pact.RestAPI.Server as PactAPI -import Chainweb.Payload.PayloadStore +-- import qualified Chainweb.Pact.RestAPI.Server as PactAPI import Chainweb.Payload.RestAPI import Chainweb.RestAPI.Backup import Chainweb.RestAPI.Config @@ -108,7 +98,6 @@ import Chainweb.Version import Network.X509.SelfSigned import P2P.Node.PeerDB -import P2P.Node.RestAPI.Client import P2P.Node.RestAPI.Server -- -------------------------------------------------------------------------- -- diff --git 
a/src/Chainweb/Sync/WebBlockHeaderStore.hs b/src/Chainweb/Sync/WebBlockHeaderStore.hs index 28b6aeea52..cad941512b 100644 --- a/src/Chainweb/Sync/WebBlockHeaderStore.hs +++ b/src/Chainweb/Sync/WebBlockHeaderStore.hs @@ -31,6 +31,7 @@ module Chainweb.Sync.WebBlockHeaderStore ( WebBlockHeaderStore(..) , newWebBlockHeaderStore , getBlockHeader +, forkInfoForHeader -- * , WebBlockPayloadStore(..) @@ -80,6 +81,7 @@ import P2P.TaskQueue import Servant.Client import System.LogLevel import Utils.Logging.Trace +import GHC.Stack -- -------------------------------------------------------------------------- -- -- Response Timeout Constants @@ -297,7 +299,11 @@ memoInsert cas m k a = tableLookup cas k >>= \case -- - safe: 6 times of the graph diameter block heights, ~ 9 min -- - final: 4 epochs, 120 * 4 block heights, ~ 4 hours -- -consensusState :: WebBlockHeaderDb -> BlockHeader -> IO ConsensusState +consensusState + :: HasCallStack + => WebBlockHeaderDb + -> BlockHeader + -> IO ConsensusState consensusState wdb hdr = do db <- getWebBlockHeaderDb wdb hdr safeHdr <- fromJuste <$> seekAncestor db hdr safeHeight @@ -309,8 +315,8 @@ consensusState wdb hdr = do } where WindowWidth w = _versionWindow (_chainwebVersion hdr) - finalHeight = max 0 (int height - w * 4) - safeHeight = max 0 (int height - 6 * diam) + finalHeight = int @Int @_ $ max 0 (int height - int w * 4) + safeHeight = int @Int @_ $ max 0 (int height - 6 * int diam) height = view blockHeight hdr diam = diameterAt hdr height @@ -322,15 +328,24 @@ consensusState wdb hdr = do forkInfoForHeader :: WebBlockHeaderDb -> BlockHeader + -> Maybe EncodedPayloadData -> IO ForkInfo -forkInfoForHeader wdb hdr = do - phdr <- ParentHeader <$> lookupParentHeader wdb hdr - state <- consensusState wdb hdr - return $ ForkInfo - { _forkInfoTrace = [blockHeaderToEvaluationCtx phdr pld] - , _forkInfoTargetState = state - , _forkInfoNewBlockCtx = Just nbctx - } +forkInfoForHeader wdb hdr pldData + | isGenesisBlockHeader hdr = do + state <- 
consensusState wdb hdr + return $ ForkInfo + { _forkInfoTrace = [] + , _forkInfoTargetState = state + , _forkInfoNewBlockCtx = Just nbctx + } + | otherwise = do + phdr <- ParentHeader <$> lookupParentHeader wdb hdr + state <- consensusState wdb hdr + return $ ForkInfo + { _forkInfoTrace = [blockHeaderToEvaluationCtx phdr pld pldData] + , _forkInfoTargetState = state + , _forkInfoNewBlockCtx = Just nbctx + } where pld = view blockPayloadHash hdr nbctx = NewBlockCtx @@ -359,10 +374,12 @@ instance Exception GetBlockHeaderFailure getBlockHeaderInternal :: BlockHeaderCas candidateHeaderCas -- ^ CandidateHeaderCas is a content addressed store for BlockHeaders + => ReadableTable candidatePldTbl BlockPayloadHash EncodedPayloadData => WebBlockHeaderStore -- ^ Block Header Store for all Chains -> candidateHeaderCas -- ^ Ephemeral store for block headers under consideration + -> candidatePldTbl -> PayloadProviders -> Maybe (BlockPayloadHash, EncodedPayloadOutputs) -- ^ Payload and Header data for the block, in case that it is @@ -378,7 +395,8 @@ getBlockHeaderInternal getBlockHeaderInternal headerStore candidateHeaderCas - payloadProviders + candidatePldTbl + providers localPayload priority maybeOrigin @@ -432,7 +450,8 @@ getBlockHeaderInternal getBlockHeaderInternal headerStore candidateHeaderCas - payloadProviders + candidatePldTbl + providers localPayload priority maybeOrigin' @@ -449,7 +468,8 @@ getBlockHeaderInternal void $ getBlockHeaderInternal headerStore candidateHeaderCas - payloadProviders + candidatePldTbl + providers localPayload priority maybeOrigin' @@ -458,9 +478,10 @@ getBlockHeaderInternal validateInductiveChainM (tableLookup chainDb) header -- Get the Payload Provider and - withPayloadProvider payloadProviders cid $ \provider -> do + withPayloadProvider providers cid $ \provider -> do let hints = Hints <$> maybeOrigin' - finfo <- forkInfoForHeader wdb header + pld <- tableLookup candidatePldTbl (view blockPayloadHash header) + finfo <- forkInfoForHeader wdb 
header pld runConcurrently -- instruct the payload provider to fetch payload data and prepare @@ -660,8 +681,10 @@ newWebPayloadStore mgr pact payloadDb logfun = do getBlockHeader :: BlockHeaderCas candidateHeaderCas + => ReadableTable candidatePldTbl BlockPayloadHash EncodedPayloadData => WebBlockHeaderStore -> candidateHeaderCas + -> candidatePldTbl -> PayloadProviders -> Maybe (BlockPayloadHash, EncodedPayloadOutputs) -> ChainId @@ -669,13 +692,14 @@ getBlockHeader -> Maybe PeerInfo -> BlockHash -> IO BlockHeader -getBlockHeader headerStore candidateHeaderCas providers localPayload cid priority maybeOrigin h +getBlockHeader headerStore candidateHeaderCas candidatePldTbl providers localPayload cid priority maybeOrigin h = ((\(ChainValue _ b) -> b) <$> go) `catch` \(TaskFailed _es) -> throwM $ TreeDbKeyNotFound @BlockHeaderDb h where go = getBlockHeaderInternal headerStore candidateHeaderCas + candidatePldTbl providers localPayload priority diff --git a/src/P2P/Node.hs b/src/P2P/Node.hs index 078c5d4200..48830c90f5 100644 --- a/src/P2P/Node.hs +++ b/src/P2P/Node.hs @@ -11,6 +11,7 @@ {-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} -- | -- Module: P2P.Node @@ -40,11 +41,17 @@ module P2P.Node , withPeerDb -- * P2P Node +, P2pNodeParameters(..) 
+, P2pNode , p2pCreateNode , p2pStartNode +, p2pStartNodeInactive , p2pStopNode +, p2pRunNode , guardPeerDb , getNewPeerManager +, setActive +, setInactive -- * Logging and Monitoring @@ -123,7 +130,6 @@ import Data.LogMessage import Network.X509.SelfSigned -import P2P.Node.Configuration import P2P.Node.PeerDB import P2P.Node.RestAPI.Client import P2P.Peer @@ -205,7 +211,6 @@ makeLenses ''P2pSessionInfo -- data P2pNode = P2pNode { _p2pNodeNetworkId :: !NetworkId - , _p2pNodeChainwebVersion :: !ChainwebVersion , _p2pNodePeerInfo :: !PeerInfo , _p2pNodePeerDb :: !PeerDb , _p2pNodeSessions :: !(TVar (M.Map PeerInfo (P2pSessionInfo, Async (Maybe Bool)))) @@ -220,8 +225,15 @@ data P2pNode = P2pNode , _p2pNodeDoPeerSync :: !Bool -- ^ Synchronize peers at start of each session. Note, that this is -- expensive. + , _p2pNodePrivate :: !Bool + -- ^ Whether this node is private + , _p2pNodeMaxSessionCount :: !Natural + , _p2pNodeSessionTimeout :: !Seconds } +_p2pNodeChainwebVersion :: P2pNode -> ChainwebVersion +_p2pNodeChainwebVersion = _chainwebVersion . _p2pNodePeerDb + instance HasChainwebVersion P2pNode where _chainwebVersion = _p2pNodeChainwebVersion @@ -312,6 +324,13 @@ nodeGeometric :: HasCallStack => P2pNode -> Double -> IO Int nodeGeometric node = nodeRandom node . geometric {-# INLINE nodeGeometric #-} +-- | Set a node active, causing it to initiating new sessions. +-- +setActive :: P2pNode -> STM () +setActive node = writeTVar (_p2pNodeActive node) True + +-- | Set a node inactive. An inactive node does not initiate any new sessions. 
+-- setInactive :: P2pNode -> STM () setInactive node = writeTVar (_p2pNodeActive node) False @@ -506,10 +525,9 @@ syncFromPeer node info = do -- @O(_p2pConfigActivePeerCount conf)@ -- findNextPeer - :: P2pConfiguration - -> P2pNode + :: P2pNode -> IO PeerEntry -findNextPeer conf node = do +findNextPeer node = do candidates <- awaitCandidates -- random circular shift of a set @@ -528,7 +546,6 @@ findNextPeer conf node = do -- this ix expensive but lazy and only forced if p0 is empty let p2 = L.groupBy ((==) `on` _peerEntrySuccessiveFailures) p1 - -- Choose the category to pick from -- -- In 95% of all cases are peer is selected from the highest priority, if possible. @@ -575,7 +592,7 @@ findNextPeer conf node = do -- Retry if there are more active sessions than the maximum number -- of sessions -- - check (int sessionCount < _p2pConfigMaxSessionCount conf) + check (int sessionCount < _p2pNodeMaxSessionCount node) let addrs = S.fromList (_peerAddr <$> M.keys sessions) @@ -612,9 +629,9 @@ findNextPeer conf node = do -- | This can loop forever if there are no peers available for the respective -- network. -- -newSession :: P2pConfiguration -> P2pNode -> IO () -newSession conf node = do - newPeer <- findNextPeer conf node +newSession :: P2pNode -> IO () +newSession node = do + newPeer <- findNextPeer node let newPeerInfo = _peerEntryInfo newPeer logg node Debug $ "Selected new peer " <> encodeToText newPeerInfo <> ", " @@ -629,7 +646,7 @@ newSession conf node = do -- FIXME there are better ways to prevent the node from spinning -- if no suitable (non-failing node) is available. -- cf. 
GitHub issue #117 - newSession conf node + newSession node True -> do logg node Debug $ "Connected to new peer " <> showInfo newPeerInfo let env = peerClientEnv node newPeerInfo @@ -647,11 +664,11 @@ newSession conf node = do logg node Debug $ "Started peer session " <> showSessionId newPeerInfo newSes loggFun node Info $ JsonLog info where - TimeSpan timeoutMs = secondsToTimeSpan @Double (_p2pConfigSessionTimeout conf) + TimeSpan timeoutMs = secondsToTimeSpan @Double (_p2pNodeSessionTimeout node) peerDb = _p2pNodePeerDb node syncFromPeer_ pinfo - | _p2pConfigPrivate conf = return True + | _p2pNodePrivate node = return True | _p2pNodeDoPeerSync node = syncFromPeer node pinfo | otherwise = return True @@ -739,20 +756,23 @@ waitAnySession node = do startPeerDb :: ChainwebVersion -> HS.HashSet NetworkId - -> P2pConfiguration + -> Bool + -- ^ Whether this node is private + -> [PeerInfo] + -- ^ Set of statically known peers. -> IO PeerDb -startPeerDb v nids conf = do +startPeerDb v nids isPrivate knownPeers = do !peerDb <- newEmptyPeerDb v forM_ nids $ \nid -> - peerDbInsertPeerInfoList_ True nid (_p2pConfigKnownPeers conf) peerDb - return $ if _p2pConfigPrivate conf + peerDbInsertPeerInfoList_ True nid knownPeers peerDb + return $ if isPrivate then makePeerDbPrivate peerDb else peerDb -- | Stop a 'PeerDb', possibly persisting the db to a file. 
-- -stopPeerDb :: P2pConfiguration -> PeerDb -> IO () -stopPeerDb _ _ = return () +stopPeerDb :: PeerDb -> IO () +stopPeerDb _ = return () {-# INLINE stopPeerDb #-} -- | Run a computation with a PeerDb @@ -760,57 +780,82 @@ stopPeerDb _ _ = return () withPeerDb :: ChainwebVersion -> HS.HashSet NetworkId - -> P2pConfiguration + -> Bool + -- ^ Whether this node is private + -> [PeerInfo] + -- ^ Set of statically known peers -> (PeerDb -> IO a) -> IO a -withPeerDb v nids conf = bracket (startPeerDb v nids conf) (stopPeerDb conf) +withPeerDb v nids isPrivate knownPeers = + bracket (startPeerDb v nids isPrivate knownPeers) stopPeerDb -- -------------------------------------------------------------------------- -- -- Create -p2pCreateNode - :: ChainwebVersion - -> NetworkId - -> Peer - -> LogFunction - -> PeerDb - -> HTTP.Manager - -> Bool - -> P2pSession - -> IO P2pNode -p2pCreateNode cv nid peer logfun db mgr doPeerSync session = do +data P2pNodeParameters = P2pNodeParameters + { _p2pNodeParamsNetworkId :: !NetworkId + , _p2pNodeParamsMyPeerInfo :: !PeerInfo + , _p2pNodeParamsLogFunction :: !LogFunction + , _p2pNodeParamsPeerDb :: !PeerDb + , _p2pNodeParamsManager :: !HTTP.Manager + , _p2pNodeParamsDoPeerSync :: !Bool + -- ^ whether to synchronize peers on session startup + , _p2pNodeParamsIsPrivate :: !Bool + -- ^ whether the node is private + , _p2pNodeParamsMaxSessionCount :: !Natural + -- ^ Maximum Session count + , _p2pNodeParamsSessionTimeout :: !Seconds + -- ^ Session timeout + , _p2pNodeParamsSession :: !P2pSession + } + +instance HasChainwebVersion P2pNodeParameters where + _chainwebVersion = _chainwebVersion . 
_p2pNodeParamsPeerDb + +p2pCreateNode :: P2pNodeParameters -> IO P2pNode +p2pCreateNode params = do -- intialize P2P State sessionsVar <- newTVarIO mempty statsVar <- newTVarIO emptyP2pNodeStats rngVar <- newIORef =<< R.newStdGen - activeVar <- newTVarIO True - let !s = P2pNode - { _p2pNodeNetworkId = nid - , _p2pNodeChainwebVersion = cv - , _p2pNodePeerInfo = myInfo - , _p2pNodePeerDb = db - , _p2pNodeSessions = sessionsVar - , _p2pNodeManager = mgr - , _p2pNodeLogFunction = logfun - , _p2pNodeStats = statsVar - , _p2pNodeClientSession = session - , _p2pNodeRng = rngVar - , _p2pNodeActive = activeVar - , _p2pNodeDoPeerSync = doPeerSync - } - - logfun @T.Text Debug "created node" - return s + activeVar <- newTVarIO False + logfun Debug "created node" + return P2pNode + { _p2pNodeNetworkId = _p2pNodeParamsNetworkId params + , _p2pNodePeerInfo = _p2pNodeParamsMyPeerInfo params + , _p2pNodePeerDb = _p2pNodeParamsPeerDb params + , _p2pNodeSessions = sessionsVar + , _p2pNodeManager = _p2pNodeParamsManager params + , _p2pNodeLogFunction = _p2pNodeParamsLogFunction params + , _p2pNodeStats = statsVar + , _p2pNodeClientSession = _p2pNodeParamsSession params + , _p2pNodeRng = rngVar + , _p2pNodeActive = activeVar + , _p2pNodeDoPeerSync = _p2pNodeParamsDoPeerSync params + , _p2pNodePrivate = _p2pNodeParamsIsPrivate params + , _p2pNodeMaxSessionCount = _p2pNodeParamsMaxSessionCount params + , _p2pNodeSessionTimeout = _p2pNodeParamsSessionTimeout params + } where - myInfo = _peerInfo peer + logfun :: LogLevel -> T.Text -> IO () + logfun = _p2pNodeParamsLogFunction params -- -------------------------------------------------------------------------- -- -- Run P2P Node -p2pStartNode :: P2pConfiguration -> P2pNode -> IO () -p2pStartNode conf node = concurrently_ - (runForever (logg node) "P2P.Node.awaitSessions" $ awaitSessions node) - (runForever (logg node) "P2P.Node.newSessions" $ newSession conf node) +p2pStartNodeInactive :: P2pNode -> IO () +p2pStartNodeInactive node = do + 
atomically (setInactive node)
+    concurrently_
+        (runForever (logg node) "P2P.Node.awaitSessions" $ awaitSessions node)
+        (runForever (logg node) "P2P.Node.newSessions" $ newSession node)
+
+p2pStartNode :: P2pNode -> IO ()
+p2pStartNode node = do
+    atomically (setActive node)
+    concurrently_
+        (runForever (logg node) "P2P.Node.awaitSessions" $ awaitSessions node)
+        (runForever (logg node) "P2P.Node.newSessions" $ newSession node)
 
 p2pStopNode :: P2P.Node.P2pNode -> IO ()
 p2pStopNode node = do
@@ -819,3 +864,11 @@ p2pStopNode node = do
         readTVar (_p2pNodeSessions node)
     mapM_ (uninterruptibleCancel . snd) sessions
     logg node Info "stopped node"
+
+-- | Activate and run a node.
+--
+-- The node is stopped when an asynchronous exception is raised in the thread.
+--
+p2pRunNode :: P2pNode -> IO ()
+p2pRunNode n = finally (p2pStartNode n) (p2pStopNode n)
+