Skip to content

Commit

Permalink
Merge pull request #1631 from orbs-network/feature/robust-sync
Browse files Browse the repository at this point in the history
Feature/robust sync
  • Loading branch information
noambergIL authored Oct 14, 2020
2 parents cc97bed + d5dcb21 commit dc95292
Show file tree
Hide file tree
Showing 26 changed files with 233 additions and 160 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/ethereum/go-ethereum v1.9.6
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/google/go-cmp v0.3.1
github.com/huin/goupnp v1.0.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.1 // indirect
Expand All @@ -28,11 +29,11 @@ require (
github.com/orbs-network/go-mock v1.1.0
github.com/orbs-network/govnr v0.2.0
github.com/orbs-network/healthcheck v1.1.0
github.com/orbs-network/lean-helix-go v0.5.0
github.com/orbs-network/lean-helix-go v0.5.1-0.20201011065550-9473b7e1df05
github.com/orbs-network/membuffers v0.4.0
github.com/orbs-network/orbs-client-sdk-go v0.18.0
github.com/orbs-network/orbs-contract-sdk v1.8.0
github.com/orbs-network/orbs-spec v0.0.0-20200715083427-d937ef1ec8ef
github.com/orbs-network/orbs-spec v0.0.0-20201013064336-2e9a104f3993
github.com/orbs-network/scribe v0.2.3
github.com/pborman/uuid v1.2.0 // indirect
github.com/pkg/errors v0.9.1
Expand Down
1 change: 1 addition & 0 deletions services/blockstorage/internodesync/block_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type BlockSyncStorage interface {
GetLastCommittedBlockHeight(ctx context.Context, input *services.GetLastCommittedBlockHeightInput) (*services.GetLastCommittedBlockHeightOutput, error)
NodeSyncCommitBlock(ctx context.Context, input *services.CommitBlockInput) (*services.CommitBlockOutput, error)
ValidateBlockForCommit(ctx context.Context, input *services.ValidateBlockForCommitInput) (*services.ValidateBlockForCommitOutput, error)
ValidateChainTip(ctx context.Context, input *services.ValidateChainTipInput) (*services.ValidateChainTipOutput, error)
UpdateConsensusAlgosAboutLastCommittedBlockInLocalPersistence(ctx context.Context)
GetBlock(height primitives.BlockHeight) (*protocol.BlockPairContainer, error)
GetSyncState() SyncState
Expand Down
37 changes: 19 additions & 18 deletions services/blockstorage/internodesync/state_processing_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *processingBlocksState) processState(ctx context.Context) syncState {
logger.Info("failed to verify the blocks chunk range received via sync", log.Error(err))
return s.factory.CreateCollectingAvailabilityResponseState()
}
if err := s.validatePosChain(s.blocks.BlockPairs, syncState, s.factory.config.CommitteeGracePeriod(), receivedSyncBlocksOrder); err != nil {
if err := s.validateChainFork(ctx, s.blocks.BlockPairs, syncState, s.factory.config.CommitteeGracePeriod(), receivedSyncBlocksOrder); err != nil {
s.metrics.failedValidationBlocks.Inc()
logger.Info("failed to verify the blocks chunk PoS received via sync", log.Error(err))
return s.factory.CreateCollectingAvailabilityResponseState()
Expand All @@ -85,26 +85,26 @@ func (s *processingBlocksState) processState(ctx context.Context) syncState {
return nil
}
prevBlockPair := s.getPrevBlock(index, receivedSyncBlocksOrder)
blockHeight := getBlockHeight(blockPair)
_, err := s.storage.ValidateBlockForCommit(ctx, &services.ValidateBlockForCommitInput{BlockPair: blockPair, PrevBlockPair: prevBlockPair})
if err != nil {
blockHeight := getBlockHeight(blockPair)
if prevBlockPair == nil && (blockHeight > primitives.BlockHeight(1)) {
logger.Info("dropping last block in the batch (node does not hold previous block for consensus validation)", logfields.BlockHeight(blockHeight))
} else {
s.metrics.failedValidationBlocks.Inc()
logger.Info("failed to validate block received via sync", log.Error(err), logfields.BlockHeight(blockPair.TransactionsBlock.Header.BlockHeight()), log.Stringable("tx-block-header", blockPair.TransactionsBlock.Header)) // may be a valid failure if height isn't the next height
logger.Info("failed to validate block received via sync", log.Error(err), logfields.BlockHeight(blockHeight), log.Stringable("tx-block-header", blockPair.TransactionsBlock.Header)) // may be a valid failure if height isn't the next height
}
break
}
_, err = s.storage.NodeSyncCommitBlock(ctx, &services.CommitBlockInput{BlockPair: blockPair})
if err != nil {
s.metrics.failedCommitBlocks.Inc()
logger.Error("failed to commit block received via sync", log.Error(err), logfields.BlockHeight(blockPair.TransactionsBlock.Header.BlockHeight()))
logger.Error("failed to commit block received via sync", log.Error(err), logfields.BlockHeight(blockHeight))
break
} else {
s.metrics.lastCommittedTime.Update(time.Now().UnixNano())
s.metrics.committedBlocks.Inc()
logger.Info("successfully committed block received via sync", logfields.BlockHeight(blockPair.TransactionsBlock.Header.BlockHeight()))
logger.Info("successfully committed block received via sync", logfields.BlockHeight(blockHeight))
}
}

Expand All @@ -128,7 +128,7 @@ func (s *processingBlocksState) validateBlocksRange(blocks []*protocol.BlockPair
if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_RESERVED && syncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_ASCENDING {
prevHeight := inOrderHeight
for _, blockPair := range s.blocks.BlockPairs {
currentHeight := blockPair.TransactionsBlock.Header.BlockHeight()
currentHeight := getBlockHeight(blockPair)
if currentHeight != prevHeight+1 {
return fmt.Errorf("invalid blocks chunk found a non consecutive ascending range prevHeight (%d), currentHeight (%d)", prevHeight, currentHeight)
}
Expand All @@ -141,7 +141,7 @@ func (s *processingBlocksState) validateBlocksRange(blocks []*protocol.BlockPair

} else if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING {
firstBlock := blocks[0]
firstBlockHeight := firstBlock.TransactionsBlock.Header.BlockHeight()
firstBlockHeight := getBlockHeight(firstBlock)
if inOrderHeight == topHeight {
if firstBlockHeight <= inOrderHeight {
return fmt.Errorf("invalid blocks chunk where first block height (%d) < syncState.inOrderHeight (%d)", firstBlockHeight, inOrderHeight)
Expand All @@ -153,7 +153,7 @@ func (s *processingBlocksState) validateBlocksRange(blocks []*protocol.BlockPair
}
prevHeight := firstBlockHeight + 1
for _, blockPair := range s.blocks.BlockPairs {
currentHeight := blockPair.TransactionsBlock.Header.BlockHeight()
currentHeight := getBlockHeight(blockPair)
if currentHeight+1 != prevHeight {
return fmt.Errorf("invalid blocks chunk found a non consecutive descending range prevHeight (%d), currentHeight (%d)", prevHeight, currentHeight)
}
Expand All @@ -165,28 +165,29 @@ func (s *processingBlocksState) validateBlocksRange(blocks []*protocol.BlockPair
}

// assumes blocks range is correct. Specifically in descending (blockStorage.lastSynced.height - 1 == blocks[0].height ) or ( blocks[0].height > blockStorage.top.height)
func (s *processingBlocksState) validatePosChain(blocks []*protocol.BlockPairContainer, syncState SyncState, committeeGraePeriod time.Duration, receivedSyncBlocksOrder gossipmessages.SyncBlocksOrder) error {
func (s *processingBlocksState) validateChainFork(ctx context.Context, blocks []*protocol.BlockPairContainer, syncState SyncState, committeeGraePeriod time.Duration, receivedSyncBlocksOrder gossipmessages.SyncBlocksOrder) error {
syncBlocksOrder := s.factory.getSyncBlocksOrder()
topHeight, _, lastSyncedHeight := syncState.GetSyncStateBlockHeights()

if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_RESERVED && syncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_ASCENDING {
return nil
}

} else if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING {
if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING {
firstBlock := blocks[0]
firstBlockHeight := firstBlock.TransactionsBlock.Header.BlockHeight()
firstBlockHeight := getBlockHeight(firstBlock)
if firstBlockHeight == lastSyncedHeight-1 { // will verify hash pointer to block
if nextBlock, err := s.storage.GetBlock(firstBlock.TransactionsBlock.Header.BlockHeight() + 1); err == nil && nextBlock != nil {
if nextBlock, err := s.storage.GetBlock(firstBlockHeight + 1); err == nil && nextBlock != nil {
// prepend
blocks = append([]*protocol.BlockPairContainer{nextBlock}, blocks...)
} else {
return err
}
} else if firstBlockHeight > topHeight { // verify the first block reference complies with committee PoS honesty assumption
topBlockReference := time.Duration(firstBlock.TransactionsBlock.Header.ReferenceTime()) * time.Second
now := time.Duration(time.Now().Unix()) * time.Second
if topBlockReference+committeeGraePeriod < now {
return errors.New(fmt.Sprintf("block reference is not included in committee valid reference grace: block reference (%d), now (%d), grace (%d)", topBlockReference, now, committeeGraePeriod))
} else if firstBlockHeight > topHeight { // verify the first block complies with PoS honesty assumption (1. committee - refTime - is up to date (between now and now-12hrs) or 2. soft: block has at least one honest (weight > f) of current committee (ref = now)
prevBlockPair := s.getPrevBlock(0, receivedSyncBlocksOrder)
_, err := s.storage.ValidateChainTip(ctx, &services.ValidateChainTipInput{BlockPair: firstBlock, PrevBlockPair: prevBlockPair})
if err != nil {
return err
}
} else {
return errors.New(fmt.Sprintf("blocks chunk received (firstHeight %d) does not match current syncState (%v)", firstBlockHeight, syncState))
Expand Down Expand Up @@ -216,7 +217,7 @@ func verifyPrevHashPointer(blockPair *protocol.BlockPairContainer, prevBlockPair

func (s *processingBlocksState) getPrevBlock(index int, receivedSyncBlocksOrder gossipmessages.SyncBlocksOrder) (prevBlock *protocol.BlockPairContainer) {
blocks := s.blocks.BlockPairs
blockHeight := blocks[index].TransactionsBlock.Header.BlockHeight()
blockHeight := getBlockHeight(blocks[index])
if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_ASCENDING {
if index == 0 {
if blockHeight > 0 {
Expand Down
84 changes: 32 additions & 52 deletions services/blockstorage/internodesync/state_processing_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/orbs-network/orbs-spec/types/go/primitives"
"github.com/orbs-network/orbs-spec/types/go/protocol"
"github.com/orbs-network/orbs-spec/types/go/protocol/gossipmessages"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"testing"
"time"
Expand Down Expand Up @@ -58,6 +59,7 @@ func TestStateProcessingBlocksDescending_CommitsAccordinglyAndMovesToCollectingA
h.expectBlockValidationQueriesFromStorage(11)
h.expectBlockCommitsToStorage(11)
state := h.factory.CreateProcessingBlocksState(message)
h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1)
nextState := state.processState(ctx)
require.IsType(t, &collectingAvailabilityResponsesState{}, nextState, "next state after commit should be collecting availability responses")

Expand All @@ -76,6 +78,8 @@ func TestStateProcessingBlocksDescending_CommitsAccordinglyAndMovesToCollectingA
h.storage.When("GetSyncState").Return(syncState).Times(1)
h.storage.When("GetBlock", mock.Any).Return(nil).Times(1)
h.storage.When("ValidateBlockForCommit", mock.Any, mock.Any).Return(nil, nil).Times(10)
h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1)

h.expectBlockCommitsToStorage(10)

state = h.factory.CreateProcessingBlocksState(message)
Expand Down Expand Up @@ -176,6 +180,7 @@ func TestStateProcessingBlocksDescending_ValidateBlockFailureReturnsToCollecting
Build().Message

expectedFailedBlockHeight := primitives.BlockHeight(1)
h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1)
h.expectBlockValidationQueriesFromStorageAndFailLastValidation(11, expectedFailedBlockHeight)
h.expectBlockCommitsToStorage(10)

Expand All @@ -188,6 +193,31 @@ func TestStateProcessingBlocksDescending_ValidateBlockFailureReturnsToCollecting
})
}

func TestStateProcessingBlocksDescending_ValidateChainTipFailureReturnsToCollectingAvailabilityResponses(t *testing.T) {
with.Context(func(ctx context.Context) {
with.Logging(t, func(harness *with.LoggingHarness) {
h := newBlockSyncHarness(harness.Logger).
withDescendingEnabled(true)
harness.AllowErrorsMatching("failed to validate block received via sync")

message := builders.BlockSyncResponseInput().
WithBlocksOrder(gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING).
WithFirstBlockHeight(11).
WithLastBlockHeight(1).
WithLastCommittedBlockHeight(11).
Build().Message

h.storage.When("GetSyncState").Return(nil).Times(1)
h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, errors.New(" failed to validate the chain tip")).Times(1)
state := h.factory.CreateProcessingBlocksState(message)
nextState := state.processState(ctx)

require.IsType(t, &collectingAvailabilityResponsesState{}, nextState, "next state after validation error should be collecting availability responses")
h.verifyMocks(t)
})
})
}

func TestStateProcessingBlocksDescending_ValidateBlockChunkRangeFailureReturnsToCollectingAvailabilityResponses(t *testing.T) {
with.Context(func(ctx context.Context) {
with.Logging(t, func(harness *with.LoggingHarness) {
Expand Down Expand Up @@ -239,58 +269,6 @@ func TestStateProcessingBlocksDescending_ValidateBlockChunkRangeFailureReturnsTo
})
}

func TestStateProcessingBlocksDescending_ValidatePosChainRefTimeFailure(t *testing.T) {
with.Context(func(ctx context.Context) {
with.Logging(t, func(harness *with.LoggingHarness) {
h := newBlockSyncHarness(harness.Logger).
withDescendingEnabled(true)
harness.AllowErrorsMatching("failed to verify the blocks chunk PoS received via sync")

blockPair := builders.BlockPair().
WithHeight(primitives.BlockHeight(10)).
Build()
syncState := SyncState{InOrderBlock: blockPair, TopBlock: blockPair, LastSyncedBlock: blockPair}
h.storage.When("GetSyncState").Return(syncState).Times(1)
h.storage.When("GetBlock", mock.Any).Return(nil)
h.storage.Never("ValidateBlockForCommit", mock.Any, mock.Any)
h.storage.Never("NodeSyncCommitBlock", mock.Any, mock.Any)

var blocks []*protocol.BlockPairContainer
var prevBlock *protocol.BlockPairContainer
currentTime := time.Now()
committeeGracePeriod := 12 * time.Hour
flakinessBuffer := 10 * time.Second
// block chunk does pertain to PoS honesty assumption of 12hr committee (even though in tests it set to 100 seconds)
refTime := primitives.TimestampSeconds(currentTime.Add(-committeeGracePeriod).Add(-flakinessBuffer).Unix())
for i := 11; i <= 20; i++ {
blockTime := time.Unix(currentTime.Unix()+int64(i), 0) // deterministic block creation
blockPair := builders.BlockPair().
WithHeight(primitives.BlockHeight(i)).
WithBlockCreated(blockTime).
WithReferenceTime(refTime).
WithPrevBlock(prevBlock).
Build()
prevBlock = blockPair
blocks = append(blocks, blockPair)
}
reverse(blocks)

message := builders.BlockSyncResponseInput().
WithBlocksOrder(gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING).
WithBlocks(blocks).
WithFirstBlockHeight(20).
WithLastBlockHeight(11).
WithLastCommittedBlockHeight(20).
Build().Message

state := h.factory.CreateProcessingBlocksState(message)
nextState := state.processState(ctx)
require.IsType(t, &collectingAvailabilityResponsesState{}, nextState, "next state after validation error should be collecting availability responses")
h.verifyMocks(t)
})
})
}

func TestStateProcessingBlocksDescending_ValidatePosChainPrevHashFailure(t *testing.T) {
with.Context(func(ctx context.Context) {
with.Logging(t, func(harness *with.LoggingHarness) {
Expand Down Expand Up @@ -389,6 +367,7 @@ func TestStateProcessingBlocksDescending_CommitBlockFailureReturnsToCollectingAv
WithLastCommittedBlockHeight(11).
Build().Message

h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1)
h.expectBlockValidationQueriesFromStorage(11)
h.expectBlockCommitsToStorageAndFailLastCommit(11, message.SignedChunkRange.FirstBlockHeight())

Expand Down Expand Up @@ -450,6 +429,7 @@ func TestStateProcessingBlocksDescending_TerminatesOnContextTermination(t *testi
h.storage.When("GetSyncState").Return(syncState).Times(1)
h.storage.When("GetBlock", mock.Any).Return(nil)
h.storage.When("UpdateConsensusAlgosAboutLastCommittedBlockInLocalPersistence", mock.Any)
h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1)

cancel()
state := h.factory.CreateProcessingBlocksState(message)
Expand Down
8 changes: 8 additions & 0 deletions services/blockstorage/internodesync/storage_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,11 @@ func (s *blockSyncStorageMock) GetSyncState() SyncState {
}
}

func (s *blockSyncStorageMock) ValidateChainTip(ctx context.Context, input *services.ValidateChainTipInput) (*services.ValidateChainTipOutput, error) {
ret := s.Called(ctx, input)
if out := ret.Get(0); out != nil {
return out.(*services.ValidateChainTipOutput), ret.Error(1)
} else {
return nil, ret.Error(1)
}
}
Loading

0 comments on commit dc95292

Please sign in to comment.