Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func NewSyncComponents(
metrics *Metrics,
blockOpts BlockOptions,
) (*Components, error) {
logger.Info().Msg("Starting in sync-mode")
cacheManager, err := cache.NewManager(config, store, logger)
if err != nil {
return nil, fmt.Errorf("failed to create cache manager: %w", err)
Expand Down Expand Up @@ -200,6 +201,7 @@ func NewAggregatorComponents(
metrics *Metrics,
blockOpts BlockOptions,
) (*Components, error) {
logger.Info().Msg("Starting in aggregator-mode")
cacheManager, err := cache.NewManager(config, store, logger)
if err != nil {
return nil, fmt.Errorf("failed to create cache manager: %w", err)
Expand Down
30 changes: 1 addition & 29 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,35 +595,7 @@ func (e *Executor) validateBlock(lastState types.State, header *types.SignedHead
return fmt.Errorf("invalid header: %w", err)
}

// Validate header against data
if err := types.Validate(header, data); err != nil {
return fmt.Errorf("header-data validation failed: %w", err)
}

// Check chain ID
if header.ChainID() != lastState.ChainID {
return fmt.Errorf("chain ID mismatch: expected %s, got %s",
lastState.ChainID, header.ChainID())
}

// Check height
expectedHeight := lastState.LastBlockHeight + 1
if header.Height() != expectedHeight {
return fmt.Errorf("invalid height: expected %d, got %d",
expectedHeight, header.Height())
}

// Check timestamp
if header.Height() > 1 && lastState.LastBlockTime.After(header.Time()) {
return fmt.Errorf("block time must be strictly increasing")
}

// Check app hash
if !bytes.Equal(header.AppHash, lastState.AppHash) {
return fmt.Errorf("app hash mismatch")
}

return nil
return lastState.AssertValidForNextState(header, data)
}

// sendCriticalError sends a critical error to the error channel without blocking
Expand Down
2 changes: 1 addition & 1 deletion block/internal/submitting/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func newHeaderAndData(chainID string, height uint64, nonEmpty bool) (*types.Sign
h := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{ChainID: chainID, Height: height, Time: uint64(now.UnixNano())}, ProposerAddress: []byte{1}}}
d := &types.Data{Metadata: &types.Metadata{ChainID: chainID, Height: height, Time: uint64(now.UnixNano())}}
if nonEmpty {
d.Txs = types.Txs{types.Tx(fmt.Sprintf("any-unique-tx-%d", now.UnixNano()))}
d.Txs = types.Txs{types.Tx(fmt.Sprintf("any-unique-tx-%s-%d-%d", chainID, height, now.UnixNano()))}
}
return h, d
}
Expand Down
16 changes: 8 additions & 8 deletions block/internal/syncing/da_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil)

events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77)
require.Len(t, events, 1)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Header with no data hash present should trigger empty data creation (per current logic)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil)

events := r.processBlobs(context.Background(), [][]byte{hb}, 88)
require.Len(t, events, 1)
Expand All @@ -223,7 +223,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) {
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil)
hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil, nil)
gotH := r.tryDecodeHeader(hb, 123)
require.NotNil(t, gotH)
assert.Equal(t, sh.Hash().String(), gotH.Hash().String())
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) {

// Prepare header/data blobs
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 9, addr, pub, signer, 1)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data, nil)

cfg := config.DefaultConfig()
cfg.DA.Namespace = "nsHdr"
Expand Down Expand Up @@ -322,7 +322,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {

// Create header and data for the same block height but from different DA heights
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil)

// Process header from DA height 100 first
events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100)
Expand Down Expand Up @@ -361,9 +361,9 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
data4Bin, data4 := makeSignedDataBytes(t, gen.ChainID, 4, addr, pub, signer, 2)
data5Bin, data5 := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 1)

hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data)
hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data)
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data)
hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data, nil)
hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data, nil)
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil)

// Process multiple headers from DA height 200 - should be stored as pending
events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200)
Expand Down
37 changes: 21 additions & 16 deletions block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (s *Syncer) GetLastState() types.State {

stateCopy := *state
stateCopy.AppHash = bytes.Clone(state.AppHash)
stateCopy.LastHeaderHash = bytes.Clone(state.LastHeaderHash)

return stateCopy
}
Expand Down Expand Up @@ -379,7 +380,14 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
if err := s.trySyncNextBlock(event); err != nil {
s.logger.Error().Err(err).Msg("failed to sync next block")
// If the error is not due to an validation error, re-store the event as pending
if !errors.Is(err, errInvalidBlock) {
switch {
case errors.Is(err, errInvalidBlock):
// do not reschedule
case errors.Is(err, errInvalidState):
s.logger.Fatal().Uint64("block_height", event.Header.Height()).
Uint64("state_height", s.GetLastState().LastBlockHeight).Err(err).
Msg("Invalid state detected - block references do not match local state. Manual intervention required.")
default:
s.cache.SetPendingEvent(height, event)
}
return
Expand All @@ -396,8 +404,12 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
}
}

