Skip to content

Commit 31859a7

Browse files
renaynaytac0turtle
andauthored
fix(block/internal/syncing): only publish header/data locally if source.DA -- do not pollute p2p (#2777)
This is an attempt to fix subscriber too slow problem by only publishing height events that come from DA layer to local subscription (which prevents notifications to external peers). This will allow p2p syncer(s) to see new heights + trigger sync loop to fill in gaps inside go-header store(s) without polluting your p2p network. Please review carefully and also please answer the `TODO` I left in there as I still don't really understand the logic there. --------- Co-authored-by: tac0turtle <[email protected]>
1 parent 1c2d9ef commit 31859a7

File tree

6 files changed

+121
-15
lines changed

6 files changed

+121
-15
lines changed

block/internal/common/broadcaster_mock.go

Lines changed: 28 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

block/internal/common/expected_interfaces.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package common
22

33
import (
44
"context"
5+
pubsub "github.com/libp2p/go-libp2p-pubsub"
56

67
"github.com/celestiaorg/go-header"
78
)
89

910
// broadcaster interface for P2P broadcasting
1011
type Broadcaster[H header.Header[H]] interface {
11-
WriteToStoreAndBroadcast(ctx context.Context, payload H) error
12+
WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
1213
Store() header.Store[H]
1314
}

block/internal/syncing/syncer.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
pubsub "github.com/libp2p/go-libp2p-pubsub"
89
"sync"
910
"sync/atomic"
1011
"time"
1112

1213
"github.com/rs/zerolog"
1314
"golang.org/x/sync/errgroup"
1415

15-
"github.com/evstack/ev-node/block/internal/cache"
16-
"github.com/evstack/ev-node/block/internal/common"
1716
coreda "github.com/evstack/ev-node/core/da"
1817
coreexecutor "github.com/evstack/ev-node/core/execution"
18+
19+
"github.com/evstack/ev-node/block/internal/cache"
20+
"github.com/evstack/ev-node/block/internal/common"
1921
"github.com/evstack/ev-node/pkg/config"
2022
"github.com/evstack/ev-node/pkg/genesis"
2123
"github.com/evstack/ev-node/pkg/store"
@@ -349,6 +351,8 @@ func (s *Syncer) tryFetchFromP2P() {
349351

350352
// Process data (if not already processed by headers)
351353
newDataHeight := s.dataStore.Store().Height()
354+
// TODO @MARKO: why only if newDataHeight != newHeaderHeight? why not process
355+
// just if newDataHeight > currentHeight ?
352356
if newDataHeight != newHeaderHeight && newDataHeight > currentHeight {
353357
s.p2pHandler.ProcessDataRange(s.ctx, currentHeight+1, newDataHeight, s.heightInCh)
354358
}
@@ -415,8 +419,16 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
415419
// only save to p2p stores if the event came from DA
416420
if event.Source == common.SourceDA {
417421
g, ctx := errgroup.WithContext(s.ctx)
418-
g.Go(func() error { return s.headerStore.WriteToStoreAndBroadcast(ctx, event.Header) })
419-
g.Go(func() error { return s.dataStore.WriteToStoreAndBroadcast(ctx, event.Data) })
422+
g.Go(func() error {
423+
// broadcast header locally only — prevents spamming the p2p network with old height notifications,
424+
// allowing the syncer to update its target and fill missing blocks
425+
return s.headerStore.WriteToStoreAndBroadcast(ctx, event.Header, pubsub.WithLocalPublication(true))
426+
})
427+
g.Go(func() error {
428+
// broadcast data locally only — prevents spamming the p2p network with old height notifications,
429+
// allowing the syncer to update its target and fill missing blocks
430+
return s.dataStore.WriteToStoreAndBroadcast(ctx, event.Data, pubsub.WithLocalPublication(true))
431+
})
420432
if err := g.Wait(); err != nil {
421433
s.logger.Error().Err(err).Msg("failed to append event header and/or data to p2p store")
422434
}

pkg/sync/sync_service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (syncService *SyncService[H]) Store() header.Store[H] {
123123

124124
// WriteToStoreAndBroadcast initializes store if needed and broadcasts provided header or block.
125125
// Note: Only returns an error in case store can't be initialized. Logs error if there's one while broadcasting.
126-
func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, headerOrData H) error {
126+
func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, headerOrData H, opts ...pubsub.PubOpt) error {
127127
if syncService.genesis.InitialHeight == 0 {
128128
return fmt.Errorf("invalid initial height; cannot be zero")
129129
}
@@ -148,7 +148,7 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context,
148148
}
149149

150150
// Broadcast for subscribers
151-
if err := syncService.sub.Broadcast(ctx, headerOrData); err != nil {
151+
if err := syncService.sub.Broadcast(ctx, headerOrData, opts...); err != nil {
152152
// for the first block when starting the app, broadcast error is expected
153153
// as we have already initialized the store for starting the syncer.
154154
// Hence, we ignore the error. Exact reason: validation ignored

types/serialization.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ func (h *Header) ToProto() *pb.Header {
162162
ChainId: h.BaseHeader.ChainID,
163163
ValidatorHash: h.ValidatorHash,
164164
}
165+
if unknown := encodeLegacyUnknownFields(h.Legacy); len(unknown) > 0 {
166+
pHeader.ProtoReflect().SetUnknown(unknown)
167+
}
165168
return pHeader
166169
}
167170

@@ -509,6 +512,28 @@ func decodeLegacyHeaderFields(pHeader *pb.Header) (*LegacyHeaderFields, error) {
509512
return &legacy, nil
510513
}
511514

515+
func encodeLegacyUnknownFields(legacy *LegacyHeaderFields) []byte {
516+
if legacy == nil || legacy.IsZero() {
517+
return nil
518+
}
519+
520+
var payload []byte
521+
522+
if len(legacy.LastCommitHash) > 0 {
523+
payload = appendBytesField(payload, legacyLastCommitHashField, legacy.LastCommitHash)
524+
}
525+
526+
if len(legacy.ConsensusHash) > 0 {
527+
payload = appendBytesField(payload, legacyConsensusHashField, legacy.ConsensusHash)
528+
}
529+
530+
if len(legacy.LastResultsHash) > 0 {
531+
payload = appendBytesField(payload, legacyLastResultsHashField, legacy.LastResultsHash)
532+
}
533+
534+
return payload
535+
}
536+
512537
func appendBytesField(buf []byte, number protowire.Number, value []byte) []byte {
513538
buf = protowire.AppendTag(buf, number, protowire.BytesType)
514539
buf = protowire.AppendVarint(buf, uint64(len(value)))

types/serialization_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package types
22

33
import (
4+
"bytes"
45
"crypto/rand"
56
"testing"
67
"time"
@@ -355,6 +356,53 @@ func TestHeader_HashFields_NilAndEmpty(t *testing.T) {
355356
assert.Nil(t, h2.ValidatorHash)
356357
}
357358

359+
func TestHeaderMarshalBinary_PreservesLegacyFields(t *testing.T) {
360+
t.Parallel()
361+
362+
header := &Header{
363+
Version: Version{Block: 2, App: 3},
364+
BaseHeader: BaseHeader{Height: 42, Time: 123456789, ChainID: "chain-legacy"},
365+
LastHeaderHash: []byte{
366+
0x01, 0x02, 0x03, 0x04,
367+
},
368+
DataHash: []byte{0x05, 0x06, 0x07, 0x08},
369+
AppHash: []byte{0x09, 0x0A, 0x0B, 0x0C},
370+
ProposerAddress: []byte{0x0D, 0x0E, 0x0F, 0x10},
371+
ValidatorHash: []byte{0x11, 0x12, 0x13, 0x14},
372+
Legacy: &LegacyHeaderFields{
373+
LastCommitHash: bytes.Repeat([]byte{0x21}, 32),
374+
ConsensusHash: bytes.Repeat([]byte{0x22}, 32),
375+
LastResultsHash: bytes.Repeat([]byte{0x23}, 32),
376+
},
377+
}
378+
379+
unknown := header.ToProto().ProtoReflect().GetUnknown()
380+
require.NotEmpty(t, unknown, "legacy fields should be serialized into unknown proto fields")
381+
382+
raw, err := header.MarshalBinary()
383+
require.NoError(t, err)
384+
require.NotEmpty(t, raw)
385+
386+
var decoded Header
387+
require.NoError(t, decoded.UnmarshalBinary(raw))
388+
require.NotNil(t, decoded.Legacy)
389+
require.False(t, decoded.Legacy.IsZero())
390+
391+
assert.Equal(t, header.Legacy.LastCommitHash, decoded.Legacy.LastCommitHash)
392+
assert.Equal(t, header.Legacy.ConsensusHash, decoded.Legacy.ConsensusHash)
393+
assert.Equal(t, header.Legacy.LastResultsHash, decoded.Legacy.LastResultsHash)
394+
395+
origHash := header.Hash()
396+
decodedHash := decoded.Hash()
397+
require.NotNil(t, origHash)
398+
require.NotNil(t, decodedHash)
399+
assert.Equal(t, origHash, decodedHash)
400+
401+
legacyHash, err := header.HashLegacy()
402+
require.NoError(t, err)
403+
assert.Equal(t, legacyHash, origHash)
404+
}
405+
358406
// TestProtoConversionConsistency_AllTypes checks that ToProto/FromProto are true inverses for all major types.
359407
func TestProtoConversionConsistency_AllTypes(t *testing.T) {
360408
// Header

0 commit comments

Comments
 (0)