Skip to content

Commit

Permalink
feat(dot/sync): Implement warp sync strategy (#4275)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimartiro authored Jan 29, 2025
1 parent 9698cad commit 55446ab
Show file tree
Hide file tree
Showing 31 changed files with 1,004 additions and 71 deletions.
1 change: 1 addition & 0 deletions chain/kusama/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"

return config
}
1 change: 1 addition & 0 deletions chain/paseo/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"

return config
}
1 change: 1 addition & 0 deletions chain/polkadot/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"

return config
}
1 change: 1 addition & 0 deletions chain/westend-dev/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.RPC.UnsafeRPC = true
config.RPC.WSExternal = true
config.RPC.UnsafeWSExternal = true
config.Core.Sync = "full"

return config
}
1 change: 1 addition & 0 deletions chain/westend-local/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func DefaultConfig() *cfg.Config {
config.RPC.UnsafeRPC = true
config.RPC.WSExternal = true
config.RPC.UnsafeWSExternal = true
config.Core.Sync = "full"

return config
}
Expand Down
1 change: 1 addition & 0 deletions chain/westend/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"

return config
}
8 changes: 8 additions & 0 deletions cmd/gossamer/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,14 @@ func addCoreFlags(cmd *cobra.Command) error {
return fmt.Errorf("failed to add --grandpa-interval flag: %s", err)
}

if err := addStringFlagBindViper(cmd,
"sync",
config.Core.Sync,
"sync mode [warp | full]",
"core.sync"); err != nil {
return fmt.Errorf("failed to add --sync flag: %s", err)
}

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (
DefaultSystemName = "Gossamer"
// DefaultSystemVersion is the default system version
DefaultSystemVersion = "0.0.0"

// DefaultSyncMode is the default block sync mode
DefaultSyncMode = "full"
)

// DefaultRPCModules the default RPC modules
Expand Down Expand Up @@ -188,6 +191,7 @@ type CoreConfig struct {
GrandpaAuthority bool `mapstructure:"grandpa-authority"`
WasmInterpreter string `mapstructure:"wasm-interpreter,omitempty"`
GrandpaInterval time.Duration `mapstructure:"grandpa-interval,omitempty"`
Sync string `mapstructure:"sync,omitempty"`
}