// errInvalidBlock is returned when a block is failing validation
var errInvalidBlock = errors.New("invalid block")
var (
// errInvalidBlock is returned when a block is failing validation
errInvalidBlock = errors.New("invalid block")
// errInvalidState is returned when the state has diverged from the DA blocks
errInvalidState = errors.New("invalid state")
)

// trySyncNextBlock attempts to sync the next available block
// the event is always the next block in sequence as processHeightEvent ensures it.
Expand All @@ -418,10 +430,12 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
// Compared to the executor logic where the current block needs to be applied first,
// here only the previous block needs to be applied to proceed to the verification.
// The header validation must be done before applying the block to avoid executing gibberish
if err := s.validateBlock(header, data); err != nil {
if err := s.validateBlock(currentState, data, header); err != nil {
// remove header as da included (not per se needed, but keep cache clean)
s.cache.RemoveHeaderDAIncluded(header.Hash().String())
return errors.Join(errInvalidBlock, fmt.Errorf("failed to validate block: %w", err))
if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) {
return errors.Join(errInvalidBlock, err)
}
}

// Apply block
Expand Down Expand Up @@ -527,24 +541,15 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade
// NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct
// or if the data was gibberish and somehow passed all validation prior but the header was correct
// we are still losing both in the pending event. This should never happen.
func (s *Syncer) validateBlock(
header *types.SignedHeader,
data *types.Data,
) error {
func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *types.SignedHeader) error {
// Set custom verifier for aggregator node signature
header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider)

// Validate header with data
if err := header.ValidateBasicWithData(data); err != nil {
return fmt.Errorf("invalid header: %w", err)
}

// Validate header against data
if err := types.Validate(header, data); err != nil {
return fmt.Errorf("header-data validation failed: %w", err)
}

return nil
return currState.AssertValidForNextState(header, data)
}

// sendCriticalError sends a critical error to the error channel without blocking
Expand Down
2 changes: 1 addition & 1 deletion block/internal/syncing/syncer_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) {
Return(nil, errors.New("temporary failure")).Once()

// Second call - success (should reset backoff and increment DA height)
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil)
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil)
data := &types.Data{
Metadata: &types.Metadata{
ChainID: gen.ChainID,
Expand Down
2 changes: 1 addition & 1 deletion block/internal/syncing/syncer_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
heightEvents := make([]common.DAHeightEvent, totalHeights)
for i := uint64(0); i < totalHeights; i++ {
blockHeight, daHeight := i+gen.InitialHeight, i+daHeightOffset
_, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil)
_, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil, nil)
d := &types.Data{Metadata: &types.Metadata{ChainID: gen.ChainID, Height: blockHeight, Time: uint64(time.Now().UnixNano())}}
heightEvents[i] = common.DAHeightEvent{Header: sh, Data: d, DaHeight: daHeight}
}
Expand Down
45 changes: 30 additions & 15 deletions block/internal/syncing/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syncing
import (
"context"
crand "crypto/rand"
"crypto/sha512"
"errors"
"testing"
"time"
Expand Down Expand Up @@ -44,7 +45,17 @@ func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer
}

// makeSignedHeaderBytes builds a valid SignedHeader and returns its binary encoding and the object
func makeSignedHeaderBytes(tb testing.TB, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, appHash []byte, data *types.Data) ([]byte, *types.SignedHeader) {
func makeSignedHeaderBytes(
tb testing.TB,
chainID string,
height uint64,
proposer []byte,
pub crypto.PubKey,
signer signerpkg.Signer,
appHash []byte,
data *types.Data,
lastHeaderHash []byte,
) ([]byte, *types.SignedHeader) {
time := uint64(time.Now().UnixNano())
dataHash := common.DataHashForEmptyTxs
if data != nil {
Expand All @@ -58,6 +69,7 @@ func makeSignedHeaderBytes(tb testing.TB, chainID string, height uint64, propose
AppHash: appHash,
DataHash: dataHash,
ProposerAddress: proposer,
LastHeaderHash: lastHeaderHash,
},
Signer: types.Signer{PubKey: pub, Address: proposer},
}
Expand Down Expand Up @@ -97,7 +109,6 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) {

cfg := config.DefaultConfig()
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

mockExec := testmocks.NewMockExecutor(t)

s := NewSyncer(
Expand All @@ -114,24 +125,24 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) {
common.DefaultBlockOptions(),
make(chan error, 1),
)

require.NoError(t, s.initializeState())
// Create header and data with correct hash
data := makeData(gen.ChainID, 1, 2) // non-empty
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data)
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data, nil)

