Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initInjDBs and NewTestnetNodeWithContext #1

Open
wants to merge 1 commit into
base: v0.38.x-inj
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,238 @@ func NewNodeWithContext(ctx context.Context,
return node, nil
}

// NewNodeWithContext is cancellable version of NewNode.
func NewTestnetNodeWithContext(ctx context.Context,
config *cfg.Config,
privValidator types.PrivValidator,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
genesisDocProvider GenesisDocProvider,
dbProvider cfg.DBProvider,
metricsProvider MetricsProvider,
logger log.Logger,
options ...Option,
) (*Node, error) {
blockStore, stateDB, err := initInjDBs(config, dbProvider, genesisDocProvider)
if err != nil {
return nil, err
}

stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: config.Storage.DiscardABCIResponses,
})

state, genDoc, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider)
if err != nil {
return nil, err
}

csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID)

// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics)
if err != nil {
return nil, err
}

// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs)
eventBus, err := createAndStartEventBus(logger)
if err != nil {
return nil, err
}

indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
genDoc.ChainID, dbProvider, eventBus, logger)
if err != nil {
return nil, err
}

// If an address is provided, listen on the socket for a connection from an
// external signing process.
if config.PrivValidatorListenAddr != "" {
// FIXME: we should start services inside OnStart
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger)
if err != nil {
return nil, fmt.Errorf("error with private validator socket client: %w", err)
}
}

pubKey, err := privValidator.GetPubKey()
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}

// Determine whether we should attempt state sync.
stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}

// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync CometBFT with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(ctx, stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}

// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err = stateStore.Load()
if err != nil {
return nil, fmt.Errorf("cannot load state: %w", err)
}
}

// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)

logNodeStartupInfo(state, pubKey, logger, consensusLogger)

mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)

evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger)
if err != nil {
return nil, err
}

// make block executor for consensus and blocksync reactors to execute blocks
blockExec := sm.NewBlockExecutor(
stateStore,
logger.With("module", "state"),
proxyApp.Consensus(),
mempool,
evidencePool,
blockStore,
sm.BlockExecutorWithMetrics(smMetrics),
)

offlineStateSyncHeight := int64(0)
if blockStore.Height() == 0 {
offlineStateSyncHeight, err = blockExec.Store().GetOfflineStateSyncHeight()
if err != nil && err.Error() != "value empty" {
panic(fmt.Sprintf("failed to retrieve statesynced height from store %s; expected state store height to be %v", err, state.LastBlockHeight))
}
}
// Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger, bsMetrics, offlineStateSyncHeight)
if err != nil {
return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
}

consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || blockSync, eventBus, consensusLogger, offlineStateSyncHeight,
)

err = stateStore.SetOfflineStateSyncHeight(0)
if err != nil {
panic(fmt.Sprintf("failed to reset the offline state sync height %s", err))
}
// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactor := statesync.NewReactor(
*config.StateSync,
proxyApp.Snapshot(),
proxyApp.Query(),
ssMetrics,
)
stateSyncReactor.SetLogger(logger.With("module", "statesync"))

nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
if err != nil {
return nil, err
}

transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)

p2pLogger := logger.With("module", "p2p")
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
)

err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}

err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}

addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}

// Optionally, start the pex reactor
//
// TODO:
//
// We need to set Seeds and PersistentPeers on the switch,
// since it needs to be able to use these (and their DNS names)
// even if the PEX is off. We can include the DNS name in the NetAddress,
// but it would still be nice to have a clear list of the current "PersistentPeers"
// somewhere that we can return with net_info.
//
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
var pexReactor *pex.Reactor
if config.P2P.PexReactor {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}

// Add private IDs to addrbook to block those peers being added
addrBook.AddPrivateIDs(splitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " "))

node := &Node{
config: config,
genesisDoc: genDoc,
privValidator: privValidator,

transport: transport,
sw: sw,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,

stateStore: stateStore,
blockStore: blockStore,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
mempool: mempool,
consensusState: consensusState,
consensusReactor: consensusReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
pexReactor: pexReactor,
evidencePool: evidencePool,
proxyApp: proxyApp,
txIndexer: txIndexer,
indexerService: indexerService,
blockIndexer: blockIndexer,
eventBus: eventBus,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)

for _, option := range options {
option(node)
}

return node, nil
}

// OnStart starts the Node. It implements service.Service.
func (n *Node) OnStart() error {
now := cmttime.Now()
Expand Down
53 changes: 53 additions & 0 deletions node/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

dbm "github.com/cometbft/cometbft-db"

"github.com/cosmos/gogoproto/proto"

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/blocksync"
cfg "github.com/cometbft/cometbft/config"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/pex"
"github.com/cometbft/cometbft/privval"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
"github.com/cometbft/cometbft/proxy"
sm "github.com/cometbft/cometbft/state"
"github.com/cometbft/cometbft/state/indexer"
Expand Down Expand Up @@ -120,6 +123,56 @@ func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.B
return
}

func initInjDBs(config *cfg.Config, dbProvider cfg.DBProvider, genesisDocProvider GenesisDocProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
var blockStoreDB dbm.DB
blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return
}

stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config})
if err != nil {
return
}

_ = sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: config.Storage.DiscardABCIResponses,
})

state, _, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider)
if err != nil {
return nil, nil, err
}
bz, err := blockStoreDB.Get(calcSeenCommitKey(state.LastBlockHeight))
if err != nil {
return nil, nil, err
}

// Modify blockstore
pbc := new(cmtproto.Commit)
err = proto.Unmarshal(bz, pbc)
if err != nil {
panic(fmt.Sprintf("error unmarshal : %v", err))
}

// Flag commit
pbc.Signatures[0].BlockIdFlag = 2

bz, err = proto.Marshal(pbc)
if err != nil {
panic(fmt.Sprintf("error marshal : %v", err))
}
blockStoreDB.Set(calcSeenCommitKey(state.LastBlockHeight), bz)

blockStore = store.NewBlockStore(blockStoreDB)

return
}

func calcSeenCommitKey(height int64) []byte {
return []byte(fmt.Sprintf("SC:%v", height))
}

func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, metrics)
proxyApp.SetLogger(logger.With("module", "proxy"))
Expand Down