Skip to content

Commit

Permalink
Revert "[StateSync] add retries to process state sync where failed pe…
Browse files Browse the repository at this point in the history
…er config is excluded on the next try (#1835)" (#2262)

This reverts commit 973ac15.
  • Loading branch information
fxfactorial authored Feb 13, 2020
1 parent 206208f commit 521cd50
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 74 deletions.
90 changes: 21 additions & 69 deletions api/service/syncing/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// Constants for syncing.
const (
downloadBlocksRetryLimit = 5 // downloadBlocks service retry limit
stateSyncRetryLimit = 5 // ProcessStateSync retry limit
TimesToFail = 5 // downloadBlocks service retry limit
RegistrationNumber = 3
SyncingPortDifference = 3000
inSyncThreshold = 0 // when peerBlockHeight - myBlockHeight <= inSyncThreshold, it's ready to join consensus
Expand Down Expand Up @@ -66,10 +66,8 @@ type SyncConfig struct {
// mtx locks peers, and *SyncPeerConfig pointers in peers.
// SyncPeerConfig itself is guarded by its own mutex.
mtx sync.RWMutex
// failedPeers contains the sync peer config that had been chosen
// as the max consensus sync peer config but failed during chain insertion
failedPeers []*SyncPeerConfig
peers []*SyncPeerConfig

peers []*SyncPeerConfig
}

// AddPeer adds the given sync peer.
Expand Down Expand Up @@ -201,8 +199,8 @@ func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *
}
}