err = s.validateBlock(header, data)
err = s.validateBlock(s.GetLastState(), data, header)
require.NoError(t, err)

// Create header and data with mismatched hash
data = makeData(gen.ChainID, 1, 2) // non-empty
_, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil)
err = s.validateBlock(header, data)
_, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil)
err = s.validateBlock(s.GetLastState(), data, header)
require.Error(t, err)

// Create header and empty data
data = makeData(gen.ChainID, 1, 0) // empty
_, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil)
err = s.validateBlock(header, data)
_, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil, nil)
err = s.validateBlock(s.GetLastState(), data, header)
require.Error(t, err)
}

Expand Down Expand Up @@ -169,7 +180,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) {
// Create signed header & data for height 1
lastState := s.GetLastState()
data := makeData(gen.ChainID, 1, 0)
_, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data)
_, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil)

// Expect ExecuteTxs call for height 1
mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash).
Expand Down Expand Up @@ -218,7 +229,7 @@ func TestSequentialBlockSync(t *testing.T) {
// Sync two consecutive blocks via processHeightEvent so ExecuteTxs is called and state stored
st0 := s.GetLastState()
data1 := makeData(gen.ChainID, 1, 1) // non-empty
_, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, st0.AppHash, data1)
_, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, st0.AppHash, data1, st0.LastHeaderHash)
// Expect ExecuteTxs call for height 1
mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, st0.AppHash).
Return([]byte("app1"), uint64(1024), nil).Once()
Expand All @@ -227,7 +238,7 @@ func TestSequentialBlockSync(t *testing.T) {

st1, _ := st.GetState(context.Background())
data2 := makeData(gen.ChainID, 2, 0) // empty data
_, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2)
_, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2, st1.LastHeaderHash)
// Expect ExecuteTxs call for height 2
mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(2), mock.Anything, st1.AppHash).
Return([]byte("app2"), uint64(1024), nil).Once()
Expand Down Expand Up @@ -365,22 +376,27 @@ func TestSyncLoopPersistState(t *testing.T) {
syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock

// with n da blobs fetched
var prevHeaderHash, prevAppHash []byte
for i := range myFutureDAHeight - myDAHeightOffset {
chainHeight, daHeight := i, i+myDAHeightOffset
chainHeight, daHeight := i+1, i+myDAHeightOffset
emptyData := &types.Data{
Metadata: &types.Metadata{
ChainID: gen.ChainID,
Height: chainHeight,
Time: uint64(time.Now().Add(time.Duration(chainHeight) * time.Second).UnixNano()),
},
}
_, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, nil, emptyData)
_, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, prevAppHash, emptyData, prevHeaderHash)
evts := []common.DAHeightEvent{{
Header: sigHeader,
Data: emptyData,
DaHeight: daHeight,
}}
daRtrMock.On("RetrieveFromDA", mock.Anything, daHeight).Return(evts, nil)
prevHeaderHash = sigHeader.Hash()
hasher := sha512.New()
hasher.Write(prevAppHash)
prevAppHash = hasher.Sum(nil)
}

// stop at next height
Expand All @@ -395,7 +411,6 @@ func TestSyncLoopPersistState(t *testing.T) {
Return(nil, coreda.ErrHeightFromFuture)

go syncerInst1.processLoop()

// dssync from DA until stop height reached
syncerInst1.syncLoop()
t.Log("syncLoop on instance1 completed")
Expand Down
1 change: 1 addition & 0 deletions proto/evnode/v1/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ message State {
google.protobuf.Timestamp last_block_time = 5;
uint64 da_height = 6;
bytes app_hash = 8;
bytes LastHeaderHash = 9;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

store last header hash to validate ensure consistency


reserved 7;
}
1 change: 1 addition & 0 deletions test/e2e/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/testnet/
1 change: 1 addition & 0 deletions test/e2e/evm_full_node_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ func restartSequencerAndFullNode(t *testing.T, sut *SystemUnderTest, sequencerHo
// Now restart the full node (without init - node already exists)
sut.ExecCmd(evmSingleBinaryPath,
"start",
"--evnode.log.format", "json",
"--home", fullNodeHome,
"--evm.jwt-secret", fullNodeJwtSecret,
"--evm.genesis-hash", genesisHash,
Expand Down
1 change: 1 addition & 0 deletions test/e2e/evm_test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ func restartDAAndSequencer(t *testing.T, sut *SystemUnderTest, sequencerHome, jw
// Then restart the sequencer node (without init - node already exists)
sut.ExecCmd(evmSingleBinaryPath,
"start",
"--evnode.log.format", "json",
"--evm.jwt-secret", jwtSecret,
"--evm.genesis-hash", genesisHash,
"--rollkit.node.block_time", DefaultBlockTime,
Expand Down
Loading
Loading