From 4ad030861cb2a69c29c39df92ba98dd27fd7b958 Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Wed, 15 Oct 2025 15:27:26 +0200
Subject: [PATCH 1/9] Validate block headers against state

---
 block/internal/syncing/syncer.go | 42 +++++++++++++++++++++++++-------
 test/e2e/sut_helper.go | 23 +++++++++++------
 types/state.go | 4 +++
 3 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index 0a9443c319..72bf2aeea8 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -380,7 +380,14 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
 	if err := s.trySyncNextBlock(event); err != nil {
 		s.logger.Error().Err(err).Msg("failed to sync next block")
 		// If the error is not due to an validation error, re-store the event as pending
-		if !errors.Is(err, errInvalidBlock) {
+		switch {
+		case errors.Is(err, errInvalidBlock):
+			// do not reschedule
+		case errors.Is(err, errInvalidState):
+			s.logger.Fatal().Uint64("block_height", event.Header.Height()).
+				Uint64("state_height", s.GetLastState().LastBlockHeight).Err(err).
+				Msg("Invalid state, shutting down")
+		default:
 			s.cache.SetPendingEvent(height, event)
 		}
 		return
@@ -397,8 +404,12 @@
 	}
 }
 
-// errInvalidBlock is returned when a block is failing validation
-var errInvalidBlock = errors.New("invalid block")
+var (
+	// errInvalidBlock is returned when a block is failing validation
+	errInvalidBlock = errors.New("invalid block")
+	// errInvalidState is returned when the state has diverged from the DA blocks
+	errInvalidState = errors.New("invalid state")
+)
 
 // trySyncNextBlock attempts to sync the next available block
 // the event is always the next block in sequence as processHeightEvent ensures it.
@@ -419,10 +430,10 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
 	// Compared to the executor logic where the current block needs to be applied first,
 	// here only the previous block needs to be applied to proceed to the verification.
 	// The header validation must be done before applying the block to avoid executing gibberish