// StateConfig contains the configuration for the state.
Expand Down Expand Up @@ -363,6 +367,7 @@ func DefaultConfig() *Config {
GrandpaAuthority: true,
WasmInterpreter: DefaultWasmInterpreter,
GrandpaInterval: DefaultDiscoveryInterval,
Sync: DefaultSyncMode,
},
Network: &NetworkConfig{
Port: DefaultNetworkPort,
Expand Down Expand Up @@ -444,6 +449,7 @@ func DefaultConfigFromSpec(nodeSpec *genesis.Genesis) *Config {
GrandpaAuthority: true,
WasmInterpreter: DefaultWasmInterpreter,
GrandpaInterval: DefaultDiscoveryInterval,
Sync: DefaultSyncMode,
},
Network: &NetworkConfig{
Port: DefaultNetworkPort,
Expand Down Expand Up @@ -525,6 +531,7 @@ func Copy(c *Config) Config {
GrandpaAuthority: c.Core.GrandpaAuthority,
WasmInterpreter: c.Core.WasmInterpreter,
GrandpaInterval: c.Core.GrandpaInterval,
Sync: c.Core.Sync,
},
Network: &NetworkConfig{
Port: c.Network.Port,
Expand Down
12 changes: 11 additions & 1 deletion dot/network/host_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package network

import (
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -396,12 +397,21 @@ func Test_PeerSupportsProtocol(t *testing.T) {
}
require.NoError(t, err)

genesisHash := nodeA.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
fullSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, SyncID)
warpSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, WarpSyncID)

tests := []struct {
protocol protocol.ID
expect bool
}{
{
protocol: protocol.ID("/gossamer/test/0/sync/2"),
protocol: protocol.ID(fullSyncProtocolId),
expect: true,
},
{
protocol: protocol.ID(warpSyncProtocolId),
expect: true,
},
{
Expand Down
6 changes: 6 additions & 0 deletions dot/network/messages/warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ type WarpProofRequest struct {
Begin common.Hash
}

func NewWarpProofRequest(from common.Hash) *WarpProofRequest {
return &WarpProofRequest{
Begin: from,
}
}

// Decode decodes the message into a WarpProofRequest
func (wpr *WarpProofRequest) Decode(in []byte) error {
return scale.Unmarshal(in, wpr)
Expand Down
5 changes: 3 additions & 2 deletions dot/network/mock_warp_sync_provider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,14 @@ func (s *Service) Start() error {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

genesisHashProtocolId := protocol.ID(s.cfg.BlockState.GenesisHash().String())
genesisHash := s.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
fullSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, SyncID)
warpSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, WarpSyncID)

s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream)
s.host.registerStreamHandler(protocol.ID(fullSyncProtocolId), s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)
s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream)
s.host.registerStreamHandler(protocol.ID(warpSyncProtocolId), s.handleWarpSyncStream)

// register block announce protocol
err := s.RegisterNotificationsProtocol(
Expand Down Expand Up @@ -622,13 +625,16 @@ func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *RequestResponseProtocol {

protocolID := s.host.protocolID + protocol.ID(subprotocol)
genesisHash := s.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
protocolId := fmt.Sprintf("/%s%s", genesisHash, subprotocol)

return &RequestResponseProtocol{
ctx: s.ctx,
host: s.host,
requestTimeout: requestTimeout,
maxResponseSize: maxResponseSize,
protocolID: protocolID,
protocolID: protocol.ID(protocolId),
responseBuf: make([]byte, maxResponseSize),
responseBufMu: sync.Mutex{},
}
Expand Down
11 changes: 2 additions & 9 deletions dot/network/warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,16 @@ import (
"fmt"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/primitives/consensus/grandpa"
primitives "github.com/ChainSafe/gossamer/internal/primitives/consensus/grandpa"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/grandpa/warpsync"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

const MaxAllowedSameRequestPerPeer = 5

type WarpSyncVerificationResult struct {
SetId grandpa.SetID
AuthorityList primitives.AuthorityList
Header types.Header
Completed bool
}

// WarpSyncProvider is an interface for generating warp sync proofs
type WarpSyncProvider interface {
// Generate proof starting at given block hash. The proof is accumulated until maximum proof
Expand All @@ -34,7 +27,7 @@ type WarpSyncProvider interface {
encodedProof []byte,
setId grandpa.SetID,
authorities primitives.AuthorityList,
) (*WarpSyncVerificationResult, error)
) (*warpsync.WarpSyncVerificationResult, error)
}

func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) {
Expand Down
10 changes: 10 additions & 0 deletions dot/peerset/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,14 @@ const (
// SameBlockSyncRequest used when a peer send us more than the max number of the same request.
SameBlockSyncRequest Reputation = math.MinInt32
SameBlockSyncRequestReason = "same block sync request"

// UnexpectedResponseValue is used when peer send an unexpected response.
UnexpectedResponseValue Reputation = -(1 << 29)
// UnexpectedResponseReason is used when peer send an unexpected response.
UnexpectedResponseReason = "Unexpected response"

// BadWarpProofValue is used when peer send invalid warp sync proof.
BadWarpProofValue Reputation = -(1 << 29)
// BadWarpProofReason is used when peer send invalid warp sync proof.
BadWarpProofReason = "Bad warp proof"
)
34 changes: 29 additions & 5 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/ChainSafe/gossamer/lib/grandpa"
"github.com/ChainSafe/gossamer/lib/grandpa/warpsync"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/runtime"
rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage"
Expand Down Expand Up @@ -349,7 +350,7 @@ func (nodeBuilder) createNetworkService(config *cfg.Config, stateSrvc *state.Ser
return nil, fmt.Errorf("failed to parse network log level: %w", err)
}

warpSyncProvider := grandpa.NewWarpSyncProofProvider(
warpSyncProvider := warpsync.NewWarpSyncProofProvider(
stateSrvc.Block, stateSrvc.Grandpa,
)

Expand Down Expand Up @@ -523,8 +524,28 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg sync
return nil, fmt.Errorf("failed to parse sync log level: %w", err)
}

requestMaker := net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize)
// Should be shared between all sync strategies
peersView := sync.NewPeerViewSet()

var warpSyncStrategy sync.Strategy

if config.Core.Sync == "warp" {
warpSyncProvider := warpsync.NewWarpSyncProofProvider(st.Block, st.Grandpa)

warpSyncCfg := &sync.WarpSyncConfig{
Telemetry: telemetryMailer,
BadBlocks: genesisData.BadBlocks,
WarpSyncProvider: warpSyncProvider,
WarpSyncRequestMaker: net.GetRequestResponseProtocol(network.WarpSyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
SyncRequestMaker: net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
BlockState: st.Block,
Peers: peersView,
}

warpSyncStrategy = sync.NewWarpSyncStrategy(warpSyncCfg)
}

syncCfg := &sync.FullSyncConfig{
BlockState: st.Block,
Expand All @@ -535,7 +556,9 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg sync
BlockImportHandler: cs,
Telemetry: telemetryMailer,
BadBlocks: genesisData.BadBlocks,
RequestMaker: requestMaker,
RequestMaker: net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
Peers: peersView,
}
fullSync := sync.NewFullSyncStrategy(syncCfg)

Expand All @@ -544,7 +567,8 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg sync
sync.WithNetwork(net),
sync.WithBlockState(st.Block),
sync.WithSlotDuration(slotDuration),
sync.WithStrategies(fullSync, nil),
sync.WithWarpSyncStrategy(warpSyncStrategy),
sync.WithFullSyncStrategy(fullSync),
sync.WithMinPeers(config.Network.MinPeers),
), nil
}
Expand Down
11 changes: 8 additions & 3 deletions dot/sync/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import "time"

type ServiceConfig func(svc *SyncService)

func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
func WithWarpSyncStrategy(warpSyncStrategy Strategy) ServiceConfig {
return func(svc *SyncService) {
svc.currentStrategy = currentStrategy
svc.defaultStrategy = defaultStrategy
svc.warpSyncStrategy = warpSyncStrategy
}
}

func WithFullSyncStrategy(fullSyncStrategy Strategy) ServiceConfig {
return func(svc *SyncService) {
svc.fullSyncStrategy = fullSyncStrategy
}
}

Expand Down
13 changes: 8 additions & 5 deletions dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type FullSyncConfig struct {
BadBlocks []string
NumOfTasks int
RequestMaker network.RequestMaker
Peers *peerViewSet
}

type importer interface {
Expand Down Expand Up @@ -75,15 +76,12 @@ func NewFullSyncStrategy(cfg *FullSyncConfig) *FullSyncStrategy {
reqMaker: cfg.RequestMaker,
blockState: cfg.BlockState,
numOfTasks: cfg.NumOfTasks,
peers: cfg.Peers,
blockImporter: newBlockImporter(cfg),
unreadyBlocks: newUnreadyBlocks(),
requestQueue: &requestsQueue[*messages.BlockRequestMessage]{
queue: list.New(),
},
peers: &peerViewSet{
view: make(map[peer.ID]peerView),
target: 0,
},
}
}

Expand All @@ -109,7 +107,7 @@ func (f *FullSyncStrategy) NextActions() ([]*SyncTask, error) {
}

// our best block is equal or ahead of current target.
// in the node's pov we are not legging behind so there's nothing to do
// in the node's pov we are not lagging behind so there's nothing to do
// or we didn't receive block announces, so lets ask for more blocks
if uint32(bestBlockHeader.Number) >= currentTarget {
return f.createTasks(reqsFromQueue), nil
Expand Down Expand Up @@ -405,6 +403,11 @@ func (f *FullSyncStrategy) IsSynced() bool {
return uint32(highestBlock)+messages.MaxBlocksInResponse >= f.peers.getTarget()
}

func (f *FullSyncStrategy) Result() any {
logger.Debug("trying to get a result from full sync strategy which is supposed to run forever")
return nil
}

type RequestResponseData struct {
req *messages.BlockRequestMessage
responseData []*types.BlockData
Expand Down
Loading

0 comments on commit 55446ab

Please sign in to comment.