// CompareSyncPeerConfigByBlockHashes compares two SyncPeerConfig by blockHashes.
func CompareSyncPeerConfigByBlockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int {
// CompareSyncPeerConfigByblockHashes compares two SyncPeerConfig by blockHashes.
func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int {
if len(a.blockHashes) != len(b.blockHashes) {
if len(a.blockHashes) < len(b.blockHashes) {
return -1
Expand Down Expand Up @@ -285,19 +283,7 @@ func (sc *SyncConfig) getHowManyMaxConsensus() (int, int) {
maxCount := 0
maxFirstID := -1
for i := range sc.peers {
// skip if among the previously failed peer config
skip := false
for j := range sc.failedPeers {
if CompareSyncPeerConfigByBlockHashes(sc.peers[i], sc.failedPeers[j]) == 0 {
skip = true
break
}
}
if skip {
continue
}

if curFirstID == -1 || CompareSyncPeerConfigByBlockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(sc.peers[curFirstID], sc.peers[i]) != 0 {
curCount = 1
curFirstID = i
} else {
Expand Down Expand Up @@ -326,10 +312,10 @@ func (sc *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]
func (sc *SyncConfig) cleanUpPeers(maxFirstID int) {
fixedPeer := sc.peers[maxFirstID]
for i := 0; i < len(sc.peers); i++ {
if CompareSyncPeerConfigByBlockHashes(fixedPeer, sc.peers[i]) != 0 {
if CompareSyncPeerConfigByblockHashes(fixedPeer, sc.peers[i]) != 0 {
// TODO: move it into a util delete func.
// See tip https://github.com/golang/go/wiki/SliceTricks
// Close the client and remove the peer out of the peers set
// Close the client and remove the peer out of the
sc.peers[i].client.Close()
copy(sc.peers[i:], sc.peers[i+1:])
sc.peers[len(sc.peers)-1] = nil
Expand All @@ -347,7 +333,7 @@ func (sc *SyncConfig) GetBlockHashesConsensusAndCleanUp() {
defer sc.mtx.Unlock()
// Sort all peers by the blockHashes.
sort.Slice(sc.peers, func(i, j int) bool {
return CompareSyncPeerConfigByBlockHashes(sc.peers[i], sc.peers[j]) == -1
return CompareSyncPeerConfigByblockHashes(sc.peers[i], sc.peers[j]) == -1
})
maxFirstID, maxCount := sc.getHowManyMaxConsensus()
utils.Logger().Info().
Expand Down Expand Up @@ -784,62 +770,28 @@ Loop:
currentHeight := bc.CurrentBlock().NumberU64()

if currentHeight >= otherHeight {
utils.Logger().Debug().
Bool("isBeacon", isBeacon).
Uint32("ShardID", bc.ShardID()).
Uint64("otherHeight", otherHeight).
Uint64("currentHeight", currentHeight).
Msg("[SYNC] Node is now IN SYNC!")
utils.Logger().Info().
Msgf("[SYNC] Node is now IN SYNC! (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
break Loop
} else {
utils.Logger().Debug().
Bool("isBeacon", isBeacon).
Uint32("ShardID", bc.ShardID()).
Uint64("otherHeight", otherHeight).
Uint64("currentHeight", currentHeight).
Msg("[SYNC] Node is Not in Sync.")
Msgf("[SYNC] Node is Not in Sync (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
}
startHash := bc.CurrentBlock().Hash()
size := uint32(otherHeight - currentHeight)
if size > SyncLoopBatchSize {
size = SyncLoopBatchSize
}

retryCount := 0
var err error
for {
err = ss.ProcessStateSync(startHash[:], size, bc, worker)
if err == nil {
break
}
if retryCount >= stateSyncRetryLimit {
utils.Logger().Warn().Err(err).
Bool("isBeacon", isBeacon).
Uint32("ShardID", bc.ShardID()).
Uint64("otherHeight", otherHeight).
Uint64("currentHeight", currentHeight).
Msg("[SYNC] ProcessStateSync failed after retries. Node failed to sync.")
break
}

utils.Logger().Warn().Err(err).
Bool("isBeacon", isBeacon).
Uint32("ShardID", bc.ShardID()).
Uint64("otherHeight", otherHeight).
Uint64("currentHeight", currentHeight).
Int("retryCount", retryCount).
Msg("[SYNC] ProcessStateSync failed. Retrying ...")

// Track the peer sync config(s) that failed node sync here to exclude it on successive retries
trackFailedPeer := func(configPeer *SyncPeerConfig) (brk bool) {
ss.syncConfig.failedPeers = append(ss.syncConfig.failedPeers, configPeer)
brk = true
return
}
ss.syncConfig.ForEachPeer(trackFailedPeer)
retryCount++
err := ss.ProcessStateSync(startHash[:], size, bc, worker)
if err != nil {
utils.Logger().Error().Err(err).
Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
isBeacon, bc.ShardID(), otherHeight, currentHeight)
// should we still call UpdateConsensusInformation() upon state sync failure?
// how to handle error here?
}

ss.purgeOldBlocksFromCache()
if consensus != nil {
consensus.UpdateConsensusInformation()
Expand Down
10 changes: 5 additions & 5 deletions api/service/syncing/syncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@ func TestCreateTestSyncPeerConfig(t *testing.T) {
}

// Simple test for IncorrectResponse
func TestCompareSyncPeerConfigByBlockHashes(t *testing.T) {
func TestCompareSyncPeerConfigByblockHashes(t *testing.T) {
client := &downloader.Client{}
blockHashes1 := [][]byte{{1, 2, 3}}
syncPeerConfig1 := CreateTestSyncPeerConfig(client, blockHashes1)
blockHashes2 := [][]byte{{1, 2, 4}}
syncPeerConfig2 := CreateTestSyncPeerConfig(client, blockHashes2)

// syncPeerConfig1 is less than syncPeerConfig2
assert.Equal(t, CompareSyncPeerConfigByBlockHashes(syncPeerConfig1, syncPeerConfig2), -1, "syncPeerConfig1 is less than syncPeerConfig2")
assert.Equal(t, CompareSyncPeerConfigByblockHashes(syncPeerConfig1, syncPeerConfig2), -1, "syncPeerConfig1 is less than syncPeerConfig2")

// syncPeerConfig1 is greater than syncPeerConfig2
blockHashes1[0][2] = 5
assert.Equal(t, CompareSyncPeerConfigByBlockHashes(syncPeerConfig1, syncPeerConfig2), 1, "syncPeerConfig1 is greater than syncPeerConfig2")
assert.Equal(t, CompareSyncPeerConfigByblockHashes(syncPeerConfig1, syncPeerConfig2), 1, "syncPeerConfig1 is greater than syncPeerConfig2")

// syncPeerConfig1 is equal to syncPeerConfig2
blockHashes1[0][2] = 4
assert.Equal(t, CompareSyncPeerConfigByBlockHashes(syncPeerConfig1, syncPeerConfig2), 0, "syncPeerConfig1 is equal to syncPeerConfig2")
assert.Equal(t, CompareSyncPeerConfigByblockHashes(syncPeerConfig1, syncPeerConfig2), 0, "syncPeerConfig1 is equal to syncPeerConfig2")

// syncPeerConfig1 is less than syncPeerConfig2
blockHashes1 = blockHashes1[:1]
assert.Equal(t, CompareSyncPeerConfigByBlockHashes(syncPeerConfig1, syncPeerConfig2), 0, "syncPeerConfig1 is less than syncPeerConfig2")
assert.Equal(t, CompareSyncPeerConfigByblockHashes(syncPeerConfig1, syncPeerConfig2), 0, "syncPeerConfig1 is less than syncPeerConfig2")
}

func TestCreateStateSync(t *testing.T) {
Expand Down

0 comments on commit 521cd50

Please sign in to comment.