-	if err := s.validateBlock(header, data); err != nil {
+	if err := s.validateBlock(header, data, currentState); err != nil {
 		// remove header as da included (not per se needed, but keep cache clean)
 		s.cache.RemoveHeaderDAIncluded(header.Hash().String())
-		return errors.Join(errInvalidBlock, fmt.Errorf("failed to validate block: %w", err))
+		return err
 	}
 
 	// Apply block
@@ -528,10 +539,7 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade
 // NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct
 // or if the data was gibberish and somehow passed all validation prior but the header was correct
 // we are still losing both in the pending event. This should never happen.
-func (s *Syncer) validateBlock(
-	header *types.SignedHeader,
-	data *types.Data,
-) error {
+func (s *Syncer) validateBlock(header *types.SignedHeader, data *types.Data, state types.State, ) error {
 	// Set custom verifier for aggregator node signature
 	header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider)
 
@@ -544,7 +552,23 @@ func (s *Syncer) validateBlock(
 	if err := types.Validate(header, data); err != nil {
 		return fmt.Errorf("header-data validation failed: %w", err)
 	}
+	if state.LastBlockHeight < s.genesis.InitialHeight {
+		return nil
+	}
+	// Validate header against state
+	if header.Height() != state.LastBlockHeight+1 {
+		return fmt.Errorf("%w: invalid block height - got: %d, want: %d", errInvalidState, header.Height(), state.LastBlockHeight+1)
+	}
+
+	if !header.Time().After(state.LastBlockTime) {
+		return fmt.Errorf("%w: invalid block time - got: %v, last: %v", errInvalidState, header.Time(), state.LastBlockTime)
+	}
+	if !bytes.Equal(header.LastHeaderHash, state.LastHeaderHash) {
+		return fmt.Errorf("%w: invalid last header hash - got: %x, want: %x", errInvalidState, header.LastHeaderHash, header.LastHeaderHash)
+	}
+	if !bytes.Equal(header.AppHash, state.AppHash) {
+		return fmt.Errorf("%w: invalid last appHash hash - got: %x, want: %x", errInvalidState, header.AppHash, header.AppHash)
+	}
 	return nil
 }
diff --git a/test/e2e/sut_helper.go b/test/e2e/sut_helper.go
index ebf1249dfa..226e02a0cb 100644
--- a/test/e2e/sut_helper.go
+++ b/test/e2e/sut_helper.go
@@ -51,6 +51,7 @@ func NewSystemUnderTest(t *testing.T) *SystemUnderTest {
 		cmdToPids: make(map[string][]int),
 		outBuff: ring.New(100),
 		errBuff: ring.New(100),
+		debug: testing.Verbose(),
 	}
 	t.Cleanup(func() {
 		if t.Failed() {
@@ -103,7 +104,7 @@ func (s *SystemUnderTest) AwaitNodeUp(t *testing.T, rpcAddr string, timeout time
 		require.NotNil(t, c)
 		_, err := c.GetHealth(ctx)
 		require.NoError(t, err)
-	}, timeout, timeout/10, "node is not up")
+	}, timeout, min(timeout/10, 200*time.Millisecond), "node is not up")
 }
 
 // AwaitNBlocks waits until the node has produced at least `n` blocks.
@@ -153,16 +154,24 @@ func (s *SystemUnderTest) awaitProcessCleanup(cmd *exec.Cmd) {
 
 func (s *SystemUnderTest) watchLogs(cmd *exec.Cmd) {
 	errReader, err := cmd.StderrPipe()
-	if err != nil {
-		panic(fmt.Sprintf("stderr reader error %#+v", err))
+	require.NoError(s.t, err)
+	outReader, err := cmd.StdoutPipe()
+	require.NoError(s.t, err)
+
+	if s.debug {
+		logDir := filepath.Join(WorkDir, "testnet")
+		require.NoError(s.t, os.MkdirAll(logDir, 0o750))
+		testName := strings.ReplaceAll(s.t.Name(), "/", "-")
+		logfileName := filepath.Join(logDir, fmt.Sprintf("exec-%s-%s-%d.out", filepath.Base(cmd.Args[0]), testName, time.Now().UnixNano()))
+		logfile, err := os.Create(logfileName)
+		require.NoError(s.t, err)
+		errReader = io.NopCloser(io.TeeReader(errReader, logfile))
+		outReader = io.NopCloser(io.TeeReader(outReader, logfile))
 	}
+
 	stopRingBuffer := make(chan struct{})
 	go appendToBuf(errReader, s.errBuff, stopRingBuffer)
-	outReader, err := cmd.StdoutPipe()
-	if err != nil {
-		panic(fmt.Sprintf("stdout reader error %#+v", err))
-	}
 	go appendToBuf(outReader, s.outBuff, stopRingBuffer)
 	s.t.Cleanup(func() {
 		close(stopRingBuffer)
diff --git a/types/state.go b/types/state.go
index d4597f7b73..887976c9cb 100644
--- a/types/state.go
+++ b/types/state.go
@@ -25,6 +25,9 @@ type State struct {
 	LastBlockHeight uint64
 	LastBlockTime time.Time
 
+	// LastHeaderHash is the hash of the header of the last block
+	LastHeaderHash Hash
+
 	// DAHeight identifies DA block containing the latest applied Evolve block.
 	DAHeight uint64
 
@@ -45,6 +48,7 @@ func (s *State) NextState(header Header, stateRoot []byte) (State, error) {
 		LastBlockHeight: height,
 		LastBlockTime: header.Time(),
 		AppHash: stateRoot,
+		LastHeaderHash: header.Hash(),
 		DAHeight: s.DAHeight,
 	}, nil
 }

From b930d3cdf1cfada9496453c0526bb0c8ac832429 Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Wed, 15 Oct 2025 15:48:50 +0200
Subject: [PATCH 2/9] Review feedback

---
 block/internal/syncing/syncer.go | 6 +++---
 test/e2e/.gitignore | 1 +
 2 files changed, 4 insertions(+), 3 deletions(-)
 create mode 100644 test/e2e/.gitignore

diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index 72bf2aeea8..ebfd7f85c0 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -386,7 +386,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
 		case errors.Is(err, errInvalidState):
 			s.logger.Fatal().Uint64("block_height", event.Header.Height()).
 				Uint64("state_height", s.GetLastState().LastBlockHeight).Err(err).
-				Msg("Invalid state, shutting down")
+				Msg("Invalid state detected - block references do not match local state. Manual intervention required.")
 		default:
 			s.cache.SetPendingEvent(height, event)
 		}
@@ -564,10 +564,10 @@ func (s *Syncer) validateBlock(header *types.SignedHeader, data *types.Data, sta
 		return fmt.Errorf("%w: invalid block time - got: %v, last: %v", errInvalidState, header.Time(), state.LastBlockTime)
 	}
 	if !bytes.Equal(header.LastHeaderHash, state.LastHeaderHash) {
-		return fmt.Errorf("%w: invalid last header hash - got: %x, want: %x", errInvalidState, header.LastHeaderHash, header.LastHeaderHash)
+		return fmt.Errorf("%w: invalid last header hash - got: %x, want: %x", errInvalidState, header.LastHeaderHash, state.LastHeaderHash)
 	}
 	if !bytes.Equal(header.AppHash, state.AppHash) {
-		return fmt.Errorf("%w: invalid last appHash hash - got: %x, want: %x", errInvalidState, header.AppHash, header.AppHash)
+		return fmt.Errorf("%w: invalid last app hash - got: %x, want: %x", errInvalidState, header.AppHash, state.AppHash)
 	}
 	return nil
 }
diff --git a/test/e2e/.gitignore b/test/e2e/.gitignore
new file mode 100644
index 0000000000..719713a312
--- /dev/null
+++ b/test/e2e/.gitignore
@@ -0,0 +1 @@
+/testnet/

From 6737929a37a2ca33d1be3665f99238800effa760 Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Fri, 17 Oct 2025 17:36:19 +0200
Subject: [PATCH 3/9] Store last header hash

---
 block/components.go | 2 +
 block/internal/executing/executor.go | 30 +---
 block/internal/syncing/da_retriever_test.go | 16 +-
 block/internal/syncing/syncer.go | 29 +--
 block/internal/syncing/syncer_backoff_test.go | 2 +-
 .../internal/syncing/syncer_benchmark_test.go | 2 +-
 block/internal/syncing/syncer_test.go | 45 +++--
 proto/evnode/v1/state.proto | 1 +
 test/e2e/evm_full_node_e2e_test.go | 1 +
 test/e2e/evm_test_common.go | 1 +
 types/pb/evnode/v1/state.pb.go | 15 +-
 types/serialization.go | 6 +
 types/state.go | 34 ++++
 types/state_test.go | 167 ++++++++++++++++++
 14 files changed, 269 insertions(+), 82 deletions(-)
 create mode 100644 types/state_test.go

diff --git a/block/components.go b/block/components.go
index 3ee2062acf..ea5db78bd0 100644
--- a/block/components.go
+++ b/block/components.go
@@ -137,6 +137,7 @@ func NewSyncComponents(
 	metrics *Metrics,
 	blockOpts BlockOptions,
 ) (*Components, error) {
+	logger.Info().Msg("Starting in sync-mode")
 	cacheManager, err := cache.NewManager(config, store, logger)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create cache manager: %w", err)
 	}
@@ -200,6 +201,7 @@ func NewAggregatorComponents(
 	metrics *Metrics,
 	blockOpts BlockOptions,
 ) (*Components, error) {
+	logger.Info().Msg("Starting in aggregator-mode")
 	cacheManager, err := cache.NewManager(config, store, logger)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create cache manager: %w", err)
 	}
diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go
index 00259b7050..9da392f47b 100644
--- a/block/internal/executing/executor.go
+++ b/block/internal/executing/executor.go
@@ -597,35 +597,7 @@ func (e *Executor) validateBlock(lastState types.State, header *types.SignedHead
 		return fmt.Errorf("invalid header: %w", err)
 	}
 
-	// Validate header against data
-	if err := types.Validate(header, data); err != nil {
-		return fmt.Errorf("header-data validation failed: %w", err)
-	}
-
-	// Check chain ID
-	if header.ChainID() != lastState.ChainID {
-		return fmt.Errorf("chain ID mismatch: expected %s, got %s",
-			lastState.ChainID, header.ChainID())
-	}
-
-	// Check height
-	expectedHeight := lastState.LastBlockHeight + 1
-	if header.Height() != expectedHeight {
-		return fmt.Errorf("invalid height: expected %d, got 
%d", - expectedHeight, header.Height()) - } - - // Check timestamp - if header.Height() > 1 && lastState.LastBlockTime.After(header.Time()) { - return fmt.Errorf("block time must be strictly increasing") - } - - // Check app hash - if !bytes.Equal(header.AppHash, lastState.AppHash) { - return fmt.Errorf("app hash mismatch") - } - - return nil + return lastState.AssertValidForNextState(header, data) } // sendCriticalError sends a critical error to the error channel without blocking diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 0d97d5940f..c6e8daa78f 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -168,7 +168,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) - hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data) + hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil) events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77) require.Len(t, events, 1) @@ -196,7 +196,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) // Header with no data hash present should trigger empty data creation (per current logic) - hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil) + hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil) events := r.processBlobs(context.Background(), [][]byte{hb}, 88) require.Len(t, events, 1) @@ -223,7 +223,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) - hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil) + hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil, nil) gotH := r.tryDecodeHeader(hb, 123) require.NotNil(t, gotH) assert.Equal(t, sh.Hash().String(), gotH.Hash().String()) @@ -279,7 +279,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { // Prepare header/data blobs dataBin, data := makeSignedDataBytes(t, gen.ChainID, 9, addr, pub, signer, 1) - hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data) + hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data, nil) cfg := config.DefaultConfig() cfg.DA.Namespace = "nsHdr" @@ -322,7 +322,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { // Create header and data for the same block height but from different DA heights dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2) - hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data) + hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil) // Process header from DA height 100 first events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100) @@ -361,9 +361,9 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin data4Bin, data4 := makeSignedDataBytes(t, gen.ChainID, 4, addr, pub, signer, 2) data5Bin, data5 := 
makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 1) - hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data) - hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data) - hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data) + hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data, nil) + hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data, nil) + hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil) // Process multiple headers from DA height 200 - should be stored as pending events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index ebfd7f85c0..fa60b4234a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -159,6 +159,7 @@ func (s *Syncer) GetLastState() types.State { stateCopy := *state stateCopy.AppHash = bytes.Clone(state.AppHash) stateCopy.LastResultsHash = bytes.Clone(state.LastResultsHash) + stateCopy.LastHeaderHash = bytes.Clone(state.LastHeaderHash) return stateCopy } @@ -430,7 +431,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error { // Compared to the executor logic where the current block needs to be applied first, // here only the previous block needs to be applied to proceed to the verification. // The header validation must be done before applying the block to avoid executing gibberish - if err := s.validateBlock(header, data, currentState); err != nil { + if err := s.validateBlock(currentState, data, header); err != nil { // remove header as da included (not per se needed, but keep cache clean) s.cache.RemoveHeaderDAIncluded(header.Hash().String()) return err @@ -539,37 +540,15 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade // NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct // or if the data was gibberish and somehow passed all validation prior but the header was correct // we are still losing both in the pending event. This should never happen. 
-func (s *Syncer) validateBlock(header *types.SignedHeader, data *types.Data, state types.State, ) error { +func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *types.SignedHeader) error { // Set custom verifier for aggregator node signature header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider) - // Validate header with data if err := header.ValidateBasicWithData(data); err != nil { return fmt.Errorf("invalid header: %w", err) } - // Validate header against data - if err := types.Validate(header, data); err != nil { - return fmt.Errorf("header-data validation failed: %w", err) - } - if state.LastBlockHeight < s.genesis.InitialHeight { - return nil - } - // Validate header against state - if header.Height() != state.LastBlockHeight+1 { - return fmt.Errorf("%w: invalid block height - got: %d, want: %d", errInvalidState, header.Height(), state.LastBlockHeight+1) - } - - if !header.Time().After(state.LastBlockTime) { - return fmt.Errorf("%w: invalid block time - got: %v, last: %v", errInvalidState, header.Time(), state.LastBlockTime) - } - if !bytes.Equal(header.LastHeaderHash, state.LastHeaderHash) { - return fmt.Errorf("%w: invalid last header hash - got: %x, want: %x", errInvalidState, header.LastHeaderHash, state.LastHeaderHash) - } - if !bytes.Equal(header.AppHash, state.AppHash) { - return fmt.Errorf("%w: invalid last app hash - got: %x, want: %x", errInvalidState, header.AppHash, state.AppHash) - } - return nil + return currState.AssertValidForNextState(header, data) } // sendCriticalError sends a critical error to the error channel without blocking diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 7a9e80dbbd..ca80cab363 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -197,7 +197,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { Return(nil, errors.New("temporary failure")).Once() // Second call - success (should reset backoff and increment DA height) - _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil) + _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) data := &types.Data{ Metadata: &types.Metadata{ ChainID: gen.ChainID, diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 8c9cfea362..2e85a33f4d 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -120,7 +120,7 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay heightEvents := make([]common.DAHeightEvent, totalHeights) for i := uint64(0); i < totalHeights; i++ { blockHeight, daHeight := i+gen.InitialHeight, i+daHeightOffset - _, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil) + _, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil, nil) d := &types.Data{Metadata: &types.Metadata{ChainID: gen.ChainID, Height: blockHeight, Time: uint64(time.Now().UnixNano())}} heightEvents[i] = common.DAHeightEvent{Header: sh, Data: d, DaHeight: daHeight} } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 65ecfb674c..c58f28f631 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -3,6 +3,7 @@ package syncing import ( "context" crand "crypto/rand" + "crypto/sha512" "errors" 
"testing" "time" @@ -44,7 +45,17 @@ func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer } // makeSignedHeaderBytes builds a valid SignedHeader and returns its binary encoding and the object -func makeSignedHeaderBytes(tb testing.TB, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, appHash []byte, data *types.Data) ([]byte, *types.SignedHeader) { +func makeSignedHeaderBytes( + tb testing.TB, + chainID string, + height uint64, + proposer []byte, + pub crypto.PubKey, + signer signerpkg.Signer, + appHash []byte, + data *types.Data, + lastHeaderHash []byte, +) ([]byte, *types.SignedHeader) { time := uint64(time.Now().UnixNano()) dataHash := common.DataHashForEmptyTxs if data != nil { @@ -58,6 +69,7 @@ func makeSignedHeaderBytes(tb testing.TB, chainID string, height uint64, propose AppHash: appHash, DataHash: dataHash, ProposerAddress: proposer, + LastHeaderHash: lastHeaderHash, }, Signer: types.Signer{PubKey: pub, Address: proposer}, } @@ -97,7 +109,6 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { cfg := config.DefaultConfig() gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - mockExec := testmocks.NewMockExecutor(t) s := NewSyncer( @@ -114,24 +125,24 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { common.DefaultBlockOptions(), make(chan error, 1), ) - + require.NoError(t, s.initializeState()) // Create header and data with correct hash data := makeData(gen.ChainID, 1, 2) // non-empty - _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data) + _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data, nil) - err = s.validateBlock(header, data) + err = s.validateBlock(s.GetLastState(), data, header) require.NoError(t, err) // Create header and data with mismatched hash data = makeData(gen.ChainID, 1, 2) // non-empty - _, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil) - err = s.validateBlock(header, data) + _, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) + err = s.validateBlock(s.GetLastState(), data, header) require.Error(t, err) // Create header and empty data data = makeData(gen.ChainID, 1, 0) // empty - _, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil) - err = s.validateBlock(header, data) + _, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil, nil) + err = s.validateBlock(s.GetLastState(), data, header) require.Error(t, err) } @@ -169,7 +180,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { // Create signed header & data for height 1 lastState := s.GetLastState() data := makeData(gen.ChainID, 1, 0) - _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data) + _, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil) // Expect ExecuteTxs call for height 1 mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash). 
@@ -218,7 +229,7 @@ func TestSequentialBlockSync(t *testing.T) { // Sync two consecutive blocks via processHeightEvent so ExecuteTxs is called and state stored st0 := s.GetLastState() data1 := makeData(gen.ChainID, 1, 1) // non-empty - _, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, st0.AppHash, data1) + _, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, st0.AppHash, data1, st0.LastHeaderHash) // Expect ExecuteTxs call for height 1 mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, st0.AppHash). Return([]byte("app1"), uint64(1024), nil).Once() @@ -227,7 +238,7 @@ func TestSequentialBlockSync(t *testing.T) { st1, _ := st.GetState(context.Background()) data2 := makeData(gen.ChainID, 2, 0) // empty data - _, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2) + _, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2, st1.LastHeaderHash) // Expect ExecuteTxs call for height 2 mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(2), mock.Anything, st1.AppHash). Return([]byte("app2"), uint64(1024), nil).Once() @@ -365,8 +376,9 @@ func TestSyncLoopPersistState(t *testing.T) { syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock // with n da blobs fetched + var prevHeaderHash, prevAppHash []byte for i := range myFutureDAHeight - myDAHeightOffset { - chainHeight, daHeight := i, i+myDAHeightOffset + chainHeight, daHeight := i+1, i+myDAHeightOffset emptyData := &types.Data{ Metadata: &types.Metadata{ ChainID: gen.ChainID, @@ -374,13 +386,17 @@ func TestSyncLoopPersistState(t *testing.T) { Time: uint64(time.Now().Add(time.Duration(chainHeight) * time.Second).UnixNano()), }, } - _, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, nil, emptyData) + _, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, prevAppHash, emptyData, prevHeaderHash) evts := []common.DAHeightEvent{{ Header: sigHeader, Data: emptyData, DaHeight: daHeight, }} daRtrMock.On("RetrieveFromDA", mock.Anything, daHeight).Return(evts, nil) + prevHeaderHash = sigHeader.Hash() + hasher := sha512.New() + hasher.Write(prevAppHash) + prevAppHash = hasher.Sum(nil) } // stop at next height @@ -395,7 +411,6 @@ func TestSyncLoopPersistState(t *testing.T) { Return(nil, coreda.ErrHeightFromFuture) go syncerInst1.processLoop() - // dssync from DA until stop height reached syncerInst1.syncLoop() t.Log("syncLoop on instance1 completed") diff --git a/proto/evnode/v1/state.proto b/proto/evnode/v1/state.proto index 11af7f0cf3..49192872c0 100644 --- a/proto/evnode/v1/state.proto +++ b/proto/evnode/v1/state.proto @@ -16,4 +16,5 @@ message State { uint64 da_height = 6; bytes last_results_hash = 7; bytes app_hash = 8; + bytes LastHeaderHash = 9; } diff --git a/test/e2e/evm_full_node_e2e_test.go b/test/e2e/evm_full_node_e2e_test.go index 2bff86d93c..2016773b04 100644 --- a/test/e2e/evm_full_node_e2e_test.go +++ b/test/e2e/evm_full_node_e2e_test.go @@ -948,6 +948,7 @@ func restartSequencerAndFullNode(t *testing.T, sut *SystemUnderTest, sequencerHo // Now restart the full node (without init - node already exists) sut.ExecCmd(evmSingleBinaryPath, "start", + "--evnode.log.format", "json", "--home", fullNodeHome, "--evm.jwt-secret", fullNodeJwtSecret, "--evm.genesis-hash", genesisHash, diff --git a/test/e2e/evm_test_common.go b/test/e2e/evm_test_common.go index c0f2e1fcba..ec1fe27d44 100644 --- a/test/e2e/evm_test_common.go 
+++ b/test/e2e/evm_test_common.go @@ -563,6 +563,7 @@ func restartDAAndSequencer(t *testing.T, sut *SystemUnderTest, sequencerHome, jw // Then restart the sequencer node (without init - node already exists) sut.ExecCmd(evmSingleBinaryPath, "start", + "--evnode.log.format", "json", "--evm.jwt-secret", jwtSecret, "--evm.genesis-hash", genesisHash, "--rollkit.node.block_time", DefaultBlockTime, diff --git a/types/pb/evnode/v1/state.pb.go b/types/pb/evnode/v1/state.pb.go index 18386afe78..ab9300c845 100644 --- a/types/pb/evnode/v1/state.pb.go +++ b/types/pb/evnode/v1/state.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.8 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: evnode/v1/state.proto @@ -33,6 +33,7 @@ type State struct { DaHeight uint64 `protobuf:"varint,6,opt,name=da_height,json=daHeight,proto3" json:"da_height,omitempty"` LastResultsHash []byte `protobuf:"bytes,7,opt,name=last_results_hash,json=lastResultsHash,proto3" json:"last_results_hash,omitempty"` AppHash []byte `protobuf:"bytes,8,opt,name=app_hash,json=appHash,proto3" json:"app_hash,omitempty"` + LastHeaderHash []byte `protobuf:"bytes,9,opt,name=LastHeaderHash,proto3" json:"LastHeaderHash,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -123,11 +124,18 @@ func (x *State) GetAppHash() []byte { return nil } +func (x *State) GetLastHeaderHash() []byte { + if x != nil { + return x.LastHeaderHash + } + return nil +} + var File_evnode_v1_state_proto protoreflect.FileDescriptor const file_evnode_v1_state_proto_rawDesc = "" + "\n" + - "\x15evnode/v1/state.proto\x12\tevnode.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16evnode/v1/evnode.proto\"\xcb\x02\n" + + "\x15evnode/v1/state.proto\x12\tevnode.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16evnode/v1/evnode.proto\"\xf3\x02\n" + "\x05State\x12,\n" + "\aversion\x18\x01 \x01(\v2\x12.evnode.v1.VersionR\aversion\x12\x19\n" + "\bchain_id\x18\x02 \x01(\tR\achainId\x12%\n" + @@ -136,7 +144,8 @@ const file_evnode_v1_state_proto_rawDesc = "" + "\x0flast_block_time\x18\x05 \x01(\v2\x1a.google.protobuf.TimestampR\rlastBlockTime\x12\x1b\n" + "\tda_height\x18\x06 \x01(\x04R\bdaHeight\x12*\n" + "\x11last_results_hash\x18\a \x01(\fR\x0flastResultsHash\x12\x19\n" + - "\bapp_hash\x18\b \x01(\fR\aappHashB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" + "\bapp_hash\x18\b \x01(\fR\aappHash\x12&\n" + + "\x0eLastHeaderHash\x18\t \x01(\fR\x0eLastHeaderHashB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" var ( file_evnode_v1_state_proto_rawDescOnce sync.Once diff --git a/types/serialization.go b/types/serialization.go index ee78a81654..3bb9904c13 100644 --- a/types/serialization.go +++ b/types/serialization.go @@ -288,6 +288,7 @@ func (s *State) ToProto() (*pb.State, error) { DaHeight: s.DAHeight, LastResultsHash: s.LastResultsHash[:], AppHash: s.AppHash[:], + LastHeaderHash: s.LastHeaderHash[:], }, nil } @@ -322,6 +323,11 @@ func (s *State) FromProto(other *pb.State) error { } else { s.AppHash = nil } + if other.LastHeaderHash != nil { + s.LastHeaderHash = append([]byte(nil), other.LastHeaderHash...) 
+ } else { + s.LastHeaderHash = nil + } s.DAHeight = other.GetDaHeight() return nil } diff --git a/types/state.go b/types/state.go index 887976c9cb..ba0f372669 100644 --- a/types/state.go +++ b/types/state.go @@ -1,6 +1,8 @@ package types import ( + "bytes" + "fmt" "time" ) @@ -52,3 +54,35 @@ func (s *State) NextState(header Header, stateRoot []byte) (State, error) { DAHeight: s.DAHeight, }, nil } + +// AssertValidForNextState performs common validation of a header and data against the current state. +// It assumes any context-specific basic header checks and verifier setup have already been performed +func (s State) AssertValidForNextState(header *SignedHeader, data *Data) error { + if header.ChainID() != s.ChainID { + return fmt.Errorf("invalid chain ID - got %s, want %s", header.ChainID(), s.ChainID) + } + + if err := Validate(header, data); err != nil { + return fmt.Errorf("header-data validation failed: %w", err) + } + + if len(s.LastHeaderHash) == 0 { // initial state + return nil + } + + if expdHeight := s.LastBlockHeight + 1; header.Height() != expdHeight { + return fmt.Errorf("invalid block height - got: %d, want: %d", header.Height(), expdHeight) + } + + if s.LastBlockTime.After(header.Time()) { + return fmt.Errorf("invalid block time - got: %v, last: %v", header.Time(), s.LastBlockTime) + } + if !bytes.Equal(header.LastHeaderHash, s.LastHeaderHash) { + return fmt.Errorf("invalid last header hash - got: %x, want: %x", header.LastHeaderHash, s.LastHeaderHash) + } + if !bytes.Equal(header.AppHash, s.AppHash) { + return fmt.Errorf("invalid last app hash - got: %x, want: %x", header.AppHash, s.AppHash) + } + + return nil +} diff --git a/types/state_test.go b/types/state_test.go new file mode 100644 index 0000000000..e07342c164 --- /dev/null +++ b/types/state_test.go @@ -0,0 +1,167 @@ +package types + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var dataHashForEmptyTxs = []byte{110, 52, 11, 156, 255, 179, 122, 152, 156, 165, 68, 230, 187, 120, 10, 44, 120, 144, 29, 63, 179, 55, 56, 118, 133, 17, 163, 6, 23, 175, 160, 29} + +func TestAssertValidForNextState(t *testing.T) { + // Define test table + now := time.Now() + nowUnixNano := uint64(now.UnixNano()) + testCases := map[string]struct { + state State + header *SignedHeader + data *Data + expectedError string + }{ + "valid initial state": { + state: State{ + ChainID: "test-chain", + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", Height: 1, + }, + DataHash: dataHashForEmptyTxs, + }, + }, + data: &Data{}, + expectedError: "", + }, + "chain ID mismatch": { + state: State{ + ChainID: "test-chain", + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "wrong-chain", Height: 1, + }, + DataHash: dataHashForEmptyTxs, + }, + }, + data: &Data{}, + expectedError: "invalid chain ID", + }, + "invalid block height": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("hash"), + LastBlockTime: now, + LastBlockHeight: 5, + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", Height: 7, + Time: nowUnixNano + 1, + }, + DataHash: dataHashForEmptyTxs, + LastHeaderHash: []byte("hash"), + }, + }, + data: &Data{}, + expectedError: "invalid block height", + }, + "invalid block time": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("hash"), + LastBlockHeight: 1, + LastBlockTime: now, + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: 
"test-chain", + Height: 2, + Time: nowUnixNano - 1, + }, + DataHash: dataHashForEmptyTxs, + LastHeaderHash: []byte("hash"), + }, + }, + data: &Data{}, + expectedError: "invalid block time", + }, + "invalid data hash": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("hash"), + LastBlockHeight: 1, + LastBlockTime: now, + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", + Height: 2, + Time: nowUnixNano, + }, + DataHash: []byte("other-hash"), + LastHeaderHash: []byte("hash"), + }, + }, + data: &Data{}, + expectedError: "dataHash from the header does not match with hash", + }, + "last header hash mismatch": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("hash"), + LastBlockHeight: 1, + LastBlockTime: now, + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", Height: 2, + Time: nowUnixNano, + }, + DataHash: dataHashForEmptyTxs, + LastHeaderHash: []byte("other-hash"), + }, + }, + data: &Data{}, + expectedError: "invalid last header hash", + }, + "app hash mismatch": { + state: State{ + ChainID: "test-chain", + LastHeaderHash: []byte("expected-hash"), + LastBlockHeight: 1, + LastBlockTime: now, + AppHash: []byte("expected-app-hash"), + }, + header: &SignedHeader{ + Header: Header{ + BaseHeader: BaseHeader{ + ChainID: "test-chain", Height: 2, + Time: nowUnixNano, + }, + DataHash: dataHashForEmptyTxs, + LastHeaderHash: []byte("expected-hash"), + AppHash: []byte("wrong-app-hash"), + }, + }, + data: &Data{}, + expectedError: "invalid last app hash", + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + err := tc.state.AssertValidForNextState(tc.header, tc.data) + if tc.expectedError == "" { + assert.NoError(t, err) + return + } + assert.ErrorContains(t, err, tc.expectedError) + }) + } +} From 74cc608d7aaaa93f831d37f3b85277c951536436 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Fri, 17 Oct 2025 17:50:08 +0200 Subject: [PATCH 4/9] Fix merge conflict --- block/internal/submitting/submitter_test.go | 2 +- block/internal/syncing/syncer.go | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 9da50af2af..ae9f0d215a 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -303,7 +303,7 @@ func newHeaderAndData(chainID string, height uint64, nonEmpty bool) (*types.Sign h := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{ChainID: chainID, Height: height, Time: uint64(now.UnixNano())}, ProposerAddress: []byte{1}}} d := &types.Data{Metadata: &types.Metadata{ChainID: chainID, Height: height, Time: uint64(now.UnixNano())}} if nonEmpty { - d.Txs = types.Txs{types.Tx(fmt.Sprintf("any-unique-tx-%d", now.UnixNano()))} + d.Txs = types.Txs{types.Tx(fmt.Sprintf("any-unique-tx-%s-%d-%d", chainID, height, now.UnixNano()))} } return h, d } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 2801f2ae02..ac4e866b41 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -158,11 +158,7 @@ func (s *Syncer) GetLastState() types.State { stateCopy := *state stateCopy.AppHash = bytes.Clone(state.AppHash) -<<<<<<< HEAD - stateCopy.LastResultsHash = bytes.Clone(state.LastResultsHash) stateCopy.LastHeaderHash = bytes.Clone(state.LastHeaderHash) -======= ->>>>>>> main return stateCopy } From 
ae4fe1f484744f90f6fa1cfc6a68c44f8456e092 Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Fri, 17 Oct 2025 20:25:37 +0200
Subject: [PATCH 5/9] Review feedback

