From 8bcc750ed1b93c6d92d5230016664370cd6400b0 Mon Sep 17 00:00:00 2001 From: lacsomot Date: Tue, 28 May 2024 18:08:09 +0700 Subject: [PATCH] add initInjDBs and NewTestnetNodeWithContext --- node/node.go | 232 ++++++++++++++++++++++++++++++++++++++++++++++++++ node/setup.go | 53 ++++++++++++ 2 files changed, 285 insertions(+) diff --git a/node/node.go b/node/node.go index 6fe6426e25..f9c1ff663b 100644 --- a/node/node.go +++ b/node/node.go @@ -504,6 +504,238 @@ func NewNodeWithContext(ctx context.Context, return node, nil } +// NewNodeWithContext is cancellable version of NewNode. +func NewTestnetNodeWithContext(ctx context.Context, + config *cfg.Config, + privValidator types.PrivValidator, + nodeKey *p2p.NodeKey, + clientCreator proxy.ClientCreator, + genesisDocProvider GenesisDocProvider, + dbProvider cfg.DBProvider, + metricsProvider MetricsProvider, + logger log.Logger, + options ...Option, +) (*Node, error) { + blockStore, stateDB, err := initInjDBs(config, dbProvider, genesisDocProvider) + if err != nil { + return nil, err + } + + stateStore := sm.NewStore(stateDB, sm.StoreOptions{ + DiscardABCIResponses: config.Storage.DiscardABCIResponses, + }) + + state, genDoc, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider) + if err != nil { + return nil, err + } + + csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID) + + // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). + proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics) + if err != nil { + return nil, err + } + + // EventBus and IndexerService must be started before the handshake because + // we might need to index the txs of the replayed block as this might not have happened + // when the node stopped last time (i.e. the node stopped after it saved the block + // but before it indexed the txs) + eventBus, err := createAndStartEventBus(logger) + if err != nil { + return nil, err + } + + indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, + genDoc.ChainID, dbProvider, eventBus, logger) + if err != nil { + return nil, err + } + + // If an address is provided, listen on the socket for a connection from an + // external signing process. + if config.PrivValidatorListenAddr != "" { + // FIXME: we should start services inside OnStart + privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger) + if err != nil { + return nil, fmt.Errorf("error with private validator socket client: %w", err) + } + } + + pubKey, err := privValidator.GetPubKey() + if err != nil { + return nil, fmt.Errorf("can't get pubkey: %w", err) + } + + // Determine whether we should attempt state sync. + stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey) + if stateSync && state.LastBlockHeight > 0 { + logger.Info("Found local state with non-zero height, skipping state sync") + stateSync = false + } + + // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, + // and replays any blocks as necessary to sync CometBFT with the app. + consensusLogger := logger.With("module", "consensus") + if !stateSync { + if err := doHandshake(ctx, stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { + return nil, err + } + + // Reload the state. It will have the Version.Consensus.App set by the + // Handshake, and may have other modifications as well (ie. depending on + // what happened during block replay). + state, err = stateStore.Load() + if err != nil { + return nil, fmt.Errorf("cannot load state: %w", err) + } + } + + // Determine whether we should do block sync. This must happen after the handshake, since the + // app may modify the validator set, specifying ourself as the only validator. + blockSync := !onlyValidatorIsUs(state, pubKey) + + logNodeStartupInfo(state, pubKey, logger, consensusLogger) + + mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger) + + evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger) + if err != nil { + return nil, err + } + + // make block executor for consensus and blocksync reactors to execute blocks + blockExec := sm.NewBlockExecutor( + stateStore, + logger.With("module", "state"), + proxyApp.Consensus(), + mempool, + evidencePool, + blockStore, + sm.BlockExecutorWithMetrics(smMetrics), + ) + + offlineStateSyncHeight := int64(0) + if blockStore.Height() == 0 { + offlineStateSyncHeight, err = blockExec.Store().GetOfflineStateSyncHeight() + if err != nil && err.Error() != "value empty" { + panic(fmt.Sprintf("failed to retrieve statesynced height from store %s; expected state store height to be %v", err, state.LastBlockHeight)) + } + } + // Don't start block sync if we're doing a state sync first. + bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger, bsMetrics, offlineStateSyncHeight) + if err != nil { + return nil, fmt.Errorf("could not create blocksync reactor: %w", err) + } + + consensusReactor, consensusState := createConsensusReactor( + config, state, blockExec, blockStore, mempool, evidencePool, + privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger, offlineStateSyncHeight, + ) + + err = stateStore.SetOfflineStateSyncHeight(0) + if err != nil { + panic(fmt.Sprintf("failed to reset the offline state sync height %s", err)) + } + // Set up state sync reactor, and schedule a sync if requested. + // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy, + // we should clean this whole thing up. See: + // https://github.com/tendermint/tendermint/issues/4644 + stateSyncReactor := statesync.NewReactor( + *config.StateSync, + proxyApp.Snapshot(), + proxyApp.Query(), + ssMetrics, + ) + stateSyncReactor.SetLogger(logger.With("module", "statesync")) + + nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) + if err != nil { + return nil, err + } + + transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp) + + p2pLogger := logger.With("module", "p2p") + sw := createSwitch( + config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, + stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, + ) + + err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err) + } + + err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) + if err != nil { + return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err) + } + + addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey) + if err != nil { + return nil, fmt.Errorf("could not create addrbook: %w", err) + } + + // Optionally, start the pex reactor + // + // TODO: + // + // We need to set Seeds and PersistentPeers on the switch, + // since it needs to be able to use these (and their DNS names) + // even if the PEX is off. We can include the DNS name in the NetAddress, + // but it would still be nice to have a clear list of the current "PersistentPeers" + // somewhere that we can return with net_info. + // + // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. + // Note we currently use the addrBook regardless at least for AddOurAddress + var pexReactor *pex.Reactor + if config.P2P.PexReactor { + pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) + } + + // Add private IDs to addrbook to block those peers being added + addrBook.AddPrivateIDs(splitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ")) + + node := &Node{ + config: config, + genesisDoc: genDoc, + privValidator: privValidator, + + transport: transport, + sw: sw, + addrBook: addrBook, + nodeInfo: nodeInfo, + nodeKey: nodeKey, + + stateStore: stateStore, + blockStore: blockStore, + bcReactor: bcReactor, + mempoolReactor: mempoolReactor, + mempool: mempool, + consensusState: consensusState, + consensusReactor: consensusReactor, + stateSyncReactor: stateSyncReactor, + stateSync: stateSync, + stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state + pexReactor: pexReactor, + evidencePool: evidencePool, + proxyApp: proxyApp, + txIndexer: txIndexer, + indexerService: indexerService, + blockIndexer: blockIndexer, + eventBus: eventBus, + } + node.BaseService = *service.NewBaseService(logger, "Node", node) + + for _, option := range options { + option(node) + } + + return node, nil +} + // OnStart starts the Node. It implements service.Service. func (n *Node) OnStart() error { now := cmttime.Now() diff --git a/node/setup.go b/node/setup.go index 6d2e9c295b..479e9682e7 100644 --- a/node/setup.go +++ b/node/setup.go @@ -13,6 +13,8 @@ import ( dbm "github.com/cometbft/cometbft-db" + "github.com/cosmos/gogoproto/proto" + abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/blocksync" cfg "github.com/cometbft/cometbft/config" @@ -28,6 +30,7 @@ import ( "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/p2p/pex" "github.com/cometbft/cometbft/privval" + cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" "github.com/cometbft/cometbft/proxy" sm "github.com/cometbft/cometbft/state" "github.com/cometbft/cometbft/state/indexer" @@ -120,6 +123,56 @@ func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.B return } +func initInjDBs(config *cfg.Config, dbProvider cfg.DBProvider, genesisDocProvider GenesisDocProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { + var blockStoreDB dbm.DB + blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config}) + if err != nil { + return + } + + stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config}) + if err != nil { + return + } + + _ = sm.NewStore(stateDB, sm.StoreOptions{ + DiscardABCIResponses: config.Storage.DiscardABCIResponses, + }) + + state, _, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider) + if err != nil { + return nil, nil, err + } + bz, err := blockStoreDB.Get(calcSeenCommitKey(state.LastBlockHeight)) + if err != nil { + return nil, nil, err + } + + // Modify blockstore + pbc := new(cmtproto.Commit) + err = proto.Unmarshal(bz, pbc) + if err != nil { + panic(fmt.Sprintf("error unmarshal : %v", err)) + } + + // Flag commit + pbc.Signatures[0].BlockIdFlag = 2 + + bz, err = proto.Marshal(pbc) + if err != nil { + panic(fmt.Sprintf("error marshal : %v", err)) + } + blockStoreDB.Set(calcSeenCommitKey(state.LastBlockHeight), bz) + + blockStore = store.NewBlockStore(blockStoreDB) + + return +} + +func calcSeenCommitKey(height int64) []byte { + return []byte(fmt.Sprintf("SC:%v", height)) +} + func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) { proxyApp := proxy.NewAppConns(clientCreator, metrics) proxyApp.SetLogger(logger.With("module", "proxy"))