Skip to content

Commit

Permalink
some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mycodecrafting committed Feb 14, 2024
1 parent fb0822b commit b7f5c86
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 85 deletions.
22 changes: 7 additions & 15 deletions block/ss_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/cometbft/cometbft/proxy"
cmtypes "github.com/cometbft/cometbft/types"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/crypto"

"github.com/astriaorg/rollkit/config"
"github.com/astriaorg/rollkit/mempool"
Expand Down Expand Up @@ -62,7 +61,12 @@ func NewSSManager(
return nil, err
}

// Generate a private key from a null secret.
// Since a shared sequencer is providing consensus, we do this for compatibility when building &
// validating blocks, so all nodes build the same blocks.
nullKey := ed25519.GenPrivKeyFromSecret([]byte{0x00})

// Set block proposer to the "null key" address for now; maybe use astria proposer in future?
proposer := nullKey.PubKey().Address()

exec := state.NewBlockExecutor(proposer, genesis.ChainID, mempool, proxyApp.Consensus(), eventBus, logger, execMetrics)
Expand Down Expand Up @@ -97,6 +101,8 @@ func NewSSManager(
return agg, nil
}

// Crash recovery according to https://docs.cometbft.com/v0.38/spec/abci/abci++_app_requirements#crash-recovery
// todo: make sure all "CometBFT will panic if any of the steps in the sequence happen out of order" scenarios are implemented
func (m *SSManager) CheckCrashRecovery(ctx context.Context) error {
m.logger.Info("checking for crash recovery")
res, err := m.proxyApp.Query().Info(ctx, proxy.RequestInfo)
Expand Down Expand Up @@ -362,12 +368,6 @@ func (m *SSManager) updateState(ctx context.Context, s types.State) error {
return nil
}

func (m *SSManager) getLastBlockTime() time.Time {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
return m.lastState.LastBlockTime
}

func (m *SSManager) createBlock(height uint64, timestamp time.Time, txs types.Txs, lastCommit *types.Commit, lastHeaderHash types.Hash) (*types.Block, error) {
m.lastStateMtx.RLock()
defer m.lastStateMtx.RUnlock()
Expand Down Expand Up @@ -438,11 +438,3 @@ func updateState(s *types.State, res *abci.ResponseInitChain) {
s.LastResultsHash = merkle.HashFromByteSlices(nil)

}

func getAddress(key crypto.PrivKey) ([]byte, error) {
rawKey, err := key.GetPublic().Raw()
if err != nil {
return nil, err
}
return cmcrypto.AddressHash(rawKey), nil
}
47 changes: 1 addition & 46 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,29 +128,8 @@ func newFullNode(
return nil, err
}

// dalcKV := newPrefixKV(baseKV, dalcPrefix)
// dalc, err := initDALC(nodeConfig, dalcKV, logger)
// if err != nil {
// return nil, err
// }

// p2pClient, err := p2p.NewClient(nodeConfig.P2P, p2pKey, genesis.ChainID, baseKV, logger.With("module", "p2p"), p2pMetrics)
// if err != nil {
// return nil, err
// }

mainKV := newPrefixKV(baseKV, mainPrefix)

// headerSyncService, err := initHeaderSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger)
// if err != nil {
// return nil, err
// }

// blockSyncService, err := initBlockSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger)
// if err != nil {
// return nil, err
// }

mempool := initMempool(logger, proxyApp, memplMetrics)

store := store.New(mainKV)
Expand All @@ -165,6 +144,7 @@ func newFullNode(
return nil, err
}

// check for crash recovery
blockManager.CheckCrashRecovery(ctx)

// genesis info for exec api & sequencer client
Expand Down Expand Up @@ -240,37 +220,12 @@ func initBaseKV(nodeConfig config.NodeConfig, logger log.Logger) (ds.TxnDatastor
return store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, "rollkit")
}

// func initDALC(nodeConfig config.NodeConfig, dalcKV ds.TxnDatastore, logger log.Logger) (*da.DAClient, error) {
// daClient := goDAProxy.NewClient()
// err := daClient.Start(nodeConfig.DAAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
// if err != nil {
// return nil, fmt.Errorf("error while establishing GRPC connection to DA layer: %w", err)
// }
// return &da.DAClient{DA: daClient, GasPrice: nodeConfig.DAGasPrice, Logger: logger.With("module", "da_client")}, nil
// }

func initMempool(logger log.Logger, proxyApp proxy.AppConns, memplMetrics *mempool.Metrics) *mempool.CListMempool {
mempool := mempool.NewCListMempool(llcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0, mempool.WithMetrics(memplMetrics))
mempool.EnableTxsAvailable()
return mempool
}

// func initHeaderSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderSyncService, error) {
// headerSyncService, err := block.NewHeaderSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderSyncService"))
// if err != nil {
// return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
// }
// return headerSyncService, nil
// }

// func initBlockSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.BlockSyncService, error) {
// blockSyncService, err := block.NewBlockSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "BlockSyncService"))
// if err != nil {
// return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
// }
// return blockSyncService, nil
// }

func initBlockManager(nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, store store.Store, mempool mempool.Mempool, proxyApp proxy.AppConns, eventBus *cmtypes.EventBus, logger log.Logger, seqMetrics *block.Metrics, execMetrics *state.Metrics) (*block.SSManager, error) {
blockManager, err := block.NewSSManager(nodeConfig.BlockManagerConfig, genesis, store, mempool, proxyApp, eventBus, logger.With("module", "BlockManager"), seqMetrics, execMetrics)
if err != nil {
Expand Down
17 changes: 1 addition & 16 deletions rpc/json/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,20 @@ package json

import (
"context"
"crypto/rand"
"testing"

abci "github.com/cometbft/cometbft/abci/types"
cmconfig "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/crypto/ed25519"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/proxy"
rpcclient "github.com/cometbft/cometbft/rpc/client"
cmtypes "github.com/cometbft/cometbft/types"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/astriaorg/rollkit/config"
"github.com/astriaorg/rollkit/node"
"github.com/astriaorg/rollkit/test/mocks"
"github.com/astriaorg/rollkit/types"
)

func prepareProposalResponse(_ context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
Expand Down Expand Up @@ -65,18 +60,8 @@ func getRPC(t *testing.T) (*mocks.Application, rpcclient.Client) {
LastBlockHeight: 345,
LastBlockAppHash: nil,
}, nil)
key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
validatorKey := ed25519.GenPrivKey()
nodeKey := &p2p.NodeKey{
PrivKey: validatorKey,
}
signingKey, _ := types.GetNodeKey(nodeKey)
pubKey := validatorKey.PubKey()

genesisValidators := []cmtypes.GenesisValidator{
{Address: pubKey.Address(), PubKey: pubKey, Power: int64(100), Name: "gen #1"},
}
n, err := node.NewNode(context.Background(), config.NodeConfig{BlockManagerConfig: config.BlockManagerConfig{}}, key, signingKey, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, node.DefaultMetricsProvider(cmconfig.DefaultInstrumentationConfig()), log.TestingLogger())
n, err := node.NewNode(context.Background(), config.NodeConfig{BlockManagerConfig: config.BlockManagerConfig{}}, proxy.NewLocalClientCreator(app), &cmtypes.GenesisDoc{ChainID: "test"}, node.DefaultMetricsProvider(cmconfig.DefaultInstrumentationConfig()), log.TestingLogger())
require.NoError(err)
require.NotNil(n)

Expand Down
2 changes: 1 addition & 1 deletion state/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (e *BlockExecutor) commit(ctx context.Context, state types.State, block *ty
return nil, err
}

if skipExec == false {
if !skipExec {
_, err = e.proxyApp.Commit(ctx)
if err != nil {
return nil, err
Expand Down
18 changes: 11 additions & 7 deletions state/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,14 @@ func doTestApplyBlock(t *testing.T) {
}
block.SignedHeader.Validators = cmtypes.NewValidatorSet(validators)

newState, resp, err := executor.ApplyBlock(context.Background(), state, block)
resp, err := executor.ApplyBlock(context.Background(), state, block)
require.NoError(err)
require.NotNil(newState)
require.NotNil(resp)
newState, err := executor.UpdateState(state, block, resp)
require.NoError(err)
require.NotNil(newState)
assert.Equal(uint64(1), newState.LastBlockHeight)
appHash, _, err := executor.Commit(context.Background(), newState, block, resp)
appHash, err := executor.Commit(context.Background(), newState, block, resp, false)
require.NoError(err)
assert.Equal(mockAppHash, appHash)

Expand All @@ -207,12 +209,14 @@ func doTestApplyBlock(t *testing.T) {
}
block.SignedHeader.Validators = cmtypes.NewValidatorSet(validators)

newState, resp, err = executor.ApplyBlock(context.Background(), newState, block)
resp, err = executor.ApplyBlock(context.Background(), newState, block)
require.NoError(err)
require.NotNil(newState)
require.NotNil(resp)
newState, err = executor.UpdateState(state, block, resp)
require.NoError(err)
require.NotNil(newState)
assert.Equal(uint64(2), newState.LastBlockHeight)
_, _, err = executor.Commit(context.Background(), newState, block, resp)
_, err = executor.Commit(context.Background(), newState, block, resp, true)
require.NoError(err)

// wait for at least 4 Tx events, for up to 3 second.
Expand Down Expand Up @@ -303,7 +307,7 @@ func TestUpdateStateConsensusParams(t *testing.T) {
TxResults: txResults,
}

updatedState, err := executor.updateState(state, block, resp)
updatedState, err := executor.UpdateState(state, block, resp)
require.NoError(t, err)

assert.Equal(t, uint64(1235), updatedState.LastHeightConsensusParamsChanged)
Expand Down

0 comments on commit b7f5c86

Please sign in to comment.