---
 block/internal/syncing/syncer.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index ac4e866b41..2ba29201c4 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -433,7 +433,9 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
 	if err := s.validateBlock(currentState, data, header); err != nil {
 		// remove header as da included (not per se needed, but keep cache clean)
 		s.cache.RemoveHeaderDAIncluded(header.Hash().String())
-		return err
+		if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) {
+			return errors.Join(errInvalidBlock, err)
+		}
 	}
 
 	// Apply block

From 11fbbee6d26c3917bdaa0e7be5d7d58b8c098195 Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Mon, 27 Oct 2025 12:00:37 +0100
Subject: [PATCH 6/9] Review feedback

---
 block/internal/syncing/syncer.go | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index 419b433ff9..309b5a33b8 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -390,9 +390,9 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
 		case errors.Is(err, errInvalidBlock):
 			// do not reschedule
 		case errors.Is(err, errInvalidState):
-			s.logger.Fatal().Uint64("block_height", event.Header.Height()).
-				Uint64("state_height", s.GetLastState().LastBlockHeight).Err(err).
-				Msg("Invalid state detected - block references do not match local state. Manual intervention required.")
+			s.sendCriticalError(fmt.Errorf("invalid state detected (block-height %d, state-height %d) "+
+				"- block references do not match local state. Manual intervention required: %w", event.Header.Height(),
+				s.GetLastState().LastBlockHeight, err))
 		default:
 			s.cache.SetPendingEvent(height, event)
 		}
