diff --git a/block/components.go b/block/components.go index 3ee2062ac..5e20ec9de 100644 --- a/block/components.go +++ b/block/components.go @@ -46,7 +46,8 @@ func (bc *Components) GetLastState() types.State { return types.State{} } -// Start starts all components and monitors for critical errors +// Start starts all components and monitors for critical errors. +// It blocks until the context is cancelled or an error occurs. func (bc *Components) Start(ctx context.Context) error { ctxWithCancel, cancel := context.WithCancel(ctx) @@ -137,6 +138,7 @@ func NewSyncComponents( metrics *Metrics, blockOpts BlockOptions, ) (*Components, error) { + logger.Info().Msg("Starting in sync mode") cacheManager, err := cache.NewManager(config, store, logger) if err != nil { return nil, fmt.Errorf("failed to create cache manager: %w", err) } @@ -200,6 +202,7 @@ func NewAggregatorComponents( metrics *Metrics, blockOpts BlockOptions, ) (*Components, error) { + logger.Info().Msg("Starting in aggregator mode") cacheManager, err := cache.NewManager(config, store, logger) if err != nil { return nil, fmt.Errorf("failed to create cache manager: %w", err) } diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 4fef5db38..5578fe043 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -199,7 +199,7 @@ func (e *Executor) initializeState() error { LastBlockHeight: e.genesis.InitialHeight - 1, LastBlockTime: e.genesis.StartTime, AppHash: stateRoot, - DAHeight: 0, + DAHeight: e.genesis.DAStartHeight, } } @@ -633,35 +633,7 @@ func (e *Executor) validateBlock(lastState types.State, header *types.SignedHead return fmt.Errorf("invalid header: %w", err) } - // Validate header against data - if err := types.Validate(header, data); err != nil { - return fmt.Errorf("header-data validation failed: %w", err) - } - - // Check chain ID - if header.ChainID() != lastState.ChainID { - return fmt.Errorf("chain ID mismatch: expected %s, got %s", - lastState.ChainID, header.ChainID()) - } - - // Check height - expectedHeight := lastState.LastBlockHeight + 1 - if header.Height() != expectedHeight { - return fmt.Errorf("invalid height: expected %d, got %d", - expectedHeight, header.Height()) - } - - // Check timestamp - if header.Height() > 1 && lastState.LastBlockTime.After(header.Time()) { - return fmt.Errorf("block time must be strictly increasing") - } - - // Check app hash - if !bytes.Equal(header.AppHash, lastState.AppHash) { - return fmt.Errorf("app hash mismatch") - } - - return nil + return lastState.AssertValidForNextState(header, data) } // sendCriticalError sends a critical error to the error channel without blocking diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 0d97d5940..c6e8daa78 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -168,7 +168,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) - hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data) + hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil) events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77) require.Len(t, events, 1) @@ -196,7 +196,7 @@ func
TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Header with no data hash present should trigger empty data creation (per current logic) - hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil) + hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil) events := r.processBlobs(context.Background(), [][]byte{hb}, 88) require.Len(t, events, 1) @@ -223,7 +223,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) - hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil) + hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil, nil) gotH := r.tryDecodeHeader(hb, 123) require.NotNil(t, gotH) assert.Equal(t, sh.Hash().String(), gotH.Hash().String()) @@ -279,7 +279,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { // Prepare header/data blobs dataBin, data := makeSignedDataBytes(t, gen.ChainID, 9, addr, pub, signer, 1) - hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data) + hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data, nil) cfg := config.DefaultConfig() cfg.DA.Namespace = "nsHdr" @@ -322,7 +322,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { // Create header and data for the same block height but from different DA heights dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2) - hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data) + hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil) // Process header from DA height 100 first events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100) @@ -361,9 +361,9 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin data4Bin, data4 := makeSignedDataBytes(t, gen.ChainID, 4, addr, pub, signer, 2) data5Bin, data5 := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 1) - hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data) - hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data) - hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data) + hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data, nil) + hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data, nil) + hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil) // Process multiple headers from DA height 200 - should be stored as pending events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index bdd496ea8..8f8a3704e 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -158,6 +158,7 @@ func (s *Syncer) GetLastState() types.State { stateCopy := *state stateCopy.AppHash = bytes.Clone(state.AppHash) + stateCopy.LastHeaderHash = bytes.Clone(state.LastHeaderHash) return stateCopy } @@ -182,21 +183,34 @@ func (s *Syncer) initializeState() error { // Load 
state from store state, err := s.store.GetState(s.ctx) if err != nil { - // Use genesis state if no state exists + // Initialize new chain state for a fresh full node (no prior state on disk) + // Mirror executor initialization to ensure AppHash matches headers produced by the sequencer. + stateRoot, _, initErr := s.exec.InitChain( + s.ctx, + s.genesis.StartTime, + s.genesis.InitialHeight, + s.genesis.ChainID, + ) + if initErr != nil { + return fmt.Errorf("failed to initialize execution client: %w", initErr) + } + state = types.State{ ChainID: s.genesis.ChainID, InitialHeight: s.genesis.InitialHeight, LastBlockHeight: s.genesis.InitialHeight - 1, LastBlockTime: s.genesis.StartTime, - DAHeight: 0, + DAHeight: s.genesis.DAStartHeight, + AppHash: stateRoot, } } - + if state.DAHeight < s.genesis.DAStartHeight { + return fmt.Errorf("DA height (%d) is lower than DA start height (%d)", state.DAHeight, s.genesis.DAStartHeight) + } s.SetLastState(state) // Set DA height - daHeight := max(state.DAHeight, s.genesis.DAStartHeight) - s.SetDAHeight(daHeight) + s.SetDAHeight(state.DAHeight) s.logger.Info(). Uint64("height", state.LastBlockHeight). @@ -385,7 +399,14 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) { if err := s.trySyncNextBlock(event); err != nil { s.logger.Error().Err(err).Msg("failed to sync next block") // If the error is not due to a validation error, re-store the event as pending - if !errors.Is(err, errInvalidBlock) { + switch { + case errors.Is(err, errInvalidBlock): + // do not reschedule + case errors.Is(err, errInvalidState): + s.sendCriticalError(fmt.Errorf("invalid state detected (block-height %d, state-height %d) "+ "- block references do not match local state. Manual intervention required: %w", event.Header.Height(), + s.GetLastState().LastBlockHeight, err)) + default: + s.cache.SetPendingEvent(height, event) } return @@ -402,8 +423,12 @@ } } -// errInvalidBlock is returned when a block is failing validation -var errInvalidBlock = errors.New("invalid block") +var ( + // errInvalidBlock is returned when a block fails validation + errInvalidBlock = errors.New("invalid block") + // errInvalidState is returned when the state has diverged from the DA blocks + errInvalidState = errors.New("invalid state") +) // trySyncNextBlock attempts to sync the next available block // the event is always the next block in sequence as processHeightEvent ensures it. @@ -425,10 +450,13 @@ // Compared to the executor logic where the current block needs to be applied first, // here only the previous block needs to be applied to proceed to the verification.
// The header validation must be done before applying the block to avoid executing gibberish - if err := s.validateBlock(header, data); err != nil { + if err := s.validateBlock(currentState, data, header); err != nil { // remove header as da included (not per se needed, but keep cache clean) s.cache.RemoveHeaderDAIncluded(headerHash) - return errors.Join(errInvalidBlock, fmt.Errorf("failed to validate block: %w", err)) + if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { + return errors.Join(errInvalidBlock, err) + } + return err } // Apply block @@ -534,23 +562,17 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade // NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct // or if the data was gibberish and somehow passed all validation prior but the header was correct // we are still losing both in the pending event. This should never happen. -func (s *Syncer) validateBlock( - header *types.SignedHeader, - data *types.Data, -) error { +func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *types.SignedHeader) error { // Set custom verifier for aggregator node signature header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider) - // Validate header with data if err := header.ValidateBasicWithData(data); err != nil { return fmt.Errorf("invalid header: %w", err) } - // Validate header against data - if err := types.Validate(header, data); err != nil { - return fmt.Errorf("header-data validation failed: %w", err) + if err := currState.AssertValidForNextState(header, data); err != nil { + return errors.Join(errInvalidState, err) } - return nil } diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 7a9e80dbb..ca80cab36 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -197,7 +197,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { Return(nil, errors.New("temporary failure")).Once() // Second call - success (should reset backoff and increment DA height) - _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil) + _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) data := &types.Data{ Metadata: &types.Metadata{ ChainID: gen.ChainID, diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 8c9cfea36..2e85a33f4 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -120,7 +120,7 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay heightEvents := make([]common.DAHeightEvent, totalHeights) for i := uint64(0); i < totalHeights; i++ { blockHeight, daHeight := i+gen.InitialHeight, i+daHeightOffset - _, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil) + _, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil, nil) d := &types.Data{Metadata: &types.Metadata{ChainID: gen.ChainID, Height: blockHeight, Time: uint64(time.Now().UnixNano())}} heightEvents[i] = common.DAHeightEvent{Header: sh, Data: d, DaHeight: daHeight} } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 002ca1396..9114155cd 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -3,6 +3,7 @@ package 
syncing import ( "context" crand "crypto/rand" + "crypto/sha512" "errors" "sync/atomic" "testing" @@ -45,7 +46,17 @@ func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer } // makeSignedHeaderBytes builds a valid SignedHeader and returns its binary encoding and the object -func makeSignedHeaderBytes(tb testing.TB, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, appHash []byte, data *types.Data) ([]byte, *types.SignedHeader) { +func makeSignedHeaderBytes( + tb testing.TB, + chainID string, + height uint64, + proposer []byte, + pub crypto.PubKey, + signer signerpkg.Signer, + appHash []byte, + data *types.Data, + lastHeaderHash []byte, +) ([]byte, *types.SignedHeader) { time := uint64(time.Now().UnixNano()) dataHash := common.DataHashForEmptyTxs if data != nil { @@ -59,6 +70,7 @@ func makeSignedHeaderBytes(tb testing.TB, chainID string, height uint64, propose AppHash: appHash, DataHash: dataHash, ProposerAddress: proposer, + LastHeaderHash: lastHeaderHash, }, Signer: types.Signer{PubKey: pub, Address: proposer}, } @@ -98,8 +110,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { cfg := config.DefaultConfig() gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), uint64(1024), nil).Once() s := NewSyncer( st, @@ -115,24 +127,24 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.DefaultBlockOptions(), make(chan error, 1), ) - + require.NoError(t, s.initializeState()) // Create header and data with correct hash data := makeData(gen.ChainID, 1, 2) // non-empty - _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data) + _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data, nil) - err = s.validateBlock(header, data) + err = s.validateBlock(s.GetLastState(), data, header) require.NoError(t, err) // Create header and data with mismatched hash data = makeData(gen.ChainID, 1, 2) // non-empty - _, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil) - err = s.validateBlock(header, data) + _, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) + err = s.validateBlock(s.GetLastState(), data, header) require.Error(t, err) // Create header and empty data data = makeData(gen.ChainID, 1, 0) // empty - _, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil) - err = s.validateBlock(header, data) + _, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil, nil) + err = s.validateBlock(s.GetLastState(), data, header) require.Error(t, err) } @@ -148,7 +160,9 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), uint64(1024), nil).Once() + errChan := make(chan error, 1) s := NewSyncer( st, mockExec, @@ -161,7 +175,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NewMockBroadcaster[*types.Data](t), zerolog.Nop(), common.DefaultBlockOptions(), - make(chan error, 1), + errChan, ) require.NoError(t, s.initializeState()) @@ -170,7 
+184,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { // Create signed header & data for height 1 lastState := s.GetLastState() data := makeData(gen.ChainID, 1, 0) - _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data) + _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil) // Expect ExecuteTxs call for height 1 mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash). @@ -179,6 +193,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 1} s.processHeightEvent(&evt) + requireEmptyChan(t, errChan) h, err := st.Height(context.Background()) require.NoError(t, err) assert.Equal(t, uint64(1), h) @@ -198,7 +213,9 @@ func TestSequentialBlockSync(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), uint64(1024), nil).Once() + errChan := make(chan error, 1) s := NewSyncer( st, mockExec, @@ -211,7 +228,7 @@ func TestSequentialBlockSync(t *testing.T) { common.NewMockBroadcaster[*types.Data](t), zerolog.Nop(), common.DefaultBlockOptions(), - make(chan error, 1), + errChan, ) require.NoError(t, s.initializeState()) s.ctx = context.Background() @@ -219,7 +236,7 @@ func TestSequentialBlockSync(t *testing.T) { // Sync two consecutive blocks via processHeightEvent so ExecuteTxs is called and state stored st0 := s.GetLastState() data1 := makeData(gen.ChainID, 1, 1) // non-empty - _, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, st0.AppHash, data1) + _, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, st0.AppHash, data1, st0.LastHeaderHash) // Expect ExecuteTxs call for height 1 mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, st0.AppHash). Return([]byte("app1"), uint64(1024), nil).Once() @@ -228,7 +245,7 @@ func TestSequentialBlockSync(t *testing.T) { st1, _ := st.GetState(context.Background()) data2 := makeData(gen.ChainID, 2, 0) // empty data - _, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2) + _, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2, st1.LastHeaderHash) // Expect ExecuteTxs call for height 2 mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(2), mock.Anything, st1.AppHash). 
Return([]byte("app2"), uint64(1024), nil).Once() @@ -254,6 +271,7 @@ func TestSequentialBlockSync(t *testing.T) { assert.True(t, ok) _, ok = cm.GetDataDAIncluded(data2.DACommitment().String()) assert.True(t, ok) + requireEmptyChan(t, errChan) } func TestSyncer_sendNonBlockingSignal(t *testing.T) { @@ -317,14 +335,16 @@ func TestSyncer_processPendingEvents(t *testing.T) { func TestSyncLoopPersistState(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + cfg := config.DefaultConfig() + cfg.ClearCache = true + + cacheMgr, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - myDAHeightOffset := uint64(1) - myFutureDAHeight := uint64(9) + const myDAHeightOffset = uint64(1) + const numBlocks = uint64(5) addr, pub, signer := buildSyncTestSigner(t) - cfg := config.DefaultConfig() gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr, DAStartHeight: myDAHeightOffset} dummyExec := execution.NewDummyExecutor() @@ -342,11 +362,12 @@ func TestSyncLoopPersistState(t *testing.T) { mockP2PDataStore := common.NewMockBroadcaster[*types.Data](t) mockP2PDataStore.EXPECT().Store().Return(mockDataStore).Maybe() + errorCh := make(chan error, 1) syncerInst1 := NewSyncer( st, dummyExec, nil, - cm, + cacheMgr, common.NopMetrics(), cfg, gen, @@ -354,7 +375,7 @@ func TestSyncLoopPersistState(t *testing.T) { mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), - make(chan error, 1), + errorCh, ) require.NoError(t, syncerInst1.initializeState()) @@ -366,25 +387,33 @@ func TestSyncLoopPersistState(t *testing.T) { syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock // with n da blobs fetched - for i := range myFutureDAHeight - myDAHeightOffset { - chainHeight, daHeight := i, i+myDAHeightOffset + var prevHeaderHash, prevAppHash []byte + prevAppHash, _, _ = execution.NewDummyExecutor().InitChain(t.Context(), gen.StartTime, gen.DAStartHeight, gen.ChainID) + for i := range numBlocks { + chainHeight, daHeight := gen.InitialHeight+i, i+myDAHeightOffset + blockTime := gen.StartTime.Add(time.Duration(chainHeight+1) * time.Second) emptyData := &types.Data{ Metadata: &types.Metadata{ ChainID: gen.ChainID, Height: chainHeight, - Time: uint64(time.Now().Add(time.Duration(chainHeight) * time.Second).UnixNano()), + Time: uint64(blockTime.UnixNano()), }, } - _, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, nil, emptyData) + _, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, prevAppHash, emptyData, prevHeaderHash) evts := []common.DAHeightEvent{{ Header: sigHeader, Data: emptyData, DaHeight: daHeight, }} daRtrMock.On("RetrieveFromDA", mock.Anything, daHeight).Return(evts, nil) + prevHeaderHash = sigHeader.Hash() + hasher := sha512.New() + hasher.Write(prevAppHash) + prevAppHash = hasher.Sum(nil) } // stop at next height + myFutureDAHeight := myDAHeightOffset + numBlocks daRtrMock.On("RetrieveFromDA", mock.Anything, myFutureDAHeight). 
Run(func(_ mock.Arguments) { // wait for consumer to catch up @@ -396,36 +425,39 @@ func TestSyncLoopPersistState(t *testing.T) { Return(nil, coreda.ErrHeightFromFuture) go syncerInst1.processLoop() - - // dssync from DA until stop height reached + // sync from DA until stop height reached syncerInst1.syncLoop() + requireEmptyChan(t, errorCh) + t.Log("syncLoop on instance1 completed") + require.Equal(t, myFutureDAHeight, syncerInst1.GetDAHeight()) + lastStateDAHeight := syncerInst1.GetLastState().DAHeight // wait for all events consumed - require.NoError(t, cm.SaveToDisk()) + require.NoError(t, cacheMgr.SaveToDisk()) t.Log("processLoop on instance1 completed") // then daRtrMock.AssertExpectations(t) p2pHndlMock.AssertExpectations(t) + require.Len(t, syncerInst1.heightInCh, 0) // and all processed - verify no events remain at heights we tested - event1 := syncerInst1.cache.GetNextPendingEvent(1) - assert.Nil(t, event1) - event2 := syncerInst1.cache.GetNextPendingEvent(2) - assert.Nil(t, event2) - assert.Len(t, syncerInst1.heightInCh, 0) - + for i := range numBlocks { + blockHeight := gen.InitialHeight + i + 1 + event := syncerInst1.cache.GetNextPendingEvent(blockHeight) + require.Nil(t, event, "event at height %d should have been removed", blockHeight) + } // and when new instance is up on restart - cm, err = cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + cacheMgr, err = cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - require.NoError(t, cm.LoadFromDisk()) + require.NoError(t, cacheMgr.LoadFromDisk()) syncerInst2 := NewSyncer( st, dummyExec, nil, - cm, + cacheMgr, common.NopMetrics(), cfg, gen, @@ -436,7 +468,7 @@ func TestSyncLoopPersistState(t *testing.T) { make(chan error, 1), ) require.NoError(t, syncerInst2.initializeState()) - require.Equal(t, myFutureDAHeight-1, syncerInst2.GetDAHeight()) + require.Equal(t, lastStateDAHeight, syncerInst2.GetDAHeight()) ctx, cancel = context.WithCancel(t.Context()) t.Cleanup(cancel) @@ -450,7 +482,7 @@ func TestSyncLoopPersistState(t *testing.T) { Run(func(arg mock.Arguments) { cancel() // retrieve last one again - assert.Equal(t, myFutureDAHeight-1, arg.Get(1).(uint64)) + assert.Equal(t, lastStateDAHeight, arg.Get(1).(uint64)) }). 
Return(nil, nil) @@ -607,3 +639,12 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) { // Verify that GetLatestHeight was called (proves Replayer was invoked) mockExec.AssertCalled(t, "GetLatestHeight", mock.Anything) } + +func requireEmptyChan(t *testing.T, errorCh chan error) { + t.Helper() + select { + case err := <-errorCh: + t.Fatalf("unexpected error received: %v", err) + default: + } +} diff --git a/node/full.go b/node/full.go index ce1369e49..708121998 100644 --- a/node/full.go +++ b/node/full.go @@ -321,10 +321,11 @@ func (n *FullNode) Run(parentCtx context.Context) error { return fmt.Errorf("error while starting data sync service: %w", err) } + var runtimeErr error // Start the block components (blocking) if err := n.blockComponents.Start(ctx); err != nil { if !errors.Is(err, context.Canceled) { - n.Logger.Error().Err(err).Msg("unrecoverable error in block components") + runtimeErr = fmt.Errorf("running block components: %w", err) } else { n.Logger.Info().Msg("context canceled, stopping node") } @@ -338,12 +339,12 @@ func (n *FullNode) Run(parentCtx context.Context) error { shutdownCtx, cancel := context.WithTimeout(context.Background(), 9*time.Second) defer cancel() - var multiErr error // Use a multierror variable + var shutdownMultiErr error // Accumulates errors from the shutdown sequence // Stop block components if err := n.blockComponents.Stop(); err != nil { n.Logger.Error().Err(err).Msg("error stopping block components") - multiErr = errors.Join(multiErr, fmt.Errorf("stopping block components: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("stopping block components: %w", err)) } // Stop Header Sync Service @@ -352,7 +353,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Log context canceled errors at a lower level if desired, or handle specific non-cancel errors if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { n.Logger.Error().Err(err).Msg("error stopping header sync service") - multiErr = errors.Join(multiErr, fmt.Errorf("stopping header sync service: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("stopping header sync service: %w", err)) } else { n.Logger.Debug().Err(err).Msg("header sync service stop context ended") // Log cancellation as debug } @@ -364,7 +365,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Log context canceled errors at a lower level if desired, or handle specific non-cancel errors if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { n.Logger.Error().Err(err).Msg("error stopping data sync service") - multiErr = errors.Join(multiErr, fmt.Errorf("stopping data sync service: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("stopping data sync service: %w", err)) } else { n.Logger.Debug().Err(err).Msg("data sync service stop context ended") // Log cancellation as debug } @@ -373,7 +374,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Stop P2P Client err = n.p2pClient.Close() if err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("closing P2P client: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("closing P2P client: %w", err)) } // Shutdown Prometheus Server @@ -381,7 +382,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { err = n.prometheusSrv.Shutdown(shutdownCtx) // http.ErrServerClosed is expected on graceful shutdown if err != nil && !errors.Is(err, http.ErrServerClosed) { - multiErr =
errors.Join(multiErr, fmt.Errorf("shutting down Prometheus server: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down Prometheus server: %w", err)) } else { n.Logger.Debug().Err(err).Msg("Prometheus server shutdown context ended") } @@ -391,7 +392,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { if n.pprofSrv != nil { err = n.pprofSrv.Shutdown(shutdownCtx) if err != nil && !errors.Is(err, http.ErrServerClosed) { - multiErr = errors.Join(multiErr, fmt.Errorf("shutting down pprof server: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down pprof server: %w", err)) } else { n.Logger.Debug().Err(err).Msg("pprof server shutdown context ended") } @@ -401,7 +402,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { if n.rpcServer != nil { err = n.rpcServer.Shutdown(shutdownCtx) if err != nil && !errors.Is(err, http.ErrServerClosed) { - multiErr = errors.Join(multiErr, fmt.Errorf("shutting down RPC server: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down RPC server: %w", err)) } else { n.Logger.Debug().Err(err).Msg("RPC server shutdown context ended") } @@ -409,7 +410,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Ensure Store.Close is called last to maximize chance of data flushing if err = n.Store.Close(); err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("closing store: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("closing store: %w", err)) } else { n.Logger.Debug().Msg("store closed") } @@ -417,28 +418,27 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Save caches if needed if n.blockComponents != nil && n.blockComponents.Cache != nil { if err := n.blockComponents.Cache.SaveToDisk(); err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("saving caches: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("saving caches: %w", err)) } else { n.Logger.Debug().Msg("caches saved") } } // Log final status - if multiErr != nil { - for _, err := range multiErr.(interface{ Unwrap() []error }).Unwrap() { + if shutdownMultiErr != nil { + for _, err := range shutdownMultiErr.(interface{ Unwrap() []error }).Unwrap() { n.Logger.Error().Err(err).Msg("error during shutdown") } } else { n.Logger.Info().Msg("full node halted successfully") } - - // Return the original context error if it exists (e.g., context cancelled) - // or the combined shutdown error if the context cancellation was clean. - if ctx.Err() != nil { - return ctx.Err() + if runtimeErr != nil { + return runtimeErr } - - return multiErr // Return shutdown errors if context was okay + if shutdownMultiErr != nil { + return shutdownMultiErr + } + return ctx.Err() // context canceled } // GetGenesis returns entire genesis doc. 
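Editorial aside (not part of the patch): the per-error shutdown logging above relies on errors.Join returning a single error whose concrete type implements Unwrap() []error (Go 1.20+). A minimal, self-contained sketch of that pattern:

package main

import (
	"errors"
	"fmt"
)

func main() {
	// errors.Join collapses several errors into one; nil members are dropped.
	joined := errors.Join(
		fmt.Errorf("stopping block components: %w", errors.New("timeout")),
		fmt.Errorf("closing store: %w", errors.New("disk full")),
	)
	// The joined value exposes Unwrap() []error, so each underlying
	// failure can be reported individually, as FullNode.Run does above.
	if u, ok := joined.(interface{ Unwrap() []error }); ok {
		for _, err := range u.Unwrap() {
			fmt.Println(err)
		}
	}
}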
diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go index 8edbffed1..a214c1316 100644 --- a/node/full_node_integration_test.go +++ b/node/full_node_integration_test.go @@ -28,8 +28,9 @@ func TestTxGossipingMultipleNodesNoDA(t *testing.T) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start only the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the first block to be produced by the sequencer err := waitForFirstBlock(nodes[0], Header) @@ -40,7 +41,7 @@ func TestTxGossipingMultipleNodesNoDA(t *testing.T) { // Start the other nodes for i := 1; i < numNodes; i++ { - startNodeInBackground(t, nodes, ctxs, &runningWg, i) + startNodeInBackground(t, nodes, ctxs, &runningWg, i, errChan) // Add a small delay between starting nodes to avoid connection race if i < numNodes-1 { time.Sleep(100 * time.Millisecond) } @@ -54,13 +55,13 @@ func TestTxGossipingMultipleNodesNoDA(t *testing.T) { if dummyExec, ok := coreExec.(interface{ InjectTx([]byte) }); ok { dummyExec.InjectTx([]byte("test tx")) } else { - t.Logf("Warning: Could not cast core executor to DummyExecutor, skipping transaction injection") + t.Fatal("could not cast core executor to DummyExecutor") } } - blocksToWaitFor := uint64(3) // Wait for all nodes to reach at least blocksToWaitFor blocks for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, Store)) } @@ -88,8 +89,9 @@ func TestTxGossipingMultipleNodesDAIncluded(t *testing.T) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start only the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the first block to be produced by the sequencer err := waitForFirstBlock(nodes[0], Header) @@ -103,7 +105,7 @@ func TestTxGossipingMultipleNodesDAIncluded(t *testing.T) { // Start the other nodes for i := 1; i < numNodes; i++ { - startNodeInBackground(t, nodes, ctxs, &runningWg, i) + startNodeInBackground(t, nodes, ctxs, &runningWg, i, errChan) // Add a small delay between starting nodes to avoid connection race if i < numNodes-1 { time.Sleep(100 * time.Millisecond) } @@ -119,13 +121,14 @@ func TestTxGossipingMultipleNodesDAIncluded(t *testing.T) { dummyExec.InjectTx([]byte("test tx 2")) dummyExec.InjectTx([]byte("test tx 3")) } else { - t.Logf("Warning: Could not cast core executor to DummyExecutor, skipping transaction injection") + t.Fatal("could not cast core executor to DummyExecutor") } } blocksToWaitFor := uint64(5) // Wait for all nodes to reach at least blocksToWaitFor blocks with DA inclusion for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNDAIncludedHeight(nodeItem, blocksToWaitFor)) } @@ -159,8 +162,9 @@ func TestFastDASync(t *testing.T) { ctxs, cancels := createNodeContexts(len(nodes)) var runningWg sync.WaitGroup + errChan := make(chan error, len(nodes)) // Start only the first node - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the first node to produce a few blocks blocksToWaitFor := uint64(2) @@ -170,7 +174,7 @@ time.Sleep(500 * time.Millisecond) // Now start the second node and time
its sync - startNodeInBackground(t, nodes, ctxs, &runningWg, 1) + startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan) start := time.Now() // Wait for the second node to catch up to the first node require.NoError(waitForAtLeastNBlocks(nodes[1], blocksToWaitFor, Store)) @@ -185,6 +189,7 @@ func TestFastDASync(t *testing.T) { "DA fast sync took %v, should be much faster than sequential block time %v (max reasonable: %v). ", syncDuration, expectedSequentialTime, maxReasonableSyncTime) + requireEmptyChan(t, errChan) // Verify both nodes are synced and that the synced block is DA-included assertAllNodesSynced(t, nodes, blocksToWaitFor) @@ -211,9 +216,10 @@ func TestSingleSequencerTwoFullNodesBlockSyncSpeed(t *testing.T) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start only the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes[0], Store)) @@ -223,7 +229,7 @@ func TestSingleSequencerTwoFullNodesBlockSyncSpeed(t *testing.T) { // Now start the other nodes for i := 1; i < numNodes; i++ { - startNodeInBackground(t, nodes, ctxs, &runningWg, i) + startNodeInBackground(t, nodes, ctxs, &runningWg, i, errChan) // Add a small delay between starting nodes to avoid connection race if i < numNodes-1 { time.Sleep(100 * time.Millisecond) @@ -235,6 +241,7 @@ func TestSingleSequencerTwoFullNodesBlockSyncSpeed(t *testing.T) { // Wait for all nodes to reach the target block height for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, Store)) } totalDuration := time.Since(start) @@ -306,9 +313,10 @@ func testSingleSequencerSingleFullNode(t *testing.T, source Source) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes[0], source)) @@ -317,11 +325,12 @@ func testSingleSequencerSingleFullNode(t *testing.T, source Source) { time.Sleep(500 * time.Millisecond) // Start the full node - startNodeInBackground(t, nodes, ctxs, &runningWg, 1) + startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan) blocksToWaitFor := uint64(3) // Wait for both nodes to reach at least blocksToWaitFor blocks for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, source)) } @@ -347,9 +356,10 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes[0], source)) @@ -359,7 +369,7 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) { // Start the full nodes for i := 1; i < numNodes; i++ { - startNodeInBackground(t, nodes, ctxs, &runningWg, i) + startNodeInBackground(t, nodes, ctxs, &runningWg, i, errChan) // Add a small delay between starting nodes to avoid 
connection race if i < numNodes-1 { time.Sleep(100 * time.Millisecond) @@ -369,6 +379,7 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) { blocksToWaitFor := uint64(3) // Wait for all nodes to reach at least blocksToWaitFor blocks for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, source)) } @@ -396,9 +407,10 @@ func testSingleSequencerSingleFullNodeTrustedHash(t *testing.T, source Source) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes[0], source)) @@ -426,11 +438,12 @@ func testSingleSequencerSingleFullNodeTrustedHash(t *testing.T, source Source) { time.Sleep(500 * time.Millisecond) // Start the full node - startNodeInBackground(t, nodes, ctxs, &runningWg, 1) + startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan) blocksToWaitFor := uint64(3) // Wait for both nodes to reach at least blocksToWaitFor blocks for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, source)) } @@ -491,9 +504,10 @@ func testTwoChainsInOneNamespace(t *testing.T, chainID1 string, chainID2 string) // Set up context and wait group for the sequencer of chain 1 ctxs1, cancels1 := createNodeContexts(1) var runningWg1 sync.WaitGroup + errChan := make(chan error, 2) // Start the sequencer of chain 1 - startNodeInBackground(t, nodes1, ctxs1, &runningWg1, 0) + startNodeInBackground(t, nodes1, ctxs1, &runningWg1, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes1[0], Store)) @@ -503,13 +517,14 @@ func testTwoChainsInOneNamespace(t *testing.T, chainID1 string, chainID2 string) var runningWg2 sync.WaitGroup // Start the sequencer of chain 2 - startNodeInBackground(t, nodes2, ctxs2, &runningWg2, 0) + startNodeInBackground(t, nodes2, ctxs2, &runningWg2, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes2[0], Store)) blocksToWaitFor := uint64(3) + requireEmptyChan(t, errChan) // Wait for the full node of chain 1 to reach at least blocksToWaitFor blocks require.NoError(waitForAtLeastNBlocks(nodes1[0], blocksToWaitFor, Store)) @@ -520,3 +535,12 @@ func testTwoChainsInOneNamespace(t *testing.T, chainID1 string, chainID2 string) shutdownAndWait(t, cancels1, &runningWg1, 5*time.Second) shutdownAndWait(t, cancels2, &runningWg2, 5*time.Second) } + +func requireEmptyChan(t *testing.T, errChan chan error) { + t.Helper() + select { + case err := <-errChan: + t.Fatalf("Error received: %v", err) + default: + } +} diff --git a/node/helpers_test.go b/node/helpers_test.go index b90dd8f1e..06d789060 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -172,6 +172,10 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F aggPeerID, err := peer.IDFromPrivateKey(aggP2PKey.PrivKey) require.NoError(err) + logger := zerolog.Nop() + if testing.Verbose() { + logger = zerolog.New(zerolog.NewTestWriter(t)) + } aggNode, err := NewNode( config, executor, @@ -182,7 +186,7 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), - 
zerolog.Nop(), + logger, NodeOptions{}, ) require.NoError(err) @@ -216,7 +220,7 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F genesis, dssync.MutexWrap(datastore.NewMapDatastore()), DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), - zerolog.Nop(), + logger, NodeOptions{}, ) require.NoError(err) @@ -246,13 +250,17 @@ func createNodeContexts(n int) ([]context.Context, []context.CancelFunc) { } // Helper to start a single node in a goroutine and add to wait group -func startNodeInBackground(t *testing.T, nodes []*FullNode, ctxs []context.Context, wg *sync.WaitGroup, idx int) { +func startNodeInBackground(t *testing.T, nodes []*FullNode, ctxs []context.Context, wg *sync.WaitGroup, idx int, errChan chan<- error) { wg.Add(1) go func(node *FullNode, ctx context.Context, idx int) { defer wg.Done() err := node.Run(ctx) if err != nil && !errors.Is(err, context.Canceled) { - t.Logf("Error running node %d: %v", idx, err) + if errChan != nil { + errChan <- err + } else { + t.Logf("Error running node %d: %v", idx, err) + } } }(nodes[idx], ctxs[idx], idx) } diff --git a/node/single_sequencer_integration_test.go b/node/single_sequencer_integration_test.go index 4e9716bdf..213760df6 100644 --- a/node/single_sequencer_integration_test.go +++ b/node/single_sequencer_integration_test.go @@ -240,7 +240,7 @@ func TestStateRecovery(t *testing.T) { defer cancel() // Start the sequencer first - startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0) + startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil) blocksToWaitFor := uint64(20) // Wait for the sequencer to produce at first block @@ -283,7 +283,7 @@ func TestMaxPendingHeadersAndData(t *testing.T) { defer cancel() var runningWg sync.WaitGroup - startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0) + startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil) // Wait blocks to be produced up to max pending numExtraBlocks := uint64(5) @@ -331,7 +331,7 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) { defer cancel() var runningWg sync.WaitGroup - startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0) + startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil) // Wait for the node to start producing blocks waitForBlockN(t, 1, node, config.Node.BlockTime.Duration) diff --git a/pkg/store/batch.go b/pkg/store/batch.go index 3ee4c2613..405119c61 100644 --- a/pkg/store/batch.go +++ b/pkg/store/batch.go @@ -2,6 +2,7 @@ package store import ( "context" + "crypto/sha256" "fmt" ds "github.com/ipfs/go-datastore" @@ -69,9 +70,9 @@ func (b *DefaultBatch) SaveBlockData(header *types.SignedHeader, data *types.Dat return fmt.Errorf("failed to put signature blob in batch: %w", err) } - headerHash := types.HeaderHash(headerBlob) + headerHash := sha256.Sum256(headerBlob) heightBytes := encodeHeight(height) - if err := b.batch.Put(b.ctx, ds.NewKey(getIndexKey(headerHash)), heightBytes); err != nil { + if err := b.batch.Put(b.ctx, ds.NewKey(getIndexKey(headerHash[:])), heightBytes); err != nil { return fmt.Errorf("failed to put index key in batch: %w", err) } diff --git a/pkg/store/store.go b/pkg/store/store.go index d187be3fb..972b94e0e 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -2,6 +2,7 @@ package store import ( "context" + "crypto/sha256" "encoding/binary" "errors" "fmt" @@ -247,8 +248,8 @@ func (s *DefaultStore) Rollback(ctx 
context.Context, height uint64, aggregator b } // Hash the marshaled header bytes directly to avoid re-marshaling the header - hash := types.HeaderHash(headerBlob) - if err := batch.Delete(ctx, ds.NewKey(getIndexKey(hash))); err != nil { + headerHash := sha256.Sum256(headerBlob) + if err := batch.Delete(ctx, ds.NewKey(getIndexKey(headerHash[:]))); err != nil { return fmt.Errorf("failed to delete index key in batch: %w", err) } diff --git a/proto/evnode/v1/state.proto b/proto/evnode/v1/state.proto index 5deab3fcc..2aa302567 100644 --- a/proto/evnode/v1/state.proto +++ b/proto/evnode/v1/state.proto @@ -15,6 +15,7 @@ message State { google.protobuf.Timestamp last_block_time = 5; uint64 da_height = 6; bytes app_hash = 8; + bytes last_header_hash = 9; reserved 7; } diff --git a/test/e2e/.gitignore b/test/e2e/.gitignore new file mode 100644 index 000000000..719713a31 --- /dev/null +++ b/test/e2e/.gitignore @@ -0,0 +1 @@ +/testnet/ diff --git a/test/e2e/evm_full_node_e2e_test.go b/test/e2e/evm_full_node_e2e_test.go index 3661464dc..cd96d6bd6 100644 --- a/test/e2e/evm_full_node_e2e_test.go +++ b/test/e2e/evm_full_node_e2e_test.go @@ -950,6 +950,7 @@ func restartSequencerAndFullNode(t *testing.T, sut *SystemUnderTest, sequencerHo fullNodeJwtSecretFile := filepath.Join(fullNodeHome, "jwt-secret.hex") sut.ExecCmd(evmSingleBinaryPath, "start", + "--evnode.log.format", "json", "--home", fullNodeHome, "--evm.jwt-secret-file", fullNodeJwtSecretFile, "--evm.genesis-hash", genesisHash, diff --git a/test/e2e/sut_helper.go b/test/e2e/sut_helper.go index ebf1249df..226e02a0c 100644 --- a/test/e2e/sut_helper.go +++ b/test/e2e/sut_helper.go @@ -51,6 +51,7 @@ func NewSystemUnderTest(t *testing.T) *SystemUnderTest { cmdToPids: make(map[string][]int), outBuff: ring.New(100), errBuff: ring.New(100), + debug: testing.Verbose(), } t.Cleanup(func() { if t.Failed() { @@ -103,7 +104,7 @@ func (s *SystemUnderTest) AwaitNodeUp(t *testing.T, rpcAddr string, timeout time require.NotNil(t, c) _, err := c.GetHealth(ctx) require.NoError(t, err) - }, timeout, timeout/10, "node is not up") + }, timeout, min(timeout/10, 200*time.Millisecond), "node is not up") } // AwaitNBlocks waits until the node has produced at least `n` blocks.
@@ -153,16 +154,24 @@ func (s *SystemUnderTest) awaitProcessCleanup(cmd *exec.Cmd) { func (s *SystemUnderTest) watchLogs(cmd *exec.Cmd) { errReader, err := cmd.StderrPipe() - if err != nil { - panic(fmt.Sprintf("stderr reader error %#+v", err)) + require.NoError(s.t, err) + outReader, err := cmd.StdoutPipe() + require.NoError(s.t, err) + + if s.debug { + logDir := filepath.Join(WorkDir, "testnet") + require.NoError(s.t, os.MkdirAll(logDir, 0o750)) + testName := strings.ReplaceAll(s.t.Name(), "/", "-") + logfileName := filepath.Join(logDir, fmt.Sprintf("exec-%s-%s-%d.out", filepath.Base(cmd.Args[0]), testName, time.Now().UnixNano())) + logfile, err := os.Create(logfileName) + require.NoError(s.t, err) + errReader = io.NopCloser(io.TeeReader(errReader, logfile)) + outReader = io.NopCloser(io.TeeReader(outReader, logfile)) } + stopRingBuffer := make(chan struct{}) go appendToBuf(errReader, s.errBuff, stopRingBuffer) - outReader, err := cmd.StdoutPipe() - if err != nil { - panic(fmt.Sprintf("stdout reader error %#+v", err)) - } go appendToBuf(outReader, s.outBuff, stopRingBuffer) s.t.Cleanup(func() { close(stopRingBuffer) diff --git a/types/hashing.go b/types/hashing.go index 4147ea10a..4a7e88940 100644 --- a/types/hashing.go +++ b/types/hashing.go @@ -2,6 +2,7 @@ package types import ( "crypto/sha256" + "errors" "hash" ) @@ -9,21 +10,56 @@ var ( leafPrefix = []byte{0} ) -// Hash returns hash of the header -func (h *Header) Hash() Hash { +// HashSlim returns the SHA256 hash of the header using the slim (current) binary encoding. +func (h *Header) HashSlim() (Hash, error) { + if h == nil { + return nil, errors.New("header is nil") + } + bytes, err := h.MarshalBinary() if err != nil { - return nil + return nil, err } - return HeaderHash(bytes) + + hash := sha256.Sum256(bytes) + return hash[:], nil } -// HeaderHash returns the SHA256 hash of pre-marshaled header bytes. -// Use this function when you already have marshaled header bytes to avoid -// redundant marshaling operations. For convenience, use Header.Hash() instead. -func HeaderHash(bytes []byte) Hash { +// HashLegacy returns the SHA256 hash of the header using the legacy binary encoding that +// includes the deprecated fields. 
+func (h *Header) HashLegacy() (Hash, error) { + if h == nil { + return nil, errors.New("header is nil") + } + + bytes, err := h.MarshalBinaryLegacy() + if err != nil { + return nil, err + } + hash := sha256.Sum256(bytes) - return hash[:] + return hash[:], nil +} + +// Hash returns the hash of the header. Headers with populated legacy fields are hashed with the legacy encoding; all others use the slim encoding. +func (h *Header) Hash() Hash { + if h == nil { + return nil + } + + slimHash, err := h.HashSlim() + if err != nil { + return nil + } + + // Prefer the legacy encoding when legacy fields are present. + if h.Legacy != nil && !h.Legacy.IsZero() { + legacyHash, err := h.HashLegacy() + if err == nil { + return legacyHash + } + } + + return slimHash } // Hash returns hash of the Data diff --git a/types/hashing_test.go b/types/hashing_test.go index fd52c142b..a9677e3e5 100644 --- a/types/hashing_test.go +++ b/types/hashing_test.go @@ -1,6 +1,7 @@ package types import ( + "bytes" "crypto/sha256" "testing" @@ -34,6 +35,34 @@ func TestHeaderHash(t *testing.T) { assert.NotEqual(t, hash1, hash2, "Different headers should have different hashes") } +func TestHeaderHashLegacy(t *testing.T) { + legacyFields := &LegacyHeaderFields{ + LastCommitHash: bytes.Repeat([]byte{0x01}, 32), + ConsensusHash: bytes.Repeat([]byte{0x02}, 32), + LastResultsHash: bytes.Repeat([]byte{0x03}, 32), + } + + header := &Header{ + BaseHeader: BaseHeader{ + Height: 10, + Time: 987654321, + ChainID: "legacy-chain", + }, + DataHash: bytes.Repeat([]byte{0x04}, 32), + Legacy: legacyFields, + } + + hash := header.Hash() + + legacyBytes, err := header.MarshalBinaryLegacy() + require.NoError(t, err) + expected := sha256.Sum256(legacyBytes) + + assert.NotNil(t, hash) + assert.Len(t, hash, sha256.Size) + assert.Equal(t, Hash(expected[:]), hash, "Header hash should prefer legacy encoding when legacy fields are present") +} + // TestDataHash tests the Hash method of the Data.
func TestDataHash(t *testing.T) { data := &Data{ @@ -110,7 +139,7 @@ func TestHeaderHashWithBytes(t *testing.T) { // Hash after a marshal/unmarshal round-trip headerBytes, err := header.MarshalBinary() require.NoError(t, err) - hash2 := HeaderHash(headerBytes) - - assert.Equal(t, hash1, hash2, "HeaderHash should produce same result as Header.Hash()") + var targetHeader Header + require.NoError(t, targetHeader.UnmarshalBinary(headerBytes)) + assert.Equal(t, hash1, targetHeader.Hash(), "Hash should be stable across a marshal/unmarshal round-trip") } diff --git a/types/pb/evnode/v1/state.pb.go b/types/pb/evnode/v1/state.pb.go index 9714fce0a..ea7610b7d 100644 --- a/types/pb/evnode/v1/state.pb.go +++ b/types/pb/evnode/v1/state.pb.go @@ -32,6 +32,7 @@ type State struct { LastBlockTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=last_block_time,json=lastBlockTime,proto3" json:"last_block_time,omitempty"` DaHeight uint64 `protobuf:"varint,6,opt,name=da_height,json=daHeight,proto3" json:"da_height,omitempty"` AppHash []byte `protobuf:"bytes,8,opt,name=app_hash,json=appHash,proto3" json:"app_hash,omitempty"` + LastHeaderHash []byte `protobuf:"bytes,9,opt,name=last_header_hash,json=lastHeaderHash,proto3" json:"last_header_hash,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -115,11 +116,18 @@ func (x *State) GetAppHash() []byte { return nil } +func (x *State) GetLastHeaderHash() []byte { + if x != nil { + return x.LastHeaderHash + } + return nil +} + var File_evnode_v1_state_proto protoreflect.FileDescriptor const file_evnode_v1_state_proto_rawDesc = "" + "\n" + - "\x15evnode/v1/state.proto\x12\tevnode.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16evnode/v1/evnode.proto\"\xa5\x02\n" + + "\x15evnode/v1/state.proto\x12\tevnode.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16evnode/v1/evnode.proto\"\xcf\x02\n" + "\x05State\x12,\n" + "\aversion\x18\x01 \x01(\v2\x12.evnode.v1.VersionR\aversion\x12\x19\n" + "\bchain_id\x18\x02 \x01(\tR\achainId\x12%\n" + "\x11last_block_height\x18\x04 \x01(\x04R\x0flastBlockHeight\x12B\n" + "\x0flast_block_time\x18\x05 \x01(\v2\x1a.google.protobuf.TimestampR\rlastBlockTime\x12\x1b\n" + "\tda_height\x18\x06 \x01(\x04R\bdaHeight\x12\x19\n" + - "\bapp_hash\x18\b \x01(\fR\aappHashJ\x04\b\a\x10\bB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" + "\bapp_hash\x18\b \x01(\fR\aappHash\x12(\n" + + "\x10last_header_hash\x18\t \x01(\fR\x0elastHeaderHashJ\x04\b\a\x10\bB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" var ( file_evnode_v1_state_proto_rawDescOnce sync.Once diff --git a/types/serialization.go b/types/serialization.go index cfc4445c0..5e85736db 100644 --- a/types/serialization.go +++ b/types/serialization.go @@ -288,6 +288,7 @@ func (s *State) ToProto() (*pb.State, error) { LastBlockTime: timestamppb.New(s.LastBlockTime), DaHeight: s.DAHeight, AppHash: s.AppHash[:], + LastHeaderHash: s.LastHeaderHash[:], }, nil } @@ -317,6 +318,11 @@ func (s *State) FromProto(other *pb.State) error { } else { s.AppHash = nil } + if other.LastHeaderHash != nil { + s.LastHeaderHash = append([]byte(nil), other.LastHeaderHash...)
+ } else { + s.LastHeaderHash = nil + } s.DAHeight = other.GetDaHeight() return nil } diff --git a/types/state.go b/types/state.go index 374b119c9..a439f6c34 100644 --- a/types/state.go +++ b/types/state.go @@ -1,6 +1,8 @@ package types import ( + "bytes" + "fmt" "time" ) @@ -25,6 +27,9 @@ type State struct { LastBlockHeight uint64 LastBlockTime time.Time + // LastHeaderHash is the hash of the last block's header + LastHeaderHash Hash + // DAHeight identifies DA block containing the latest applied Evolve block. DAHeight uint64 @@ -42,6 +47,39 @@ func (s *State) NextState(header Header, stateRoot []byte) (State, error) { LastBlockHeight: height, LastBlockTime: header.Time(), AppHash: stateRoot, + LastHeaderHash: header.Hash(), DAHeight: s.DAHeight, }, nil } + +// AssertValidForNextState performs common validation of a header and data against the current state. +// It assumes any context-specific basic header checks and verifier setup have already been performed. +func (s State) AssertValidForNextState(header *SignedHeader, data *Data) error { + if header.ChainID() != s.ChainID { + return fmt.Errorf("invalid chain ID - got %s, want %s", header.ChainID(), s.ChainID) + } + + if err := Validate(header, data); err != nil { + return fmt.Errorf("header-data validation failed: %w", err) + } + + if len(s.LastHeaderHash) == 0 { // initial state + return nil + } + + if expectedHeight := s.LastBlockHeight + 1; header.Height() != expectedHeight { + return fmt.Errorf("invalid block height - got: %d, want: %d", header.Height(), expectedHeight) + } + + if headerTime := header.Time(); s.LastBlockTime.After(headerTime) { + return fmt.Errorf("invalid block time - got: %v, last: %v", headerTime, s.LastBlockTime) + } + if !bytes.Equal(header.LastHeaderHash, s.LastHeaderHash) { + return fmt.Errorf("invalid last header hash - got: %x, want: %x", header.LastHeaderHash, s.LastHeaderHash) + } + if !bytes.Equal(header.AppHash, s.AppHash) { + return fmt.Errorf("invalid last app hash - got: %x, want: %x", header.AppHash, s.AppHash) + } + + return nil +} diff --git a/types/state_test.go b/types/state_test.go new file mode 100644 index 000000000..e07342c16 --- /dev/null +++ b/types/state_test.go @@ -0,0 +1,167 @@ +package types + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var dataHashForEmptyTxs = []byte{110, 52, 11, 156, 255, 179, 122, 152, 156, 165, 68, 230, 187, 120, 10, 44, 120, 144, 29, 63, 179, 55, 56, 118, 133, 17, 163, 6, 23, 175, 160, 29} + +func TestAssertValidForNextState(t *testing.T) { + // Define test table + now := time.Now() + nowUnixNano := uint64(now.UnixNano()) + testCases := map[string]struct { + state State + header *SignedHeader + data *Data + expectedError string + }{ + "valid initial state": { + state: State{ + ChainID: "test-chain", + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", Height: 1, + }, + DataHash: dataHashForEmptyTxs, + }, + }, + data: &Data{}, + expectedError: "", + }, + "chain ID mismatch": { + state: State{ + ChainID: "test-chain", + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "wrong-chain", Height: 1, + }, + DataHash: dataHashForEmptyTxs, + }, + }, + data: &Data{}, + expectedError: "invalid chain ID", + }, + "invalid block height": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("hash"), + LastBlockTime: now, + LastBlockHeight: 5, + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", Height: 7, + Time:
nowUnixNano + 1, + }, + DataHash: dataHashForEmptyTxs, + LastHeaderHash: []byte("hash"), + }, + }, + data: &Data{}, + expectedError: "invalid block height", + }, + "invalid block time": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("hash"), + LastBlockHeight: 1, + LastBlockTime: now, + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", + Height: 2, + Time: nowUnixNano - 1, + }, + DataHash: dataHashForEmptyTxs, + LastHeaderHash: []byte("hash"), + }, + }, + data: &Data{}, + expectedError: "invalid block time", + }, + "invalid data hash": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("hash"), + LastBlockHeight: 1, + LastBlockTime: now, + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", + Height: 2, + Time: nowUnixNano, + }, + DataHash: []byte("other-hash"), + LastHeaderHash: []byte("hash"), + }, + }, + data: &Data{}, + expectedError: "dataHash from the header does not match with hash", + }, + "last header hash mismatch": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("hash"), + LastBlockHeight: 1, + LastBlockTime: now, + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", Height: 2, + Time: nowUnixNano, + }, + DataHash: dataHashForEmptyTxs, + LastHeaderHash: []byte("other-hash"), + }, + }, + data: &Data{}, + expectedError: "invalid last header hash", + }, + "app hash mismatch": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("expected-hash"), + LastBlockHeight: 1, + LastBlockTime: now, + AppHash: []byte("expected-app-hash"), + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", Height: 2, + Time: nowUnixNano, + }, + DataHash: dataHashForEmptyTxs, + LastHeaderHash: []byte("expected-hash"), + AppHash: []byte("wrong-app-hash"), + }, + }, + data: &Data{}, + expectedError: "invalid last app hash", + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + err := tc.state.AssertValidForNextState(tc.header, tc.data) + if tc.expectedError == "" { + assert.NoError(t, err) + return + } + assert.ErrorContains(t, err, tc.expectedError) + }) + } +}
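Editorial usage sketch (not part of the patch): the snippet below illustrates how the State.AssertValidForNextState helper and the LastHeaderHash field introduced above are meant to chain on a sync path — validate the fetched header and data against the current state, execute, then derive the successor state via NextState, which records LastHeaderHash for validating the following block. The applyFetchedBlock name and the execute callback are hypothetical.

package example

import "github.com/evstack/ev-node/types"

// applyFetchedBlock validates a fetched block against the current state and,
// on success, returns the successor state. execute stands in for the execution
// client call that yields the new state root.
func applyFetchedBlock(cur types.State, header *types.SignedHeader, data *types.Data, execute func() ([]byte, error)) (types.State, error) {
	// Rejects chain-ID, height, time, LastHeaderHash, and AppHash mismatches;
	// on a fresh state (empty LastHeaderHash) only chain ID and header-data
	// consistency are checked.
	if err := cur.AssertValidForNextState(header, data); err != nil {
		return types.State{}, err
	}
	stateRoot, err := execute()
	if err != nil {
		return types.State{}, err
	}
	// NextState stamps LastHeaderHash with header.Hash(), closing the loop
	// for the next block's validation.
	return cur.NextState(header.Header, stateRoot)
}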