Skip to content

Commit

Permalink
[Syncer] Fix flaky node sync (#548)
Browse files Browse the repository at this point in the history
* Make the insane sync implementation a little more readable

* Remove leftover log files

* Remove useless lock

* allocate slice with known size

* reorder code

* move SyncPeer to separate file

* fetch peer's Number atomically; remove redundant cast

* change if statement to use <= (earlier impl)

* simplify popBlock loop

* Fix minor allocation bug

* implement pruning of blocks from peers

* revert purgeBlocks() on start of WatchSyncWithPeer

* refactor popBlock method

* make enqueueCh buffered

* rollback to merge commit

* * implement prunePeerEnqueuedBlocks
	* this method clears old blocks from all peers' queues

* prune old blocks when committing a block (BulkSyncWithPeer, WatchSyncWithPeer)

* execute callback handler first, then check if

* implement popBlock timeout dependent on the configured block production time

* fix linter

* reduce popTimeout factor (5 -> 3)

* notify peers of new block asynchronously

* use synchronous Broadcast in tests (SyncBroadcast)

* Fix typo in log message

* Standardize errors in the skeleton package

* Simplify conditional in syncer

* Enforce minimum block time

* Allocate the exact number of header objects

* Define constant for timeout

* Rename break variable

* Simplify fromProto methods in syncer

* add break statement in for loop

* simplify if in enqueueBlock

Co-authored-by: dbrajovic <[email protected]>
  • Loading branch information
zivkovicmilos and dbrajovic authored Jun 1, 2022
1 parent d3002b4 commit f73da20
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 505 deletions.
20 changes: 19 additions & 1 deletion command/server/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"errors"
"fmt"
"github.com/0xPolygon/polygon-edge/command/server/config"
"math"
Expand All @@ -16,6 +17,11 @@ import (
"github.com/0xPolygon/polygon-edge/types"
)

// Sentinel errors for invalid server configuration values.
var (
// errInvalidBlockTime is returned by initBlockTime when the configured
// block time is below 1.
errInvalidBlockTime = errors.New("invalid block time specified")
// errDataDirectoryUndefined is returned by initDataDirLocation when the
// configured data directory is an empty string.
errDataDirectoryUndefined = errors.New("data directory not defined")
)

func (p *serverParams) initConfigFromFile() error {
var parseErr error

Expand Down Expand Up @@ -43,6 +49,10 @@ func (p *serverParams) initRawParams() error {
return err
}

if err := p.initBlockTime(); err != nil {
return err
}

if p.isDevMode {
p.initDevMode()
}
Expand All @@ -53,9 +63,17 @@ func (p *serverParams) initRawParams() error {
return p.initAddresses()
}

// initBlockTime validates the block time taken from the raw configuration.
// It returns errInvalidBlockTime when the configured value is below 1, and
// nil otherwise.
func (p *serverParams) initBlockTime() error {
	if blockTime := p.rawConfig.BlockTime; blockTime < 1 {
		return errInvalidBlockTime
	}

	return nil
}

func (p *serverParams) initDataDirLocation() error {
if p.rawConfig.DataDir == "" {
return fmt.Errorf("data directory not defined")
return errDataDirectoryUndefined
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion command/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func setFlags(cmd *cobra.Command) {
&params.rawConfig.BlockTime,
blockTimeFlag,
defaultConfig.BlockTime,
"minimum block time in seconds",
"minimum block time in seconds (at least 1s)",
)

cmd.Flags().StringArrayVar(
Expand Down
4 changes: 2 additions & 2 deletions consensus/ibft/ibft.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type syncerInterface interface {
Start()
BestPeer() *protocol.SyncPeer
BulkSyncWithPeer(p *protocol.SyncPeer, newBlockHandler func(block *types.Block)) error
WatchSyncWithPeer(p *protocol.SyncPeer, newBlockHandler func(b *types.Block) bool)
WatchSyncWithPeer(p *protocol.SyncPeer, newBlockHandler func(b *types.Block) bool, blockTimeout time.Duration)
GetSyncProgression() *progress.Progression
Broadcast(b *types.Block)
}
Expand Down Expand Up @@ -542,7 +542,7 @@ func (i *Ibft) runSyncState() {
isValidator = i.isValidSnapshot()

return isValidator
})
}, i.blockTime)

if isValidator {
// at this point, we are in sync with the latest chain we know of
Expand Down
8 changes: 6 additions & 2 deletions consensus/ibft/ibft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,9 +689,13 @@ func (s *mockSyncer) BulkSyncWithPeer(p *protocol.SyncPeer, handler func(block *
return nil
}

func (s *mockSyncer) WatchSyncWithPeer(p *protocol.SyncPeer, handler func(b *types.Block) bool) {
func (s *mockSyncer) WatchSyncWithPeer(
p *protocol.SyncPeer,
newBlockHandler func(b *types.Block) bool,
blockTimeout time.Duration,
) {
if s.receivedNewHeadFromPeer != nil {
handler(s.receivedNewHeadFromPeer)
newBlockHandler(s.receivedNewHeadFromPeer)
}
}

Expand Down
27 changes: 18 additions & 9 deletions protocol/service_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type rlpObject interface {
}

var (
errInvalidHeadersRequest = errors.New("invalid headers request")
errMalformedNotifyRequest = errors.New("malformed notify request")
errMalformedNotifyBody = errors.New("malformed notify body")
errMalformedNotifyStatus = errors.New("malformed notify status")
Expand All @@ -54,7 +55,7 @@ func (s *serviceV1) Notify(ctx context.Context, req *proto.NotifyReq) (*empty.Em
return nil, err
}

status, err := fromProto(req.Status)
status, err := statusFromProto(req.Status)

if err != nil {
return nil, err
Expand Down Expand Up @@ -135,21 +136,22 @@ func (s *serviceV1) GetObjectsByHash(_ context.Context, req *proto.HashRequest)
return resp, nil
}

const maxHeadersAmount = 190
const MaxSkeletonHeadersAmount = 190

// GetHeaders implements the V1Server interface
func (s *serviceV1) GetHeaders(_ context.Context, req *proto.GetHeadersRequest) (*proto.Response, error) {
if req.Number != 0 && req.Hash != "" {
return nil, errors.New("cannot provide both a number and a hash")
return nil, errInvalidHeadersRequest
}

if req.Amount > maxHeadersAmount {
req.Amount = maxHeadersAmount
if req.Amount > MaxSkeletonHeadersAmount {
req.Amount = MaxSkeletonHeadersAmount
}

var origin *types.Header

var ok bool
var (
origin *types.Header
ok bool
)

if req.Number != 0 {
origin, ok = s.store.GetHeaderByNumber(uint64(req.Number))
Expand All @@ -171,6 +173,7 @@ func (s *serviceV1) GetHeaders(_ context.Context, req *proto.GetHeadersRequest)
resp := &proto.Response{
Objs: []*proto.Response_Component{},
}

addData := func(h *types.Header) {
resp.Objs = append(resp.Objs, &proto.Response_Component{
Spec: &anypb.Any{
Expand Down Expand Up @@ -211,7 +214,13 @@ func getBodies(ctx context.Context, clt proto.V1Client, hashes []types.Hash) ([]
input = append(input, h.String())
}

resp, err := clt.GetObjectsByHash(ctx, &proto.HashRequest{Hash: input, Type: proto.HashRequest_BODIES})
resp, err := clt.GetObjectsByHash(
ctx,
&proto.HashRequest{
Hash: input,
Type: proto.HashRequest_BODIES,
},
)
if err != nil {
return nil, err
}
Expand Down
133 changes: 51 additions & 82 deletions protocol/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/0xPolygon/polygon-edge/protocol/proto"
"github.com/0xPolygon/polygon-edge/types"
)

const (
// defaultBodyFetchTimeout is the timeout applied to the context that
// getBlocksFromPeer passes to getBodies when fetching block bodies.
defaultBodyFetchTimeout = time.Second * 10
)

// Sentinel errors used when fetching headers and bodies from a peer.
var (
// NOTE(review): the two "malformed" errors are presumably produced by
// verifyHeadersResponse; its body is not visible here, so confirm there.
errMalformedHeadersResponse = errors.New("malformed headers response")
errMalformedHeadersBody = errors.New("malformed headers body")
// errInvalidHeaderSequence is returned by getBlocksFromPeer when two
// consecutive headers do not have block numbers differing by exactly 1.
errInvalidHeaderSequence = errors.New("invalid header sequence")
// errHeaderBodyMismatch is returned by getBlocksFromPeer when the number
// of bodies received differs from the number of headers requested.
errHeaderBodyMismatch = errors.New("requested body and header mismatch")
)

func getHeaders(clt proto.V1Client, req *proto.GetHeadersRequest) ([]*types.Header, error) {
Expand All @@ -20,9 +27,9 @@ func getHeaders(clt proto.V1Client, req *proto.GetHeadersRequest) ([]*types.Head
return nil, err
}

headers := []*types.Header{}
headers := make([]*types.Header, len(resp.Objs))

for _, obj := range resp.Objs {
for index, obj := range resp.Objs {
// Verify the header response is correctly formed
if verifyErr := verifyHeadersResponse(obj); verifyErr != nil {
return nil, fmt.Errorf(
Expand All @@ -36,7 +43,7 @@ func getHeaders(clt proto.V1Client, req *proto.GetHeadersRequest) ([]*types.Head
return nil, err
}

headers = append(headers, header)
headers[index] = header
}

return headers, nil
Expand All @@ -58,105 +65,67 @@ func verifyHeadersResponse(headersResp *proto.Response_Component) error {
}

type skeleton struct {
slots []*slot
span int64
num int64
}

func (s *skeleton) LastHeader() *types.Header {
slot := s.slots[len(s.slots)-1]

return slot.blocks[len(slot.blocks)-1].Header
}

func (s *skeleton) build(clt proto.V1Client, ancestor types.Hash) error {
// since ancestor is the common block we need to query the next one
headers, err := getHeaders(clt, &proto.GetHeadersRequest{Hash: ancestor.String(), Skip: s.span - 1, Amount: s.num})
if err != nil {
return err
}
s.addSkeleton(headers) // nolint

return nil
blocks []*types.Block
skip int64
amount int64
}

func (s *skeleton) fillSlot(indx uint64, clt proto.V1Client) error {
slot := s.slots[indx]
req := &proto.GetHeadersRequest{
Hash: slot.hash.String(),
Amount: s.span,
}
resp, err := getHeaders(clt, req)

// getBlocksFromPeer fetches the blocks from the peer,
// from the specified block number (including)
func (s *skeleton) getBlocksFromPeer(
peerClient proto.V1Client,
initialBlockNum uint64,
) error {
// Fetch the headers from the peer
headers, err := getHeaders(
peerClient,
&proto.GetHeadersRequest{
Number: int64(initialBlockNum),
Skip: s.skip,
Amount: s.amount,
},
)
if err != nil {
return err
}

slot.blocks = []*types.Block{}

for _, h := range resp {
slot.blocks = append(slot.blocks, &types.Block{
Header: h,
})
}

// for each header with body we request it
bodyHashes := []types.Hash{}
bodyIndex := []int{}

for indx, h := range resp {
if h.TxRoot != types.EmptyRootHash {
bodyHashes = append(bodyHashes, h.Hash)
bodyIndex = append(bodyIndex, indx)
// Make sure the number sequences match up
for i := 1; i < len(headers); i++ {
if headers[i].Number-headers[i-1].Number != 1 {
return errInvalidHeaderSequence
}
}

if len(bodyHashes) == 0 {
return nil
// Construct the body request
headerHashes := make([]types.Hash, len(headers))
for index, header := range headers {
headerHashes[index] = header.Hash
}

bodies, err := getBodies(context.Background(), clt, bodyHashes)
getBodiesContext, cancelFn := context.WithTimeout(
context.Background(),
defaultBodyFetchTimeout,
)
defer cancelFn()

// Grab the block bodies
bodies, err := getBodies(getBodiesContext, peerClient, headerHashes)
if err != nil {
return err
}

for indx, body := range bodies {
slot.blocks[bodyIndex[indx]].Transactions = body.Transactions
}

return nil
}

func (s *skeleton) addSkeleton(headers []*types.Header) error {
// safe check make sure they all have the same difference
diff := uint64(0)

for i := 1; i < len(headers); i++ {
elemDiff := headers[i].Number - headers[i-1].Number
if diff == 0 {
diff = elemDiff
} else if elemDiff != diff {
return fmt.Errorf("bad diff")
}
if len(bodies) != len(headers) {
return errHeaderBodyMismatch
}

// fill up the slots
s.slots = make([]*slot, len(headers))
s.blocks = make([]*types.Block, len(headers))

for indx, header := range headers {
slot := &slot{
hash: header.Hash,
number: header.Number,
blocks: make([]*types.Block, diff),
for index, body := range bodies {
s.blocks[index] = &types.Block{
Header: headers[index],
Transactions: body.Transactions,
}
s.slots[indx] = slot
}

return nil
}

type slot struct {
hash types.Hash
number uint64
blocks []*types.Block
}
Loading

0 comments on commit f73da20

Please sign in to comment.