@@ -555,7 +555,10 @@ func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *
 		return fmt.Errorf("invalid header: %w", err)
 	}
 
-	return currState.AssertValidForNextState(header, data)
+	if err := currState.AssertValidForNextState(header, data); err != nil {
+		return errors.Join(errInvalidState, err)
+	}
+	return nil
 }
 
 // sendCriticalError sends a critical error to the error channel without blocking

From 70d9fb8327836b077980e42cb013d93f7a1cc86e Mon Sep 17 00:00:00 2001
From: Alex Peters
Date: Tue, 28 Oct 2025 11:25:40 +0100
Subject: [PATCH 7/9] Init chain on empty state; Forward runtime errors

---
 block/components.go | 3 +-
 block/internal/executing/executor.go | 2 +-
 block/internal/syncing/syncer.go | 24 ++++++--
 block/internal/syncing/syncer_test.go | 74 +++++++++++++++--------
 node/full.go | 40 ++++++------
 node/full_node_integration_test.go | 62 +++++++++++++------
 node/helpers_test.go | 16 +++--
 node/single_sequencer_integration_test.go | 6 +-
 types/state.go | 4 +-
 9 files changed, 152 insertions(+), 79 deletions(-)

diff --git a/block/components.go b/block/components.go
index ea5db78bd0..5e20ec9de4 100644
--- a/block/components.go
+++ b/block/components.go
@@ -46,7 +46,8 @@ func (bc *Components) GetLastState() types.State {
 	return types.State{}
 }
 
