Skip to content

Commit

Permalink
refactor(lib/sync): introduce fullsync strategy (#4114)
Browse files Browse the repository at this point in the history
Co-authored-by: Kirill <[email protected]>
Co-authored-by: Haiko Schol <[email protected]>
  • Loading branch information
3 people authored Sep 30, 2024
1 parent 0f0d6f7 commit dbe6858
Show file tree
Hide file tree
Showing 61 changed files with 2,770 additions and 6,792 deletions.
89 changes: 81 additions & 8 deletions dot/core/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/sync"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/babe/inherents"
"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -222,32 +221,32 @@ func TestHandleChainReorg_WithReorg_Trans(t *testing.T) {
rt, err := s.blockState.GetRuntime(bestBlockHash)
require.NoError(t, err)

block1 := sync.BuildBlock(t, rt, parent, nil)
block1 := BuildBlock(t, rt, parent, nil)
bs.StoreRuntime(block1.Header.Hash(), rt)
err = bs.AddBlock(block1)
require.NoError(t, err)

block2 := sync.BuildBlock(t, rt, &block1.Header, nil)
block2 := BuildBlock(t, rt, &block1.Header, nil)
bs.StoreRuntime(block2.Header.Hash(), rt)
err = bs.AddBlock(block2)
require.NoError(t, err)

block3 := sync.BuildBlock(t, rt, &block2.Header, nil)
block3 := BuildBlock(t, rt, &block2.Header, nil)
bs.StoreRuntime(block3.Header.Hash(), rt)
err = bs.AddBlock(block3)
require.NoError(t, err)

block4 := sync.BuildBlock(t, rt, &block3.Header, nil)
block4 := BuildBlock(t, rt, &block3.Header, nil)
bs.StoreRuntime(block4.Header.Hash(), rt)
err = bs.AddBlock(block4)
require.NoError(t, err)

block5 := sync.BuildBlock(t, rt, &block4.Header, nil)
block5 := BuildBlock(t, rt, &block4.Header, nil)
bs.StoreRuntime(block5.Header.Hash(), rt)
err = bs.AddBlock(block5)
require.NoError(t, err)

block31 := sync.BuildBlock(t, rt, &block2.Header, nil)
block31 := BuildBlock(t, rt, &block2.Header, nil)
bs.StoreRuntime(block31.Header.Hash(), rt)
err = bs.AddBlock(block31)
require.NoError(t, err)
Expand All @@ -257,7 +256,7 @@ func TestHandleChainReorg_WithReorg_Trans(t *testing.T) {
// Add extrinsic to block `block41`
ext := createExtrinsic(t, rt, bs.(*state.BlockState).GenesisHash(), nonce)

block41 := sync.BuildBlock(t, rt, &block31.Header, ext)
block41 := BuildBlock(t, rt, &block31.Header, ext)
bs.StoreRuntime(block41.Header.Hash(), rt)
err = bs.AddBlock(block41)
require.NoError(t, err)
Expand All @@ -269,6 +268,80 @@ func TestHandleChainReorg_WithReorg_Trans(t *testing.T) {
require.Equal(t, 1, len(pending))
}

func BuildBlock(t *testing.T, instance runtime.Instance, parent *types.Header, ext types.Extrinsic) *types.Block {
digest := types.NewDigest()
prd, err := types.NewBabeSecondaryPlainPreDigest(0, 1).ToPreRuntimeDigest()
require.NoError(t, err)
err = digest.Add(*prd)
require.NoError(t, err)
header := &types.Header{
ParentHash: parent.Hash(),
Number: parent.Number + 1,
Digest: digest,
}

err = instance.InitializeBlock(header)
require.NoError(t, err)

idata := types.NewInherentData()
err = idata.SetInherent(types.Timstap0, uint64(time.Now().Unix()))
require.NoError(t, err)

err = idata.SetInherent(types.Babeslot, uint64(1))
require.NoError(t, err)

ienc, err := idata.Encode()
require.NoError(t, err)

// Call BlockBuilder_inherent_extrinsics which returns the inherents as encoded extrinsics
inherentExts, err := instance.InherentExtrinsics(ienc)
require.NoError(t, err)

// decode inherent extrinsics
cp := make([]byte, len(inherentExts))
copy(cp, inherentExts)
var inExts [][]byte
err = scale.Unmarshal(cp, &inExts)
require.NoError(t, err)

// apply each inherent extrinsic
for _, inherent := range inExts {
in, err := scale.Marshal(inherent)
require.NoError(t, err)

ret, err := instance.ApplyExtrinsic(in)
require.NoError(t, err)
require.Equal(t, ret, []byte{0, 0})
}

body := types.Body(types.BytesArrayToExtrinsics(inExts))

if ext != nil {
// validate and apply extrinsic
var ret []byte

externalExt := types.Extrinsic(append([]byte{byte(types.TxnExternal)}, ext...))
_, err = instance.ValidateTransaction(externalExt)
require.NoError(t, err)

ret, err = instance.ApplyExtrinsic(ext)
require.NoError(t, err)
require.Equal(t, ret, []byte{0, 0})

body = append(body, ext)
}

res, err := instance.FinalizeBlock()
require.NoError(t, err)
res.Number = header.Number
res.Hash()

return &types.Block{
Header: *res,
Body: body,
}
}

func TestHandleChainReorg_WithReorg_NoTransactions(t *testing.T) {
s := NewTestService(t, nil)
const height = 5
Expand Down
4 changes: 2 additions & 2 deletions dot/mock_node_builder_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 4 additions & 19 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"

Expand Down Expand Up @@ -186,33 +185,19 @@ func (s *Service) validateBlockAnnounceHandshake(from peer.ID, hs Handshake) err
np.peersData.setInboundHandshakeData(from, data)
}

// if peer has higher best block than us, begin syncing
latestHeader, err := s.blockState.BestBlockHeader()
if err != nil {
return err
}

// check if peer block number is greater than host block number
if latestHeader.Number >= uint(bhs.BestBlockNumber) {
return nil
}

return s.syncer.HandleBlockAnnounceHandshake(from, bhs)
}

// handleBlockAnnounceMessage handles BlockAnnounce messages
// if some more blocks are required to sync the announced block, the node will open a sync stream
// with its peer and send a BlockRequest message
func (s *Service) handleBlockAnnounceMessage(from peer.ID, msg NotificationsMessage) (propagate bool, err error) {
func (s *Service) handleBlockAnnounceMessage(from peer.ID, msg NotificationsMessage) (bool, error) {
bam, ok := msg.(*BlockAnnounceMessage)
if !ok {
return false, errors.New("invalid message")
}

err = s.syncer.HandleBlockAnnounce(from, bam)
if errors.Is(err, blocktree.ErrBlockExists) {
return true, nil
}

return false, err
err := s.syncer.HandleBlockAnnounce(from, bam)
shouldPropagate := err == nil
return shouldPropagate, err
}
34 changes: 24 additions & 10 deletions dot/network/block_announce_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
package network

import (
"errors"
"testing"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"
gomock "go.uber.org/mock/gomock"
Expand Down Expand Up @@ -131,22 +131,33 @@ func TestHandleBlockAnnounceMessage(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
propagate bool
mockSyncer func(*testing.T, peer.ID, *BlockAnnounceMessage) Syncer
propagate bool
expectError bool
mockSyncer func(*testing.T, peer.ID, *BlockAnnounceMessage) Syncer
}{
"block_already_exists": {
"should_propagate": {
mockSyncer: func(t *testing.T, peer peer.ID, blockAnnounceMessage *BlockAnnounceMessage) Syncer {
ctrl := gomock.NewController(t)
syncer := NewMockSyncer(ctrl)
syncer.EXPECT().
HandleBlockAnnounce(peer, blockAnnounceMessage).
Return(blocktree.ErrBlockExists)
Return(nil)
return syncer
},
propagate: true,
expectError: false,
propagate: true,
},
"block_does_not_exists": {
propagate: false,
"should_not_propagate": {
mockSyncer: func(t *testing.T, peer peer.ID, blockAnnounceMessage *BlockAnnounceMessage) Syncer {
ctrl := gomock.NewController(t)
syncer := NewMockSyncer(ctrl)
syncer.EXPECT().
HandleBlockAnnounce(peer, blockAnnounceMessage).
Return(errors.New("mocked error"))
return syncer
},
expectError: true,
propagate: false,
},
}

Expand Down Expand Up @@ -175,8 +186,11 @@ func TestHandleBlockAnnounceMessage(t *testing.T) {

service := createTestService(t, config)
gotPropagate, err := service.handleBlockAnnounceMessage(peerID, msg)

require.NoError(t, err)
if tt.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.propagate, gotPropagate)
})
}
Expand Down
6 changes: 5 additions & 1 deletion dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg messa
return s.writeToStream(stream, announceHandshake)
}

func (s *testStreamHandler) writeToStream(stream libp2pnetwork.Stream, msg messages.P2PMessage) error {
func (*testStreamHandler) writeToStream(stream libp2pnetwork.Stream, msg messages.P2PMessage) error {
encMsg, err := msg.Encode()
if err != nil {
return err
Expand Down Expand Up @@ -257,6 +257,10 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
CreateBlockResponse(gomock.Any(), gomock.Any()).
Return(newTestBlockResponseMessage(t), nil).AnyTimes()

syncer.EXPECT().
OnConnectionClosed(gomock.AssignableToTypeOf(peer.ID(string("")))).
AnyTimes()

syncer.EXPECT().IsSynced().Return(false).AnyTimes()
cfg.Syncer = syncer
}
Expand Down
4 changes: 4 additions & 0 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ func (h *host) writeToStream(s network.Stream, msg messages.P2PMessage) error {
return err
}

if len(encMsg) != sent {
logger.Errorf("full message not sent: sent %d, message size %d", sent, len(encMsg))
}

h.bwc.LogSentMessage(int64(sent))

return nil
Expand Down
4 changes: 2 additions & 2 deletions dot/network/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ func TestAscendingBlockRequest(t *testing.T) {
},
},
},

"requesting_4_chunks_of_128_plus_3_blocks": {
startNumber: 1,
targetNumber: (128 * 4) + 3,
Expand Down Expand Up @@ -433,7 +432,8 @@ func TestAscendingBlockRequest(t *testing.T) {
tt := tt

t.Run(tname, func(t *testing.T) {
requests := messages.NewAscendingBlockRequests(tt.startNumber, tt.targetNumber, messages.BootstrapRequestData)
requests := messages.NewAscendingBlockRequests(tt.startNumber, tt.targetNumber,
messages.BootstrapRequestData)
require.Equal(t, tt.expectedBlockRequestMessage, requests)

acc := uint32(0)
Expand Down
Loading

0 comments on commit dbe6858

Please sign in to comment.