From b7f5c86760f612db22b9d1ebd72556b98128ae25 Mon Sep 17 00:00:00 2001 From: Josh Dechant Date: Tue, 13 Feb 2024 21:36:56 -0500 Subject: [PATCH] some cleanup --- block/ss_manager.go | 22 ++++++------------- node/full.go | 47 +--------------------------------------- rpc/json/helpers_test.go | 17 +-------------- state/executor.go | 2 +- state/executor_test.go | 18 +++++++++------ 5 files changed, 21 insertions(+), 85 deletions(-) diff --git a/block/ss_manager.go b/block/ss_manager.go index f8abd14a2..c76ea57e6 100644 --- a/block/ss_manager.go +++ b/block/ss_manager.go @@ -15,7 +15,6 @@ import ( "github.com/cometbft/cometbft/proxy" cmtypes "github.com/cometbft/cometbft/types" ds "github.com/ipfs/go-datastore" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/astriaorg/rollkit/config" "github.com/astriaorg/rollkit/mempool" @@ -62,7 +61,12 @@ func NewSSManager( return nil, err } + // Generate a private key from a null secret. + // Since a shared sequencer is providing consensus, we do this for compatibility when building & + // validating blocks, so all nodes build the same blocks. nullKey := ed25519.GenPrivKeyFromSecret([]byte{0x00}) + + // Set block proposer to the "null key" address for now; maybe use astria proposer in future? proposer := nullKey.PubKey().Address() exec := state.NewBlockExecutor(proposer, genesis.ChainID, mempool, proxyApp.Consensus(), eventBus, logger, execMetrics) @@ -97,6 +101,8 @@ func NewSSManager( return agg, nil } +// Crash recovery according to https://docs.cometbft.com/v0.38/spec/abci/abci++_app_requirements#crash-recovery +// todo: make sure all "CometBFT will panic if any of the steps in the sequence happen out of order" scenarios are implemented func (m *SSManager) CheckCrashRecovery(ctx context.Context) error { m.logger.Info("checking for crash recovery") res, err := m.proxyApp.Query().Info(ctx, proxy.RequestInfo) @@ -362,12 +368,6 @@ func (m *SSManager) updateState(ctx context.Context, s types.State) error { return nil } -func (m *SSManager) getLastBlockTime() time.Time { - m.lastStateMtx.RLock() - defer m.lastStateMtx.RUnlock() - return m.lastState.LastBlockTime -} - func (m *SSManager) createBlock(height uint64, timestamp time.Time, txs types.Txs, lastCommit *types.Commit, lastHeaderHash types.Hash) (*types.Block, error) { m.lastStateMtx.RLock() defer m.lastStateMtx.RUnlock() @@ -438,11 +438,3 @@ func updateState(s *types.State, res *abci.ResponseInitChain) { s.LastResultsHash = merkle.HashFromByteSlices(nil) } - -func getAddress(key crypto.PrivKey) ([]byte, error) { - rawKey, err := key.GetPublic().Raw() - if err != nil { - return nil, err - } - return cmcrypto.AddressHash(rawKey), nil -} diff --git a/node/full.go b/node/full.go index 063d07dce..7a63172a0 100644 --- a/node/full.go +++ b/node/full.go @@ -128,29 +128,8 @@ func newFullNode( return nil, err } - // dalcKV := newPrefixKV(baseKV, dalcPrefix) - // dalc, err := initDALC(nodeConfig, dalcKV, logger) - // if err != nil { - // return nil, err - // } - - // p2pClient, err := p2p.NewClient(nodeConfig.P2P, p2pKey, genesis.ChainID, baseKV, logger.With("module", "p2p"), p2pMetrics) - // if err != nil { - // return nil, err - // } - mainKV := newPrefixKV(baseKV, mainPrefix) - // headerSyncService, err := initHeaderSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger) - // if err != nil { - // return nil, err - // } - - // blockSyncService, err := initBlockSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger) - // if err != nil { - // return nil, err - // } - mempool := initMempool(logger, proxyApp, memplMetrics) store := store.New(mainKV) @@ -165,6 +144,7 @@ func newFullNode( return nil, err } + // check for crash recovery blockManager.CheckCrashRecovery(ctx) // genesis info for exec api & sequencer client @@ -240,37 +220,12 @@ func initBaseKV(nodeConfig config.NodeConfig, logger log.Logger) (ds.TxnDatastor return store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, "rollkit") } -// func initDALC(nodeConfig config.NodeConfig, dalcKV ds.TxnDatastore, logger log.Logger) (*da.DAClient, error) { -// daClient := goDAProxy.NewClient() -// err := daClient.Start(nodeConfig.DAAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) -// if err != nil { -// return nil, fmt.Errorf("error while establishing GRPC connection to DA layer: %w", err) -// } -// return &da.DAClient{DA: daClient, GasPrice: nodeConfig.DAGasPrice, Logger: logger.With("module", "da_client")}, nil -// } - func initMempool(logger log.Logger, proxyApp proxy.AppConns, memplMetrics *mempool.Metrics) *mempool.CListMempool { mempool := mempool.NewCListMempool(llcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0, mempool.WithMetrics(memplMetrics)) mempool.EnableTxsAvailable() return mempool } -// func initHeaderSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderSyncService, error) { -// headerSyncService, err := block.NewHeaderSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderSyncService")) -// if err != nil { -// return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) -// } -// return headerSyncService, nil -// } - -// func initBlockSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.BlockSyncService, error) { -// blockSyncService, err := block.NewBlockSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "BlockSyncService")) -// if err != nil { -// return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) -// } -// return blockSyncService, nil -// } - func initBlockManager(nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, store store.Store, mempool mempool.Mempool, proxyApp proxy.AppConns, eventBus *cmtypes.EventBus, logger log.Logger, seqMetrics *block.Metrics, execMetrics *state.Metrics) (*block.SSManager, error) { blockManager, err := block.NewSSManager(nodeConfig.BlockManagerConfig, genesis, store, mempool, proxyApp, eventBus, logger.With("module", "BlockManager"), seqMetrics, execMetrics) if err != nil { diff --git a/rpc/json/helpers_test.go b/rpc/json/helpers_test.go index bd34bfffe..cc712564a 100644 --- a/rpc/json/helpers_test.go +++ b/rpc/json/helpers_test.go @@ -2,25 +2,20 @@ package json import ( "context" - "crypto/rand" "testing" abci "github.com/cometbft/cometbft/abci/types" cmconfig "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/crypto/ed25519" "github.com/cometbft/cometbft/libs/log" - "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/proxy" rpcclient "github.com/cometbft/cometbft/rpc/client" cmtypes "github.com/cometbft/cometbft/types" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/astriaorg/rollkit/config" "github.com/astriaorg/rollkit/node" "github.com/astriaorg/rollkit/test/mocks" - "github.com/astriaorg/rollkit/types" ) func prepareProposalResponse(_ context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) { @@ -65,18 +60,8 @@ func getRPC(t *testing.T) (*mocks.Application, rpcclient.Client) { LastBlockHeight: 345, LastBlockAppHash: nil, }, nil) - key, _, _ := crypto.GenerateEd25519Key(rand.Reader) - validatorKey := ed25519.GenPrivKey() - nodeKey := &p2p.NodeKey{ - PrivKey: validatorKey, - } - signingKey, _ := types.GetNodeKey(nodeKey) - pubKey := validatorKey.PubKey() - genesisValidators := []cmtypes.GenesisValidator{ - {Address: pubKey.Address(), PubKey: pubKey, Power: int64(100), Name: "gen #1"}, - } - n, err := node.NewNode(context.Background(), config.NodeConfig{BlockManagerConfig: config.BlockManagerConfig{}}, key, signingKey, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, node.DefaultMetricsProvider(cmconfig.DefaultInstrumentationConfig()), log.TestingLogger()) + n, err := node.NewNode(context.Background(), config.NodeConfig{BlockManagerConfig: config.BlockManagerConfig{}}, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test"}, node.DefaultMetricsProvider(cmconfig.DefaultInstrumentationConfig()), log.TestingLogger()) require.NoError(err) require.NotNil(n) diff --git a/state/executor.go b/state/executor.go index 839d38973..43b59c052 100644 --- a/state/executor.go +++ b/state/executor.go @@ -373,7 +373,7 @@ func (e *BlockExecutor) commit(ctx context.Context, state types.State, block *ty return nil, err } - if skipExec == false { + if !skipExec { _, err = e.proxyApp.Commit(ctx) if err != nil { return nil, err diff --git a/state/executor_test.go b/state/executor_test.go index 82707a490..04f65d82f 100644 --- a/state/executor_test.go +++ b/state/executor_test.go @@ -178,12 +178,14 @@ func doTestApplyBlock(t *testing.T) { } block.SignedHeader.Validators = cmtypes.NewValidatorSet(validators) - newState, resp, err := executor.ApplyBlock(context.Background(), state, block) + resp, err := executor.ApplyBlock(context.Background(), state, block) require.NoError(err) - require.NotNil(newState) require.NotNil(resp) + newState, err := executor.UpdateState(state, block, resp) + require.NoError(err) + require.NotNil(newState) assert.Equal(uint64(1), newState.LastBlockHeight) - appHash, _, err := executor.Commit(context.Background(), newState, block, resp) + appHash, err := executor.Commit(context.Background(), newState, block, resp, false) require.NoError(err) assert.Equal(mockAppHash, appHash) @@ -207,12 +209,14 @@ func doTestApplyBlock(t *testing.T) { } block.SignedHeader.Validators = cmtypes.NewValidatorSet(validators) - newState, resp, err = executor.ApplyBlock(context.Background(), newState, block) + resp, err = executor.ApplyBlock(context.Background(), newState, block) require.NoError(err) - require.NotNil(newState) require.NotNil(resp) + newState, err = executor.UpdateState(state, block, resp) + require.NoError(err) + require.NotNil(newState) assert.Equal(uint64(2), newState.LastBlockHeight) - _, _, err = executor.Commit(context.Background(), newState, block, resp) + _, err = executor.Commit(context.Background(), newState, block, resp, true) require.NoError(err) // wait for at least 4 Tx events, for up to 3 second. @@ -303,7 +307,7 @@ func TestUpdateStateConsensusParams(t *testing.T) { TxResults: txResults, } - updatedState, err := executor.updateState(state, block, resp) + updatedState, err := executor.UpdateState(state, block, resp) require.NoError(t, err) assert.Equal(t, uint64(1235), updatedState.LastHeightConsensusParamsChanged)