-// Start starts all components and monitors for critical errors
+// Start starts all components and monitors for critical errors.
+// It is blocking and returns when the context is cancelled or an error occurs
 func (bc *Components) Start(ctx context.Context) error {
 	ctxWithCancel, cancel := context.WithCancel(ctx)
 
diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go
index 1f3eb600ed..474263c27e 100644
--- a/block/internal/executing/executor.go
+++ b/block/internal/executing/executor.go
@@ -199,7 +199,7 @@ func (e *Executor) initializeState() error {
 			LastBlockHeight: e.genesis.InitialHeight - 1,
 			LastBlockTime: e.genesis.StartTime,
 			AppHash: stateRoot,
-			DAHeight: 0,
+			DAHeight: e.genesis.DAStartHeight,
 		}
 	}
 
diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index 309b5a33b8..e43681ea40 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -183,21 +183,34 @@ func (s *Syncer) initializeState() error {
 	// Load state from store
 	state, err := s.store.GetState(s.ctx)
 	if err != nil {
-		// Use genesis state if no state exists
+		// Initialize new chain state for a fresh full node (no prior state on disk)
+		// Mirror executor initialization to ensure AppHash matches headers produced by the sequencer.
+		stateRoot, _, initErr := s.exec.InitChain(
+			s.ctx,
+			s.genesis.StartTime,
+			s.genesis.InitialHeight,
+			s.genesis.ChainID,
+		)
+		if initErr != nil {
+			return fmt.Errorf("failed to initialize execution client: %w", initErr)
+		}
+
 		state = types.State{
 			ChainID: s.genesis.ChainID,
 			InitialHeight: s.genesis.InitialHeight,
 			LastBlockHeight: s.genesis.InitialHeight - 1,
 			LastBlockTime: s.genesis.StartTime,
-			DAHeight: 0,
+			DAHeight: s.genesis.DAStartHeight,
+			AppHash: stateRoot,
 		}
 	}
-
+	if state.DAHeight < s.genesis.DAStartHeight {
+		return fmt.Errorf("DA height (%d) is lower than DA start height (%d)", state.DAHeight, s.genesis.DAStartHeight)
+	}
 	s.SetLastState(state)
 
 	// Set DA height
-	daHeight := max(state.DAHeight, s.genesis.DAStartHeight)
-	s.SetDAHeight(daHeight)
+	s.SetDAHeight(state.DAHeight)
 
 	s.logger.Info().
 		Uint64("height", state.LastBlockHeight).
@@ -442,6 +455,7 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error { if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { return errors.Join(errInvalidBlock, err) } + return err } // Apply block diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index bf0f83a19b..9114155cda 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -111,6 +111,7 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { cfg := config.DefaultConfig() gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), uint64(1024), nil).Once() s := NewSyncer( st, @@ -159,7 +160,9 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), uint64(1024), nil).Once() + errChan := make(chan error, 1) s := NewSyncer( st, mockExec, @@ -172,7 +175,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { common.NewMockBroadcaster[*types.Data](t), zerolog.Nop(), common.DefaultBlockOptions(), - make(chan error, 1), + errChan, ) require.NoError(t, s.initializeState()) @@ -190,6 +193,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 1} s.processHeightEvent(&evt) + requireEmptyChan(t, errChan) h, err := st.Height(context.Background()) require.NoError(t, err) assert.Equal(t, uint64(1), h) @@ -209,7 +213,9 @@ func TestSequentialBlockSync(t *testing.T) { gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} mockExec := testmocks.NewMockExecutor(t) + mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), uint64(1024), nil).Once() + errChan := make(chan error, 1) s := NewSyncer( st, mockExec, @@ -222,7 +228,7 @@ func TestSequentialBlockSync(t *testing.T) { common.NewMockBroadcaster[*types.Data](t), zerolog.Nop(), common.DefaultBlockOptions(), - make(chan error, 1), + errChan, ) require.NoError(t, s.initializeState()) s.ctx = context.Background() @@ -265,6 +271,7 @@ func TestSequentialBlockSync(t *testing.T) { assert.True(t, ok) _, ok = cm.GetDataDAIncluded(data2.DACommitment().String()) assert.True(t, ok) + requireEmptyChan(t, errChan) } func TestSyncer_sendNonBlockingSignal(t *testing.T) { @@ -328,14 +335,16 @@ func TestSyncer_processPendingEvents(t *testing.T) { func TestSyncLoopPersistState(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + cfg := config.DefaultConfig() + cfg.ClearCache = true + + cacheMgr, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - myDAHeightOffset := uint64(1) - myFutureDAHeight := uint64(9) + const myDAHeightOffset = uint64(1) + const numBlocks = uint64(5) addr, pub, signer := buildSyncTestSigner(t) - cfg := config.DefaultConfig() gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr, 
DAStartHeight: myDAHeightOffset} dummyExec := execution.NewDummyExecutor() @@ -353,11 +362,12 @@ func TestSyncLoopPersistState(t *testing.T) { mockP2PDataStore := common.NewMockBroadcaster[*types.Data](t) mockP2PDataStore.EXPECT().Store().Return(mockDataStore).Maybe() + errorCh := make(chan error, 1) syncerInst1 := NewSyncer( st, dummyExec, nil, - cm, + cacheMgr, common.NopMetrics(), cfg, gen, @@ -365,7 +375,7 @@ func TestSyncLoopPersistState(t *testing.T) { mockP2PDataStore, zerolog.Nop(), common.DefaultBlockOptions(), - make(chan error, 1), + errorCh, ) require.NoError(t, syncerInst1.initializeState()) @@ -378,13 +388,15 @@ func TestSyncLoopPersistState(t *testing.T) { // with n da blobs fetched var prevHeaderHash, prevAppHash []byte - for i := range myFutureDAHeight - myDAHeightOffset { - chainHeight, daHeight := i+1, i+myDAHeightOffset + prevAppHash, _, _ = execution.NewDummyExecutor().InitChain(t.Context(), gen.StartTime, gen.DAStartHeight, gen.ChainID) + for i := range numBlocks { + chainHeight, daHeight := gen.InitialHeight+i, i+myDAHeightOffset + blockTime := gen.StartTime.Add(time.Duration(chainHeight+1) * time.Second) emptyData := &types.Data{ Metadata: &types.Metadata{ ChainID: gen.ChainID, Height: chainHeight, - Time: uint64(time.Now().Add(time.Duration(chainHeight) * time.Second).UnixNano()), + Time: uint64(blockTime.UnixNano()), }, } _, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, prevAppHash, emptyData, prevHeaderHash) @@ -401,6 +413,7 @@ func TestSyncLoopPersistState(t *testing.T) { } // stop at next height + myFutureDAHeight := myDAHeightOffset + numBlocks daRtrMock.On("RetrieveFromDA", mock.Anything, myFutureDAHeight). Run(func(_ mock.Arguments) { // wait for consumer to catch up @@ -412,35 +425,39 @@ func TestSyncLoopPersistState(t *testing.T) { Return(nil, coreda.ErrHeightFromFuture) go syncerInst1.processLoop() - // dssync from DA until stop height reached + // sync from DA until stop height reached syncerInst1.syncLoop() + requireEmptyChan(t, errorCh) + t.Log("syncLoop on instance1 completed") + require.Equal(t, myFutureDAHeight, syncerInst1.GetDAHeight()) + lastStateDAHeight := syncerInst1.GetLastState().DAHeight // wait for all events consumed - require.NoError(t, cm.SaveToDisk()) + require.NoError(t, cacheMgr.SaveToDisk()) t.Log("processLoop on instance1 completed") // then daRtrMock.AssertExpectations(t) p2pHndlMock.AssertExpectations(t) + require.Len(t, syncerInst1.heightInCh, 0) // and all processed - verify no events remain at heights we tested - event1 := syncerInst1.cache.GetNextPendingEvent(1) - assert.Nil(t, event1) - event2 := syncerInst1.cache.GetNextPendingEvent(2) - assert.Nil(t, event2) - assert.Len(t, syncerInst1.heightInCh, 0) - + for i := range numBlocks { + blockHeight := gen.InitialHeight + i + 1 + event := syncerInst1.cache.GetNextPendingEvent(blockHeight) + require.Nil(t, event, "event at height %d should have been removed", blockHeight) + } // and when new instance is up on restart - cm, err = cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + cacheMgr, err = cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - require.NoError(t, cm.LoadFromDisk()) + require.NoError(t, cacheMgr.LoadFromDisk()) syncerInst2 := NewSyncer( st, dummyExec, nil, - cm, + cacheMgr, common.NopMetrics(), cfg, gen, @@ -451,7 +468,7 @@ func TestSyncLoopPersistState(t *testing.T) { make(chan error, 1), ) require.NoError(t, syncerInst2.initializeState()) - require.Equal(t, myFutureDAHeight-1, 
syncerInst2.GetDAHeight()) + require.Equal(t, lastStateDAHeight, syncerInst2.GetDAHeight()) ctx, cancel = context.WithCancel(t.Context()) t.Cleanup(cancel) @@ -465,7 +482,7 @@ func TestSyncLoopPersistState(t *testing.T) { Run(func(arg mock.Arguments) { cancel() // retrieve last one again - assert.Equal(t, myFutureDAHeight-1, arg.Get(1).(uint64)) + assert.Equal(t, lastStateDAHeight, arg.Get(1).(uint64)) }). Return(nil, nil) @@ -622,3 +639,12 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) { // Verify that GetLatestHeight was called (proves Replayer was invoked) mockExec.AssertCalled(t, "GetLatestHeight", mock.Anything) } + +func requireEmptyChan(t *testing.T, errorCh chan error) { + t.Helper() + select { + case err := <-errorCh: + t.Fatalf("syncLoop on instance1 failed: %v", err) + default: + } +} diff --git a/node/full.go b/node/full.go index ce1369e49c..708121998c 100644 --- a/node/full.go +++ b/node/full.go @@ -321,10 +321,11 @@ func (n *FullNode) Run(parentCtx context.Context) error { return fmt.Errorf("error while starting data sync service: %w", err) } + var runtimeErr error // Start the block components (blocking) if err := n.blockComponents.Start(ctx); err != nil { if !errors.Is(err, context.Canceled) { - n.Logger.Error().Err(err).Msg("unrecoverable error in block components") + runtimeErr = fmt.Errorf("running block components: %w", err) } else { n.Logger.Info().Msg("context canceled, stopping node") } @@ -338,12 +339,12 @@ func (n *FullNode) Run(parentCtx context.Context) error { shutdownCtx, cancel := context.WithTimeout(context.Background(), 9*time.Second) defer cancel() - var multiErr error // Use a multierror variable + var shutdownMultiErr error // Variable to accumulate multiple errors // Stop block components if err := n.blockComponents.Stop(); err != nil { n.Logger.Error().Err(err).Msg("error stopping block components") - multiErr = errors.Join(multiErr, fmt.Errorf("stopping block components: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("stopping block components: %w", err)) } // Stop Header Sync Service @@ -352,7 +353,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Log context canceled errors at a lower level if desired, or handle specific non-cancel errors if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { n.Logger.Error().Err(err).Msg("error stopping header sync service") - multiErr = errors.Join(multiErr, fmt.Errorf("stopping header sync service: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("stopping header sync service: %w", err)) } else { n.Logger.Debug().Err(err).Msg("header sync service stop context ended") // Log cancellation as debug } @@ -364,7 +365,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Log context canceled errors at a lower level if desired, or handle specific non-cancel errors if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { n.Logger.Error().Err(err).Msg("error stopping data sync service") - multiErr = errors.Join(multiErr, fmt.Errorf("stopping data sync service: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("stopping data sync service: %w", err)) } else { n.Logger.Debug().Err(err).Msg("data sync service stop context ended") // Log cancellation as debug } @@ -373,7 +374,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Stop P2P Client err = n.p2pClient.Close() if err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("closing P2P 
client: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("closing P2P client: %w", err)) } // Shutdown Prometheus Server @@ -381,7 +382,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { err = n.prometheusSrv.Shutdown(shutdownCtx) // http.ErrServerClosed is expected on graceful shutdown if err != nil && !errors.Is(err, http.ErrServerClosed) { - multiErr = errors.Join(multiErr, fmt.Errorf("shutting down Prometheus server: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down Prometheus server: %w", err)) } else { n.Logger.Debug().Err(err).Msg("Prometheus server shutdown context ended") } @@ -391,7 +392,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { if n.pprofSrv != nil { err = n.pprofSrv.Shutdown(shutdownCtx) if err != nil && !errors.Is(err, http.ErrServerClosed) { - multiErr = errors.Join(multiErr, fmt.Errorf("shutting down pprof server: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down pprof server: %w", err)) } else { n.Logger.Debug().Err(err).Msg("pprof server shutdown context ended") } @@ -401,7 +402,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { if n.rpcServer != nil { err = n.rpcServer.Shutdown(shutdownCtx) if err != nil && !errors.Is(err, http.ErrServerClosed) { - multiErr = errors.Join(multiErr, fmt.Errorf("shutting down RPC server: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down RPC server: %w", err)) } else { n.Logger.Debug().Err(err).Msg("RPC server shutdown context ended") } @@ -409,7 +410,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Ensure Store.Close is called last to maximize chance of data flushing if err = n.Store.Close(); err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("closing store: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("closing store: %w", err)) } else { n.Logger.Debug().Msg("store closed") } @@ -417,28 +418,27 @@ func (n *FullNode) Run(parentCtx context.Context) error { // Save caches if needed if n.blockComponents != nil && n.blockComponents.Cache != nil { if err := n.blockComponents.Cache.SaveToDisk(); err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("saving caches: %w", err)) + shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("saving caches: %w", err)) } else { n.Logger.Debug().Msg("caches saved") } } // Log final status - if multiErr != nil { - for _, err := range multiErr.(interface{ Unwrap() []error }).Unwrap() { + if shutdownMultiErr != nil { + for _, err := range shutdownMultiErr.(interface{ Unwrap() []error }).Unwrap() { n.Logger.Error().Err(err).Msg("error during shutdown") } } else { n.Logger.Info().Msg("full node halted successfully") } - - // Return the original context error if it exists (e.g., context cancelled) - // or the combined shutdown error if the context cancellation was clean. - if ctx.Err() != nil { - return ctx.Err() + if runtimeErr != nil { + return runtimeErr } - - return multiErr // Return shutdown errors if context was okay + if shutdownMultiErr != nil { + return shutdownMultiErr + } + return ctx.Err() // context canceled } // GetGenesis returns entire genesis doc. 
diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go index 8edbffed1d..a214c13163 100644 --- a/node/full_node_integration_test.go +++ b/node/full_node_integration_test.go @@ -28,8 +28,9 @@ func TestTxGossipingMultipleNodesNoDA(t *testing.T) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start only the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the first block to be produced by the sequencer err := waitForFirstBlock(nodes[0], Header) @@ -40,7 +41,7 @@ func TestTxGossipingMultipleNodesNoDA(t *testing.T) { // Start the other nodes for i := 1; i < numNodes; i++ { - startNodeInBackground(t, nodes, ctxs, &runningWg, i) + startNodeInBackground(t, nodes, ctxs, &runningWg, i, errChan) // Add a small delay between starting nodes to avoid connection race if i < numNodes-1 { time.Sleep(100 * time.Millisecond) @@ -54,13 +55,13 @@ func TestTxGossipingMultipleNodesNoDA(t *testing.T) { if dummyExec, ok := coreExec.(interface{ InjectTx([]byte) }); ok { dummyExec.InjectTx([]byte("test tx")) } else { - t.Logf("Warning: Could not cast core executor to DummyExecutor, skipping transaction injection") + t.Fatal("Warning: Could not cast core executor to DummyExecutor") } } - blocksToWaitFor := uint64(3) // Wait for all nodes to reach at least blocksToWaitFor blocks for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, Store)) } @@ -88,8 +89,9 @@ func TestTxGossipingMultipleNodesDAIncluded(t *testing.T) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start only the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the first block to be produced by the sequencer err := waitForFirstBlock(nodes[0], Header) @@ -103,7 +105,7 @@ func TestTxGossipingMultipleNodesDAIncluded(t *testing.T) { // Start the other nodes for i := 1; i < numNodes; i++ { - startNodeInBackground(t, nodes, ctxs, &runningWg, i) + startNodeInBackground(t, nodes, ctxs, &runningWg, i, errChan) // Add a small delay between starting nodes to avoid connection race if i < numNodes-1 { time.Sleep(100 * time.Millisecond) @@ -119,13 +121,14 @@ func TestTxGossipingMultipleNodesDAIncluded(t *testing.T) { dummyExec.InjectTx([]byte("test tx 2")) dummyExec.InjectTx([]byte("test tx 3")) } else { - t.Logf("Warning: Could not cast core executor to DummyExecutor, skipping transaction injection") + t.Fatalf("Could not cast core executor to DummyExecutor") } } blocksToWaitFor := uint64(5) // Wait for all nodes to reach at least blocksToWaitFor blocks with DA inclusion for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNDAIncludedHeight(nodeItem, blocksToWaitFor)) } @@ -159,8 +162,9 @@ func TestFastDASync(t *testing.T) { ctxs, cancels := createNodeContexts(len(nodes)) var runningWg sync.WaitGroup + errChan := make(chan error, len(nodes)) // Start only the first node - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the first node to produce a few blocks blocksToWaitFor := uint64(2) @@ -170,7 +174,7 @@ func TestFastDASync(t *testing.T) { time.Sleep(500 * time.Millisecond) // Now start the second node and 
time its sync - startNodeInBackground(t, nodes, ctxs, &runningWg, 1) + startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan) start := time.Now() // Wait for the second node to catch up to the first node require.NoError(waitForAtLeastNBlocks(nodes[1], blocksToWaitFor, Store)) @@ -185,6 +189,7 @@ func TestFastDASync(t *testing.T) { "DA fast sync took %v, should be much faster than sequential block time %v (max reasonable: %v). ", syncDuration, expectedSequentialTime, maxReasonableSyncTime) + requireEmptyChan(t, errChan) // Verify both nodes are synced and that the synced block is DA-included assertAllNodesSynced(t, nodes, blocksToWaitFor) @@ -211,9 +216,10 @@ func TestSingleSequencerTwoFullNodesBlockSyncSpeed(t *testing.T) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start only the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes[0], Store)) @@ -223,7 +229,7 @@ func TestSingleSequencerTwoFullNodesBlockSyncSpeed(t *testing.T) { // Now start the other nodes for i := 1; i < numNodes; i++ { - startNodeInBackground(t, nodes, ctxs, &runningWg, i) + startNodeInBackground(t, nodes, ctxs, &runningWg, i, errChan) // Add a small delay between starting nodes to avoid connection race if i < numNodes-1 { time.Sleep(100 * time.Millisecond) @@ -235,6 +241,7 @@ func TestSingleSequencerTwoFullNodesBlockSyncSpeed(t *testing.T) { // Wait for all nodes to reach the target block height for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, Store)) } totalDuration := time.Since(start) @@ -306,9 +313,10 @@ func testSingleSequencerSingleFullNode(t *testing.T, source Source) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes[0], source)) @@ -317,11 +325,12 @@ func testSingleSequencerSingleFullNode(t *testing.T, source Source) { time.Sleep(500 * time.Millisecond) // Start the full node - startNodeInBackground(t, nodes, ctxs, &runningWg, 1) + startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan) blocksToWaitFor := uint64(3) // Wait for both nodes to reach at least blocksToWaitFor blocks for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, source)) } @@ -347,9 +356,10 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes[0], source)) @@ -359,7 +369,7 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) { // Start the full nodes for i := 1; i < numNodes; i++ { - startNodeInBackground(t, nodes, ctxs, &runningWg, i) + startNodeInBackground(t, nodes, ctxs, &runningWg, i, errChan) // Add a small delay between starting nodes to avoid 
connection race if i < numNodes-1 { time.Sleep(100 * time.Millisecond) @@ -369,6 +379,7 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) { blocksToWaitFor := uint64(3) // Wait for all nodes to reach at least blocksToWaitFor blocks for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, source)) } @@ -396,9 +407,10 @@ func testSingleSequencerSingleFullNodeTrustedHash(t *testing.T, source Source) { ctxs, cancels := createNodeContexts(numNodes) var runningWg sync.WaitGroup + errChan := make(chan error, numNodes) // Start the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0) + startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes[0], source)) @@ -426,11 +438,12 @@ func testSingleSequencerSingleFullNodeTrustedHash(t *testing.T, source Source) { time.Sleep(500 * time.Millisecond) // Start the full node - startNodeInBackground(t, nodes, ctxs, &runningWg, 1) + startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan) blocksToWaitFor := uint64(3) // Wait for both nodes to reach at least blocksToWaitFor blocks for _, nodeItem := range nodes { + requireEmptyChan(t, errChan) require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, source)) } @@ -491,9 +504,10 @@ func testTwoChainsInOneNamespace(t *testing.T, chainID1 string, chainID2 string) // Set up context and wait group for the sequencer of chain 1 ctxs1, cancels1 := createNodeContexts(1) var runningWg1 sync.WaitGroup + errChan := make(chan error, 2) // Start the sequencer of chain 1 - startNodeInBackground(t, nodes1, ctxs1, &runningWg1, 0) + startNodeInBackground(t, nodes1, ctxs1, &runningWg1, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes1[0], Store)) @@ -503,13 +517,14 @@ func testTwoChainsInOneNamespace(t *testing.T, chainID1 string, chainID2 string) var runningWg2 sync.WaitGroup // Start the sequencer of chain 2 - startNodeInBackground(t, nodes2, ctxs2, &runningWg2, 0) + startNodeInBackground(t, nodes2, ctxs2, &runningWg2, 0, errChan) // Wait for the sequencer to produce at first block require.NoError(waitForFirstBlock(nodes2[0], Store)) blocksToWaitFor := uint64(3) + requireEmptyChan(t, errChan) // Wait for the full node of chain 1 to reach at least blocksToWaitFor blocks require.NoError(waitForAtLeastNBlocks(nodes1[0], blocksToWaitFor, Store)) @@ -520,3 +535,12 @@ func testTwoChainsInOneNamespace(t *testing.T, chainID1 string, chainID2 string) shutdownAndWait(t, cancels1, &runningWg1, 5*time.Second) shutdownAndWait(t, cancels2, &runningWg2, 5*time.Second) } + +func requireEmptyChan(t *testing.T, errChan chan error) { + t.Helper() + select { + case err := <-errChan: + t.Fatalf("Error received: %v", err) + default: + } +} diff --git a/node/helpers_test.go b/node/helpers_test.go index b90dd8f1ef..06d789060f 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -172,6 +172,10 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F aggPeerID, err := peer.IDFromPrivateKey(aggP2PKey.PrivKey) require.NoError(err) + logger := zerolog.Nop() + if testing.Verbose() { + logger = zerolog.New(zerolog.NewTestWriter(t)) + } aggNode, err := NewNode( config, executor, @@ -182,7 +186,7 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), 
- zerolog.Nop(), + logger, NodeOptions{}, ) require.NoError(err) @@ -216,7 +220,7 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F genesis, dssync.MutexWrap(datastore.NewMapDatastore()), DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), - zerolog.Nop(), + logger, NodeOptions{}, ) require.NoError(err) @@ -246,13 +250,17 @@ func createNodeContexts(n int) ([]context.Context, []context.CancelFunc) { } // Helper to start a single node in a goroutine and add to wait group -func startNodeInBackground(t *testing.T, nodes []*FullNode, ctxs []context.Context, wg *sync.WaitGroup, idx int) { +func startNodeInBackground(t *testing.T, nodes []*FullNode, ctxs []context.Context, wg *sync.WaitGroup, idx int, errChan chan<- error) { wg.Add(1) go func(node *FullNode, ctx context.Context, idx int) { defer wg.Done() err := node.Run(ctx) if err != nil && !errors.Is(err, context.Canceled) { - t.Logf("Error running node %d: %v", idx, err) + if errChan != nil { + errChan <- err + } else { + t.Logf("Error running node %d: %v", idx, err) + } } }(nodes[idx], ctxs[idx], idx) } diff --git a/node/single_sequencer_integration_test.go b/node/single_sequencer_integration_test.go index 4e9716bdfa..213760df6e 100644 --- a/node/single_sequencer_integration_test.go +++ b/node/single_sequencer_integration_test.go @@ -240,7 +240,7 @@ func TestStateRecovery(t *testing.T) { defer cancel() // Start the sequencer first - startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0) + startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil) blocksToWaitFor := uint64(20) // Wait for the sequencer to produce at first block @@ -283,7 +283,7 @@ func TestMaxPendingHeadersAndData(t *testing.T) { defer cancel() var runningWg sync.WaitGroup - startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0) + startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil) // Wait blocks to be produced up to max pending numExtraBlocks := uint64(5) @@ -331,7 +331,7 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) { defer cancel() var runningWg sync.WaitGroup - startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0) + startNodeInBackground(t, []*FullNode{node}, []context.Context{ctx}, &runningWg, 0, nil) // Wait for the node to start producing blocks waitForBlockN(t, 1, node, config.Node.BlockTime.Duration) diff --git a/types/state.go b/types/state.go index 389bc4f94b..a439f6c34c 100644 --- a/types/state.go +++ b/types/state.go @@ -71,8 +71,8 @@ func (s State) AssertValidForNextState(header *SignedHeader, data *Data) error { return fmt.Errorf("invalid block height - got: %d, want: %d", header.Height(), expdHeight) } - if s.LastBlockTime.After(header.Time()) { - return fmt.Errorf("invalid block time - got: %v, last: %v", header.Time(), s.LastBlockTime) + if headerTime := header.Time(); s.LastBlockTime.After(headerTime) { + return fmt.Errorf("invalid block time - got: %v, last: %v", headerTime, s.LastBlockTime) } if !bytes.Equal(header.LastHeaderHash, s.LastHeaderHash) { return fmt.Errorf("invalid last header hash - got: %x, want: %x", header.LastHeaderHash, s.LastHeaderHash) From 5d0265a4d0e77a42a57406c29d3285ecd30a7d6e Mon Sep 17 00:00:00 2001 From: Marko Date: Wed, 29 Oct 2025 17:52:15 +0100 Subject: [PATCH 8/9] fix: legacy header hashing (#2793) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Overview Updated header 
hashing to respect both encodings. Header.Hash() now tries the slim hash first and, when legacy fields are present, recomputes using the legacy binary payload so historical blocks keep their original digest. Added standalone HashSlim / HashLegacy helpers and a regression test that ensures the legacy path is chosen even when legacy hashes are zero-filled. Tests run: go test ./types. You may still see state mismatches from other code paths that compare hashes generated before this change—rerun your sync tests to confirm where the remaining divergence happens. --- types/hashing.go | 49 ++++++++++++++++++++++++++++++++++++++++--- types/hashing_test.go | 29 +++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/types/hashing.go b/types/hashing.go index f2c9006e1a..4a7e88940c 100644 --- a/types/hashing.go +++ b/types/hashing.go @@ -2,6 +2,7 @@ package types import ( "crypto/sha256" + "errors" "hash" ) @@ -9,14 +10,56 @@ var ( leafPrefix = []byte{0} ) +// HashSlim returns the SHA256 hash of the header using the slim (current) binary encoding. +func (h *Header) HashSlim() (Hash, error) { + if h == nil { + return nil, errors.New("header is nil") + } + + bytes, err := h.MarshalBinary() + if err != nil { + return nil, err + } + + hash := sha256.Sum256(bytes) + return hash[:], nil +} + +// HashLegacy returns the SHA256 hash of the header using the legacy binary encoding that +// includes the deprecated fields. +func (h *Header) HashLegacy() (Hash, error) { + if h == nil { + return nil, errors.New("header is nil") + } + + bytes, err := h.MarshalBinaryLegacy() + if err != nil { + return nil, err + } + + hash := sha256.Sum256(bytes) + return hash[:], nil +} + // Hash returns hash of the header func (h *Header) Hash() Hash { - bytes, err := h.MarshalBinary() + if h == nil { + return nil + } + + slimHash, err := h.HashSlim() if err != nil { return nil } - hash := sha256.Sum256(bytes) - return hash[:] + + if h.Legacy != nil && !h.Legacy.IsZero() { + legacyHash, err := h.HashLegacy() + if err == nil { + return legacyHash + } + } + + return slimHash } // Hash returns hash of the Data diff --git a/types/hashing_test.go b/types/hashing_test.go index 1bcc8691d2..49d230ba5a 100644 --- a/types/hashing_test.go +++ b/types/hashing_test.go @@ -1,6 +1,7 @@ package types import ( + "bytes" "crypto/sha256" "testing" @@ -34,6 +35,34 @@ func TestHeaderHash(t *testing.T) { assert.NotEqual(t, hash1, hash2, "Different headers should have different hashes") } +func TestHeaderHashLegacy(t *testing.T) { + legacyFields := &LegacyHeaderFields{ + LastCommitHash: bytes.Repeat([]byte{0x01}, 32), + ConsensusHash: bytes.Repeat([]byte{0x02}, 32), + LastResultsHash: bytes.Repeat([]byte{0x03}, 32), + } + + header := &Header{ + BaseHeader: BaseHeader{ + Height: 10, + Time: 987654321, + ChainID: "legacy-chain", + }, + DataHash: bytes.Repeat([]byte{0x04}, 32), + Legacy: legacyFields, + } + + hash := header.Hash() + + legacyBytes, err := header.MarshalBinaryLegacy() + require.NoError(t, err) + expected := sha256.Sum256(legacyBytes) + + assert.NotNil(t, hash) + assert.Len(t, hash, sha256.Size) + assert.Equal(t, Hash(expected[:]), hash, "Header hash should prefer legacy encoding when legacy fields are present") +} + // TestDataHash tests the Hash method of the Data. 
func TestDataHash(t *testing.T) { data := &Data{ From e04714ab9063450277a2e513e9f6347bc69da638 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 30 Oct 2025 13:28:23 +0100 Subject: [PATCH 9/9] Review feedback; minor fixes --- pkg/store/batch.go | 5 +++-- pkg/store/store.go | 5 +++-- proto/evnode/v1/state.proto | 2 +- types/hashing_test.go | 6 +++--- types/pb/evnode/v1/state.pb.go | 8 ++++---- 5 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/store/batch.go b/pkg/store/batch.go index 3ee4c26137..405119c612 100644 --- a/pkg/store/batch.go +++ b/pkg/store/batch.go @@ -2,6 +2,7 @@ package store import ( "context" + "crypto/sha256" "fmt" ds "github.com/ipfs/go-datastore" @@ -69,9 +70,9 @@ func (b *DefaultBatch) SaveBlockData(header *types.SignedHeader, data *types.Dat return fmt.Errorf("failed to put signature blob in batch: %w", err) } - headerHash := types.HeaderHash(headerBlob) + headerHash := sha256.Sum256(headerBlob) heightBytes := encodeHeight(height) - if err := b.batch.Put(b.ctx, ds.NewKey(getIndexKey(headerHash)), heightBytes); err != nil { + if err := b.batch.Put(b.ctx, ds.NewKey(getIndexKey(headerHash[:])), heightBytes); err != nil { return fmt.Errorf("failed to put index key in batch: %w", err) } diff --git a/pkg/store/store.go b/pkg/store/store.go index d187be3fb7..972b94e0e4 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -2,6 +2,7 @@ package store import ( "context" + "crypto/sha256" "encoding/binary" "errors" "fmt" @@ -247,8 +248,8 @@ func (s *DefaultStore) Rollback(ctx context.Context, height uint64, aggregator b } // Use HeaderHash to avoid re-marshaling the header - hash := types.HeaderHash(headerBlob) - if err := batch.Delete(ctx, ds.NewKey(getIndexKey(hash))); err != nil { + headerHash := sha256.Sum256(headerBlob) + if err := batch.Delete(ctx, ds.NewKey(getIndexKey(headerHash[:]))); err != nil { return fmt.Errorf("failed to delete index key in batch: %w", err) } diff --git a/proto/evnode/v1/state.proto b/proto/evnode/v1/state.proto index 461366e9da..2aa3025676 100644 --- a/proto/evnode/v1/state.proto +++ b/proto/evnode/v1/state.proto @@ -15,7 +15,7 @@ message State { google.protobuf.Timestamp last_block_time = 5; uint64 da_height = 6; bytes app_hash = 8; - bytes LastHeaderHash = 9; + bytes last_header_hash = 9; reserved 7; } diff --git a/types/hashing_test.go b/types/hashing_test.go index 32b5104b15..a9677e3e54 100644 --- a/types/hashing_test.go +++ b/types/hashing_test.go @@ -139,7 +139,7 @@ func TestHeaderHashWithBytes(t *testing.T) { // Hash using the function directly headerBytes, err := header.MarshalBinary() require.NoError(t, err) - hash2 := HeaderHash(headerBytes) - - assert.Equal(t, hash1, hash2, "HeaderHash should produce same result as Header.Hash()") + var targetHeader Header + require.NoError(t, targetHeader.UnmarshalBinary(headerBytes)) + assert.Equal(t, hash1, targetHeader.Hash(), "HeaderHash should produce same result as Header.Hash()") } diff --git a/types/pb/evnode/v1/state.pb.go b/types/pb/evnode/v1/state.pb.go index 097786fbee..ea7610b7d9 100644 --- a/types/pb/evnode/v1/state.pb.go +++ b/types/pb/evnode/v1/state.pb.go @@ -32,7 +32,7 @@ type State struct { LastBlockTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=last_block_time,json=lastBlockTime,proto3" json:"last_block_time,omitempty"` DaHeight uint64 `protobuf:"varint,6,opt,name=da_height,json=daHeight,proto3" json:"da_height,omitempty"` AppHash []byte `protobuf:"bytes,8,opt,name=app_hash,json=appHash,proto3" json:"app_hash,omitempty"` - LastHeaderHash []byte 
`protobuf:"bytes,9,opt,name=LastHeaderHash,proto3" json:"LastHeaderHash,omitempty"` + LastHeaderHash []byte `protobuf:"bytes,9,opt,name=last_header_hash,json=lastHeaderHash,proto3" json:"last_header_hash,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -127,7 +127,7 @@ var File_evnode_v1_state_proto protoreflect.FileDescriptor const file_evnode_v1_state_proto_rawDesc = "" + "\n" + - "\x15evnode/v1/state.proto\x12\tevnode.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16evnode/v1/evnode.proto\"\xcd\x02\n" + + "\x15evnode/v1/state.proto\x12\tevnode.v1\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x16evnode/v1/evnode.proto\"\xcf\x02\n" + "\x05State\x12,\n" + "\aversion\x18\x01 \x01(\v2\x12.evnode.v1.VersionR\aversion\x12\x19\n" + "\bchain_id\x18\x02 \x01(\tR\achainId\x12%\n" + @@ -135,8 +135,8 @@ const file_evnode_v1_state_proto_rawDesc = "" + "\x11last_block_height\x18\x04 \x01(\x04R\x0flastBlockHeight\x12B\n" + "\x0flast_block_time\x18\x05 \x01(\v2\x1a.google.protobuf.TimestampR\rlastBlockTime\x12\x1b\n" + "\tda_height\x18\x06 \x01(\x04R\bdaHeight\x12\x19\n" + - "\bapp_hash\x18\b \x01(\fR\aappHash\x12&\n" + - "\x0eLastHeaderHash\x18\t \x01(\fR\x0eLastHeaderHashJ\x04\b\a\x10\bB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" + "\bapp_hash\x18\b \x01(\fR\aappHash\x12(\n" + + "\x10last_header_hash\x18\t \x01(\fR\x0elastHeaderHashJ\x04\b\a\x10\bB/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" var ( file_evnode_v1_state_proto_rawDescOnce sync.Once