diff --git a/go.mod b/go.mod index 588eecb7e..84b8adfe2 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/ethereum/go-ethereum v1.9.6 github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect + github.com/go-stack/stack v1.8.0 // indirect github.com/google/go-cmp v0.3.1 github.com/huin/goupnp v1.0.0 // indirect github.com/jackpal/go-nat-pmp v1.0.1 // indirect @@ -28,11 +29,11 @@ require ( github.com/orbs-network/go-mock v1.1.0 github.com/orbs-network/govnr v0.2.0 github.com/orbs-network/healthcheck v1.1.0 - github.com/orbs-network/lean-helix-go v0.5.0 + github.com/orbs-network/lean-helix-go v0.5.1-0.20201011065550-9473b7e1df05 github.com/orbs-network/membuffers v0.4.0 github.com/orbs-network/orbs-client-sdk-go v0.18.0 github.com/orbs-network/orbs-contract-sdk v1.8.0 - github.com/orbs-network/orbs-spec v0.0.0-20200715083427-d937ef1ec8ef + github.com/orbs-network/orbs-spec v0.0.0-20201013064336-2e9a104f3993 github.com/orbs-network/scribe v0.2.3 github.com/pborman/uuid v1.2.0 // indirect github.com/pkg/errors v0.9.1 diff --git a/services/blockstorage/internodesync/block_sync.go b/services/blockstorage/internodesync/block_sync.go index 7511d7c79..8a33dd0a9 100644 --- a/services/blockstorage/internodesync/block_sync.go +++ b/services/blockstorage/internodesync/block_sync.go @@ -70,6 +70,7 @@ type BlockSyncStorage interface { GetLastCommittedBlockHeight(ctx context.Context, input *services.GetLastCommittedBlockHeightInput) (*services.GetLastCommittedBlockHeightOutput, error) NodeSyncCommitBlock(ctx context.Context, input *services.CommitBlockInput) (*services.CommitBlockOutput, error) ValidateBlockForCommit(ctx context.Context, input *services.ValidateBlockForCommitInput) (*services.ValidateBlockForCommitOutput, error) + ValidateChainTip(ctx context.Context, input *services.ValidateChainTipInput) (*services.ValidateChainTipOutput, error) UpdateConsensusAlgosAboutLastCommittedBlockInLocalPersistence(ctx context.Context) GetBlock(height primitives.BlockHeight) (*protocol.BlockPairContainer, error) GetSyncState() SyncState diff --git a/services/blockstorage/internodesync/state_processing_blocks.go b/services/blockstorage/internodesync/state_processing_blocks.go index 4b15e84a8..5b1ae2e29 100644 --- a/services/blockstorage/internodesync/state_processing_blocks.go +++ b/services/blockstorage/internodesync/state_processing_blocks.go @@ -72,7 +72,7 @@ func (s *processingBlocksState) processState(ctx context.Context) syncState { logger.Info("failed to verify the blocks chunk range received via sync", log.Error(err)) return s.factory.CreateCollectingAvailabilityResponseState() } - if err := s.validatePosChain(s.blocks.BlockPairs, syncState, s.factory.config.CommitteeGracePeriod(), receivedSyncBlocksOrder); err != nil { + if err := s.validateChainFork(ctx, s.blocks.BlockPairs, syncState, s.factory.config.CommitteeGracePeriod(), receivedSyncBlocksOrder); err != nil { s.metrics.failedValidationBlocks.Inc() logger.Info("failed to verify the blocks chunk PoS received via sync", log.Error(err)) return s.factory.CreateCollectingAvailabilityResponseState() @@ -85,26 +85,26 @@ func (s *processingBlocksState) processState(ctx context.Context) syncState { return nil } prevBlockPair := s.getPrevBlock(index, receivedSyncBlocksOrder) + blockHeight := getBlockHeight(blockPair) _, err := s.storage.ValidateBlockForCommit(ctx, &services.ValidateBlockForCommitInput{BlockPair: blockPair, PrevBlockPair: prevBlockPair}) if err != nil { - blockHeight := getBlockHeight(blockPair) if prevBlockPair == nil && (blockHeight > primitives.BlockHeight(1)) { logger.Info("dropping last block in the batch (node does not hold previous block for consensus validation)", logfields.BlockHeight(blockHeight)) } else { s.metrics.failedValidationBlocks.Inc() - logger.Info("failed to validate block received via sync", log.Error(err), logfields.BlockHeight(blockPair.TransactionsBlock.Header.BlockHeight()), log.Stringable("tx-block-header", blockPair.TransactionsBlock.Header)) // may be a valid failure if height isn't the next height + logger.Info("failed to validate block received via sync", log.Error(err), logfields.BlockHeight(blockHeight), log.Stringable("tx-block-header", blockPair.TransactionsBlock.Header)) // may be a valid failure if height isn't the next height } break } _, err = s.storage.NodeSyncCommitBlock(ctx, &services.CommitBlockInput{BlockPair: blockPair}) if err != nil { s.metrics.failedCommitBlocks.Inc() - logger.Error("failed to commit block received via sync", log.Error(err), logfields.BlockHeight(blockPair.TransactionsBlock.Header.BlockHeight())) + logger.Error("failed to commit block received via sync", log.Error(err), logfields.BlockHeight(blockHeight)) break } else { s.metrics.lastCommittedTime.Update(time.Now().UnixNano()) s.metrics.committedBlocks.Inc() - logger.Info("successfully committed block received via sync", logfields.BlockHeight(blockPair.TransactionsBlock.Header.BlockHeight())) + logger.Info("successfully committed block received via sync", logfields.BlockHeight(blockHeight)) } } @@ -128,7 +128,7 @@ func (s *processingBlocksState) validateBlocksRange(blocks []*protocol.BlockPair if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_RESERVED && syncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_ASCENDING { prevHeight := inOrderHeight for _, blockPair := range s.blocks.BlockPairs { - currentHeight := blockPair.TransactionsBlock.Header.BlockHeight() + currentHeight := getBlockHeight(blockPair) if currentHeight != prevHeight+1 { return fmt.Errorf("invalid blocks chunk found a non consecutive ascending range prevHeight (%d), currentHeight (%d)", prevHeight, currentHeight) } @@ -141,7 +141,7 @@ func (s *processingBlocksState) validateBlocksRange(blocks []*protocol.BlockPair } else if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING { firstBlock := blocks[0] - firstBlockHeight := firstBlock.TransactionsBlock.Header.BlockHeight() + firstBlockHeight := getBlockHeight(firstBlock) if inOrderHeight == topHeight { if firstBlockHeight <= inOrderHeight { return fmt.Errorf("invalid blocks chunk where first block height (%d) < syncState.inOrderHeight (%d)", firstBlockHeight, inOrderHeight) @@ -153,7 +153,7 @@ func (s *processingBlocksState) validateBlocksRange(blocks []*protocol.BlockPair } prevHeight := firstBlockHeight + 1 for _, blockPair := range s.blocks.BlockPairs { - currentHeight := blockPair.TransactionsBlock.Header.BlockHeight() + currentHeight := getBlockHeight(blockPair) if currentHeight+1 != prevHeight { return fmt.Errorf("invalid blocks chunk found a non consecutive descending range prevHeight (%d), currentHeight (%d)", prevHeight, currentHeight) } @@ -165,28 +165,29 @@ func (s *processingBlocksState) validateBlocksRange(blocks []*protocol.BlockPair } // assumes blocks range is correct. Specifically in descending (blockStorage.lastSynced.height - 1 == blocks[0].height ) or ( blocks[0].height > blockStorage.top.height) -func (s *processingBlocksState) validatePosChain(blocks []*protocol.BlockPairContainer, syncState SyncState, committeeGraePeriod time.Duration, receivedSyncBlocksOrder gossipmessages.SyncBlocksOrder) error { +func (s *processingBlocksState) validateChainFork(ctx context.Context, blocks []*protocol.BlockPairContainer, syncState SyncState, committeeGraePeriod time.Duration, receivedSyncBlocksOrder gossipmessages.SyncBlocksOrder) error { syncBlocksOrder := s.factory.getSyncBlocksOrder() topHeight, _, lastSyncedHeight := syncState.GetSyncStateBlockHeights() if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_RESERVED && syncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_ASCENDING { return nil + } - } else if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING { + if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING { firstBlock := blocks[0] - firstBlockHeight := firstBlock.TransactionsBlock.Header.BlockHeight() + firstBlockHeight := getBlockHeight(firstBlock) if firstBlockHeight == lastSyncedHeight-1 { // will verify hash pointer to block - if nextBlock, err := s.storage.GetBlock(firstBlock.TransactionsBlock.Header.BlockHeight() + 1); err == nil && nextBlock != nil { + if nextBlock, err := s.storage.GetBlock(firstBlockHeight + 1); err == nil && nextBlock != nil { // prepend blocks = append([]*protocol.BlockPairContainer{nextBlock}, blocks...) } else { return err } - } else if firstBlockHeight > topHeight { // verify the first block reference complies with committee PoS honesty assumption - topBlockReference := time.Duration(firstBlock.TransactionsBlock.Header.ReferenceTime()) * time.Second - now := time.Duration(time.Now().Unix()) * time.Second - if topBlockReference+committeeGraePeriod < now { - return errors.New(fmt.Sprintf("block reference is not included in committee valid reference grace: block reference (%d), now (%d), grace (%d)", topBlockReference, now, committeeGraePeriod)) + } else if firstBlockHeight > topHeight { // verify the first block complies with PoS honesty assumption (1. committee - refTime - is up to date (between now and now-12hrs) or 2. soft: block has at least one honest (weight > f) of current committee (ref = now) + prevBlockPair := s.getPrevBlock(0, receivedSyncBlocksOrder) + _, err := s.storage.ValidateChainTip(ctx, &services.ValidateChainTipInput{BlockPair: firstBlock, PrevBlockPair: prevBlockPair}) + if err != nil { + return err } } else { return errors.New(fmt.Sprintf("blocks chunk received (firstHeight %d) does not match current syncState (%v)", firstBlockHeight, syncState)) @@ -216,7 +217,7 @@ func verifyPrevHashPointer(blockPair *protocol.BlockPairContainer, prevBlockPair func (s *processingBlocksState) getPrevBlock(index int, receivedSyncBlocksOrder gossipmessages.SyncBlocksOrder) (prevBlock *protocol.BlockPairContainer) { blocks := s.blocks.BlockPairs - blockHeight := blocks[index].TransactionsBlock.Header.BlockHeight() + blockHeight := getBlockHeight(blocks[index]) if receivedSyncBlocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_ASCENDING { if index == 0 { if blockHeight > 0 { diff --git a/services/blockstorage/internodesync/state_processing_blocks_test.go b/services/blockstorage/internodesync/state_processing_blocks_test.go index 62f3596da..8060dcf44 100644 --- a/services/blockstorage/internodesync/state_processing_blocks_test.go +++ b/services/blockstorage/internodesync/state_processing_blocks_test.go @@ -14,6 +14,7 @@ import ( "github.com/orbs-network/orbs-spec/types/go/primitives" "github.com/orbs-network/orbs-spec/types/go/protocol" "github.com/orbs-network/orbs-spec/types/go/protocol/gossipmessages" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "testing" "time" @@ -58,6 +59,7 @@ func TestStateProcessingBlocksDescending_CommitsAccordinglyAndMovesToCollectingA h.expectBlockValidationQueriesFromStorage(11) h.expectBlockCommitsToStorage(11) state := h.factory.CreateProcessingBlocksState(message) + h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1) nextState := state.processState(ctx) require.IsType(t, &collectingAvailabilityResponsesState{}, nextState, "next state after commit should be collecting availability responses") @@ -76,6 +78,8 @@ func TestStateProcessingBlocksDescending_CommitsAccordinglyAndMovesToCollectingA h.storage.When("GetSyncState").Return(syncState).Times(1) h.storage.When("GetBlock", mock.Any).Return(nil).Times(1) h.storage.When("ValidateBlockForCommit", mock.Any, mock.Any).Return(nil, nil).Times(10) + h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1) + h.expectBlockCommitsToStorage(10) state = h.factory.CreateProcessingBlocksState(message) @@ -176,6 +180,7 @@ func TestStateProcessingBlocksDescending_ValidateBlockFailureReturnsToCollecting Build().Message expectedFailedBlockHeight := primitives.BlockHeight(1) + h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1) h.expectBlockValidationQueriesFromStorageAndFailLastValidation(11, expectedFailedBlockHeight) h.expectBlockCommitsToStorage(10) @@ -188,6 +193,31 @@ func TestStateProcessingBlocksDescending_ValidateBlockFailureReturnsToCollecting }) } +func TestStateProcessingBlocksDescending_ValidateChainTipFailureReturnsToCollectingAvailabilityResponses(t *testing.T) { + with.Context(func(ctx context.Context) { + with.Logging(t, func(harness *with.LoggingHarness) { + h := newBlockSyncHarness(harness.Logger). + withDescendingEnabled(true) + harness.AllowErrorsMatching("failed to validate block received via sync") + + message := builders.BlockSyncResponseInput(). + WithBlocksOrder(gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING). + WithFirstBlockHeight(11). + WithLastBlockHeight(1). + WithLastCommittedBlockHeight(11). + Build().Message + + h.storage.When("GetSyncState").Return(nil).Times(1) + h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, errors.New(" failed to validate the chain tip")).Times(1) + state := h.factory.CreateProcessingBlocksState(message) + nextState := state.processState(ctx) + + require.IsType(t, &collectingAvailabilityResponsesState{}, nextState, "next state after validation error should be collecting availability responses") + h.verifyMocks(t) + }) + }) +} + func TestStateProcessingBlocksDescending_ValidateBlockChunkRangeFailureReturnsToCollectingAvailabilityResponses(t *testing.T) { with.Context(func(ctx context.Context) { with.Logging(t, func(harness *with.LoggingHarness) { @@ -239,58 +269,6 @@ func TestStateProcessingBlocksDescending_ValidateBlockChunkRangeFailureReturnsTo }) } -func TestStateProcessingBlocksDescending_ValidatePosChainRefTimeFailure(t *testing.T) { - with.Context(func(ctx context.Context) { - with.Logging(t, func(harness *with.LoggingHarness) { - h := newBlockSyncHarness(harness.Logger). - withDescendingEnabled(true) - harness.AllowErrorsMatching("failed to verify the blocks chunk PoS received via sync") - - blockPair := builders.BlockPair(). - WithHeight(primitives.BlockHeight(10)). - Build() - syncState := SyncState{InOrderBlock: blockPair, TopBlock: blockPair, LastSyncedBlock: blockPair} - h.storage.When("GetSyncState").Return(syncState).Times(1) - h.storage.When("GetBlock", mock.Any).Return(nil) - h.storage.Never("ValidateBlockForCommit", mock.Any, mock.Any) - h.storage.Never("NodeSyncCommitBlock", mock.Any, mock.Any) - - var blocks []*protocol.BlockPairContainer - var prevBlock *protocol.BlockPairContainer - currentTime := time.Now() - committeeGracePeriod := 12 * time.Hour - flakinessBuffer := 10 * time.Second - // block chunk does pertain to PoS honesty assumption of 12hr committee (even though in tests it set to 100 seconds) - refTime := primitives.TimestampSeconds(currentTime.Add(-committeeGracePeriod).Add(-flakinessBuffer).Unix()) - for i := 11; i <= 20; i++ { - blockTime := time.Unix(currentTime.Unix()+int64(i), 0) // deterministic block creation - blockPair := builders.BlockPair(). - WithHeight(primitives.BlockHeight(i)). - WithBlockCreated(blockTime). - WithReferenceTime(refTime). - WithPrevBlock(prevBlock). - Build() - prevBlock = blockPair - blocks = append(blocks, blockPair) - } - reverse(blocks) - - message := builders.BlockSyncResponseInput(). - WithBlocksOrder(gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING). - WithBlocks(blocks). - WithFirstBlockHeight(20). - WithLastBlockHeight(11). - WithLastCommittedBlockHeight(20). - Build().Message - - state := h.factory.CreateProcessingBlocksState(message) - nextState := state.processState(ctx) - require.IsType(t, &collectingAvailabilityResponsesState{}, nextState, "next state after validation error should be collecting availability responses") - h.verifyMocks(t) - }) - }) -} - func TestStateProcessingBlocksDescending_ValidatePosChainPrevHashFailure(t *testing.T) { with.Context(func(ctx context.Context) { with.Logging(t, func(harness *with.LoggingHarness) { @@ -389,6 +367,7 @@ func TestStateProcessingBlocksDescending_CommitBlockFailureReturnsToCollectingAv WithLastCommittedBlockHeight(11). Build().Message + h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1) h.expectBlockValidationQueriesFromStorage(11) h.expectBlockCommitsToStorageAndFailLastCommit(11, message.SignedChunkRange.FirstBlockHeight()) @@ -450,6 +429,7 @@ func TestStateProcessingBlocksDescending_TerminatesOnContextTermination(t *testi h.storage.When("GetSyncState").Return(syncState).Times(1) h.storage.When("GetBlock", mock.Any).Return(nil) h.storage.When("UpdateConsensusAlgosAboutLastCommittedBlockInLocalPersistence", mock.Any) + h.storage.When("ValidateChainTip", mock.Any, mock.Any).Return(nil, nil).Times(1) cancel() state := h.factory.CreateProcessingBlocksState(message) diff --git a/services/blockstorage/internodesync/storage_mock.go b/services/blockstorage/internodesync/storage_mock.go index f107a46a1..de8700860 100644 --- a/services/blockstorage/internodesync/storage_mock.go +++ b/services/blockstorage/internodesync/storage_mock.go @@ -67,3 +67,11 @@ func (s *blockSyncStorageMock) GetSyncState() SyncState { } } +func (s *blockSyncStorageMock) ValidateChainTip(ctx context.Context, input *services.ValidateChainTipInput) (*services.ValidateChainTipOutput, error) { + ret := s.Called(ctx, input) + if out := ret.Get(0); out != nil { + return out.(*services.ValidateChainTipOutput), ret.Error(1) + } else { + return nil, ret.Error(1) + } +} diff --git a/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go b/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go index 7eb83453a..336db13fd 100644 --- a/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go +++ b/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go @@ -24,7 +24,6 @@ import ( "time" ) - // Node Sync assumes consensus verifies block without blocking // Start syncing - in parallel to consensus service progressing // During HandleBlockConsensus and before block proof verification, commit blocks from consensus: @@ -68,7 +67,7 @@ func testSyncPetitionerConsensusVerifyNonBlocking(ctx context.Context, t *testin virtualMachine := &services.MockVirtualMachine{} cfg := config.ForConsensusContextTests(false) management := &services.MockManagement{} - management.When("GetGenesisReference", mock.Any, mock.Any).Return(&services.GetGenesisReferenceOutput{CurrentReference: 5000, GenesisReference: 0,}, nil) + management.When("GetGenesisReference", mock.Any, mock.Any).Return(&services.GetGenesisReferenceOutput{CurrentReference: 5000, GenesisReference: 0}, nil) consensusContext := consensuscontext.NewConsensusContext(harness.txPool, virtualMachine, harness.stateStorage, management, cfg, harness.Logger, metric.NewRegistry()) timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -97,7 +96,7 @@ func testSyncPetitionerConsensusVerifyNonBlocking(ctx context.Context, t *testin virtualMachine.When("CallSystemContract", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.CallSystemContractInput) (*services.CallSystemContractOutput, error) { output, _ := harness.stateStorage.GetLastCommittedBlockInfo(ctx, &services.GetLastCommittedBlockInfoInput{}) currentHeight := output.BlockHeight - if currentHeight >= input.BlockHeight + primitives.BlockHeight(numOfStateRevisionsToRetain) { + if currentHeight >= input.BlockHeight+primitives.BlockHeight(numOfStateRevisionsToRetain) { return nil, errors.New(fmt.Sprintf("unsupported block height: block %d too old. currently at %d. keeping %d back", input.BlockHeight, currentHeight, numOfStateRevisionsToRetain)) } return &services.CallSystemContractOutput{ @@ -107,8 +106,12 @@ func testSyncPetitionerConsensusVerifyNonBlocking(ctx context.Context, t *testin }) harness.consensus.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { - if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY { - simulateVerifyBlockConsensus(ctx, t, consensusContext, input.BlockPair.TransactionsBlock.Header.BlockHeight(), done) + if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY { + var prevBlockReferenceTime primitives.TimestampSeconds + if input.PrevBlockPair != nil { + prevBlockReferenceTime = input.PrevBlockPair.TransactionsBlock.Header.ReferenceTime() + } + simulateVerifyBlockConsensus(ctx, t, consensusContext, input.BlockPair.TransactionsBlock.Header.BlockHeight(), prevBlockReferenceTime, done) } return nil, nil }) @@ -130,7 +133,6 @@ func testSyncPetitionerConsensusVerifyNonBlocking(ctx context.Context, t *testin } } - func simulateConsensusCommits(ctx context.Context, harness *harness, blocks []*protocol.BlockPairContainer, committedBlockHeights chan primitives.BlockHeight, target int) { for i := 0; i < target; i++ { _, err := harness.commitBlock(ctx, blocks[i]) @@ -140,12 +142,13 @@ func simulateConsensusCommits(ctx context.Context, harness *harness, blocks []*p } } -func simulateVerifyBlockConsensus(ctx context.Context, tb testing.TB, consensusContext services.ConsensusContext, currentBlockHeight primitives.BlockHeight, done chan struct{}) { +func simulateVerifyBlockConsensus(ctx context.Context, tb testing.TB, consensusContext services.ConsensusContext, currentBlockHeight primitives.BlockHeight, prevBlockReferenceTime primitives.TimestampSeconds, done chan struct{}) { go func() { consensusContext.RequestOrderingCommittee(ctx, &services.RequestCommitteeInput{ - CurrentBlockHeight: currentBlockHeight, - RandomSeed: 0, - MaxCommitteeSize: 4, + CurrentBlockHeight: currentBlockHeight, + RandomSeed: 0, + MaxCommitteeSize: 4, + PrevBlockReferenceTime: prevBlockReferenceTime, }) done <- struct{}{} }() diff --git a/services/blockstorage/test/block_sync_full_flow_test.go b/services/blockstorage/test/block_sync_full_flow_test.go index 4af55bc99..5246be9b4 100644 --- a/services/blockstorage/test/block_sync_full_flow_test.go +++ b/services/blockstorage/test/block_sync_full_flow_test.go @@ -32,7 +32,8 @@ func TestSyncPetitioner_CompleteSyncFlow(t *testing.T) { withSyncCollectChunksTimeout(50 * time.Millisecond). withBlockSyncDescendingEnabled(false) // ascending order - testSyncPetitionerCompleteSyncFlow(ctx, t, harness) }) + testSyncPetitionerCompleteSyncFlow(ctx, t, harness) + }) } func TestSyncPetitioner_CompleteSyncFlowDescending(t *testing.T) { @@ -48,7 +49,6 @@ func TestSyncPetitioner_CompleteSyncFlowDescending(t *testing.T) { }) } - func testSyncPetitionerCompleteSyncFlow(ctx context.Context, t *testing.T, harness *harness) { const NUM_BLOCKS = 20 blockChain := generateInMemoryBlockChain(NUM_BLOCKS) @@ -110,6 +110,7 @@ func requireValidHandleBlockConsensusMode(t *testing.T, mode handlers.HandleBloc handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE, handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY, + handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_CHAIN_TIP, }, mode, "consensus updates must be update or update+verify") } @@ -162,7 +163,6 @@ func (s *syncFlowResults) logHandleBlockConsensusCalls(t *testing.T, input *hand } } - func requireBlockSyncRequestConformsToBlockAvailabilityResponse(t *testing.T, input *gossiptopics.BlockSyncRequestInput, availableBlocks primitives.BlockHeight, sources ...int) { sourceAddresses := make([]primitives.NodeAddress, 0, len(sources)) for _, sourceIndex := range sources { @@ -177,7 +177,7 @@ func requireBlockSyncRequestConformsToBlockAvailabilityResponse(t *testing.T, in if blocksOrder == gossipmessages.SYNC_BLOCKS_ORDER_DESCENDING { require.Conditionf(t, func() (success bool) { return (lastRequestedBlock >= 1 && lastRequestedBlock <= availableBlocks) && (firstRequestedBlock == 0 || (firstRequestedBlock >= lastRequestedBlock && firstRequestedBlock <= availableBlocks)) - }, "request is not consistent with my BlockAvailabilityResponse: first (%d) and last (%d) requested block must be smaller than total (%d); either first requested block is unknown(0) or first must be larger than last", firstRequestedBlock, lastRequestedBlock, availableBlocks ) + }, "request is not consistent with my BlockAvailabilityResponse: first (%d) and last (%d) requested block must be smaller than total (%d); either first requested block is unknown(0) or first must be larger than last", firstRequestedBlock, lastRequestedBlock, availableBlocks) } else { require.Conditionf(t, func() (success bool) { diff --git a/services/blockstorage/test/block_sync_stress_race_test.go b/services/blockstorage/test/block_sync_stress_race_test.go index 9b2602372..c95293918 100644 --- a/services/blockstorage/test/block_sync_stress_race_test.go +++ b/services/blockstorage/test/block_sync_stress_race_test.go @@ -73,9 +73,9 @@ func testSyncPetitionerStressCommitsDuringSync(ctx context.Context, t *testing.T }) harness.consensus.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { - if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE && input.PrevCommittedBlockPair != nil { + if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE && input.PrevBlockPair != nil { currHeight := input.BlockPair.TransactionsBlock.Header.BlockHeight() - prevHeight := input.PrevCommittedBlockPair.TransactionsBlock.Header.BlockHeight() + prevHeight := input.PrevBlockPair.TransactionsBlock.Header.BlockHeight() if currHeight != prevHeight+1 { done <- struct{}{} require.Failf(t, "HandleBlockConsensus given invalid args", "called with height %d and prev height %d", currHeight, prevHeight) diff --git a/services/blockstorage/validate_block.go b/services/blockstorage/validate_block.go index ae0aed264..63e888499 100644 --- a/services/blockstorage/validate_block.go +++ b/services/blockstorage/validate_block.go @@ -31,19 +31,37 @@ func (s *Service) ValidateBlockForCommit(ctx context.Context, input *services.Va return nil, blockHeightError } - logger.Info("ValidateBlockForCommit calling notifyConsensusAlgos with VERIFY_AND_UPDATE", logfields.BlockHeight(input.BlockPair.TransactionsBlock.Header.BlockHeight())) - if err := s.notifyConsensusAlgos(ctx, input.PrevBlockPair, input.BlockPair, handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY); err != nil { + logger.Info("ValidateBlockForCommit calling notifyConsensusAlgos with VERIFY_ONLY", logfields.BlockHeight(input.BlockPair.TransactionsBlock.Header.BlockHeight())) + mode := handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY + if err := s.notifyConsensusAlgos(ctx, input.PrevBlockPair, input.BlockPair, mode); err != nil { if ctx.Err() == nil { // this may fail rightfully on graceful shutdown (ctx.Done), we don't want to report an error in this case logger.Error("ValidateBlockForCommit(): notifyConsensusAlgos() failed (block validation by consensus algo failed)", log.Error(err), log.Stringable("tx-block-header", input.BlockPair.TransactionsBlock.Header)) } return nil, err } else { - logger.Info("ValidateBlockForCommit returned from notifyConsensusAlgos with VERIFY_AND_UPDATE", logfields.BlockHeight(input.BlockPair.TransactionsBlock.Header.BlockHeight())) + logger.Info("ValidateBlockForCommit returned from notifyConsensusAlgos with no error", logfields.BlockHeight(input.BlockPair.TransactionsBlock.Header.BlockHeight())) } return &services.ValidateBlockForCommitOutput{}, nil } +func (s *Service) ValidateChainTip(ctx context.Context, input *services.ValidateChainTipInput) (*services.ValidateChainTipOutput, error) { + logger := s.logger.WithTags(trace.LogFieldFrom(ctx)) + + logger.Info("ValidateChainTipCommit calling notifyConsensusAlgos", logfields.BlockHeight(input.BlockPair.TransactionsBlock.Header.BlockHeight())) + mode := handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_CHAIN_TIP + if err := s.notifyConsensusAlgos(ctx, input.PrevBlockPair, input.BlockPair, mode); err != nil { + if ctx.Err() == nil { // this may fail rightfully on graceful shutdown (ctx.Done), we don't want to report an error in this case + logger.Error("ValidateChainTip(): notifyConsensusAlgos() failed (block validation by consensus algo failed)", log.Error(err), log.Stringable("tx-block-header", input.BlockPair.TransactionsBlock.Header)) + } + return nil, err + } else { + logger.Info("ValidateChainTip returned from notifyConsensusAlgos with no error", logfields.BlockHeight(input.BlockPair.TransactionsBlock.Header.BlockHeight())) + } + + return &services.ValidateChainTipOutput{}, nil +} + // how to check if a block already exists: https://github.com/orbs-network/orbs-spec/issues/50 func (s *Service) validateBlockDoesNotExist(ctx context.Context, txBlockHeader *protocol.TransactionsBlockHeader, rsBlockHeader *protocol.ResultsBlockHeader, lastCommittedBlock *protocol.BlockPairContainer) (bool, error) { logger := s.logger.WithTags(trace.LogFieldFrom(ctx)) @@ -103,7 +121,7 @@ func (s *Service) validateProtocolVersion(blockPair *protocol.BlockPairContainer rsBlockHeader := blockPair.ResultsBlock.Header if txBlockHeader.ProtocolVersion() > config.MAXIMAL_PROTOCOL_VERSION_SUPPORTED_VALUE { - return fmt.Errorf("protocol version (%d) higher than maximal supported (%d) in transactions block header",txBlockHeader.ProtocolVersion(), config.MAXIMAL_PROTOCOL_VERSION_SUPPORTED_VALUE) + return fmt.Errorf("protocol version (%d) higher than maximal supported (%d) in transactions block header", txBlockHeader.ProtocolVersion(), config.MAXIMAL_PROTOCOL_VERSION_SUPPORTED_VALUE) } if rsBlockHeader.ProtocolVersion() > config.MAXIMAL_PROTOCOL_VERSION_SUPPORTED_VALUE { @@ -120,7 +138,8 @@ func (s *Service) notifyConsensusAlgos( mode handlers.HandleBlockConsensusMode) error { verifyMode := mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE || - mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY + mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY || + mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_CHAIN_TIP s.consensusBlocksHandlers.RLock() defer s.consensusBlocksHandlers.RUnlock() @@ -129,10 +148,10 @@ func (s *Service) notifyConsensusAlgos( verifiedCount := 0 for _, handler := range s.consensusBlocksHandlers.handlers { _, latestErr := handler.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: mode, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: blockPair, - PrevCommittedBlockPair: prevBlockPair, // TODO (v1) rename to HandleBlockConsensusInput.PrevCommittedBlockPair to PrevBlockPair + Mode: mode, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: blockPair, + PrevBlockPair: prevBlockPair, }) if verifyMode && latestErr == nil { diff --git a/services/consensusalgo/benchmarkconsensus/service.go b/services/consensusalgo/benchmarkconsensus/service.go index 6bd8fca64..b7d5f7909 100644 --- a/services/consensusalgo/benchmarkconsensus/service.go +++ b/services/consensusalgo/benchmarkconsensus/service.go @@ -123,7 +123,7 @@ func NewBenchmarkConsensusAlgo( } func (s *Service) HandleBlockConsensus(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { - return nil, s.handleBlockConsensusFromHandler(input.Mode, input.BlockType, input.BlockPair, input.PrevCommittedBlockPair) + return nil, s.handleBlockConsensusFromHandler(input.Mode, input.BlockType, input.BlockPair, input.PrevBlockPair) } func (s *Service) HandleBenchmarkConsensusCommit(ctx context.Context, input *gossiptopics.BenchmarkConsensusCommitInput) (*gossiptopics.EmptyOutput, error) { diff --git a/services/consensusalgo/benchmarkconsensus/test/harness.go b/services/consensusalgo/benchmarkconsensus/test/harness.go index 8f357a0d8..27b1c4dc1 100644 --- a/services/consensusalgo/benchmarkconsensus/test/harness.go +++ b/services/consensusalgo/benchmarkconsensus/test/harness.go @@ -116,10 +116,10 @@ func (h *harness) verifyHandlerRegistrations(t *testing.T) { func (h *harness) handleBlockConsensus(ctx context.Context, mode handlers.HandleBlockConsensusMode, blockPair *protocol.BlockPairContainer, prevBlockPair *protocol.BlockPairContainer) error { _, err := h.service.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: mode, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: blockPair, - PrevCommittedBlockPair: prevBlockPair, + Mode: mode, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: blockPair, + PrevBlockPair: prevBlockPair, }) return err } diff --git a/services/consensusalgo/benchmarkconsensus/test/leader_test.go b/services/consensusalgo/benchmarkconsensus/test/leader_test.go index 8b1bd0392..3df5a27cd 100644 --- a/services/consensusalgo/benchmarkconsensus/test/leader_test.go +++ b/services/consensusalgo/benchmarkconsensus/test/leader_test.go @@ -49,10 +49,10 @@ func TestLeaderInitWithExistingBlocks_DoesNotCreateGenesisBlock(t *testing.T) { h.blockStorage.Reset().When("RegisterConsensusBlocksHandler", mock.Any).Call(func(handler handlers.ConsensusBlocksHandler) { // this recreates how block storage updates us on last committed block on init (via the call to RegisterConsensusBlocksHandler) _, err := handler.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: block17, - PrevCommittedBlockPair: nil, + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: block17, + PrevBlockPair: nil, }) require.NoError(t, err, "failed calling HandleBlockConsensus") diff --git a/services/consensusalgo/leanhelixconsensus/block_provider.go b/services/consensusalgo/leanhelixconsensus/block_provider.go index 8cce6ae4c..8640ae118 100644 --- a/services/consensusalgo/leanhelixconsensus/block_provider.go +++ b/services/consensusalgo/leanhelixconsensus/block_provider.go @@ -133,7 +133,7 @@ func (p *blockProvider) RequestNewBlockProposal(ctx context.Context, blockHeight } -func (s *Service) validateBlockConsensus(ctx context.Context, blockPair *protocol.BlockPairContainer, prevBlockPair *protocol.BlockPairContainer) error { +func (s *Service) validateBlockConsensus(ctx context.Context, blockPair *protocol.BlockPairContainer, prevBlockPair *protocol.BlockPairContainer, softVerify bool) error { if ctx.Err() != nil { return errors.New("context canceled") } @@ -148,7 +148,7 @@ func (s *Service) validateBlockConsensus(ctx context.Context, blockPair *protoco prevBlockProof = prevBlockPair.TransactionsBlock.BlockProof.LeanHelix() } - err := s.leanHelix.ValidateBlockConsensus(ctx, ToLeanHelixBlock(blockPair), blockProof, ToLeanHelixBlock(prevBlockPair), prevBlockProof) + err := s.leanHelix.ValidateBlockConsensus(ctx, ToLeanHelixBlock(blockPair), blockProof, ToLeanHelixBlock(prevBlockPair), prevBlockProof, softVerify) if err != nil { return errors.Wrapf(err, "validateBlockConsensus(): error when calling leanHelix.ValidateBlockConsensus()") } @@ -189,3 +189,33 @@ func validLeanHelixBlockPair(blockPair *protocol.BlockPairContainer) error { func (p *blockProvider) GenerateGenesisBlockProposal(ctx context.Context) (lh.Block, lhprimitives.BlockHash) { return nil, nil } + +func (s *Service) verifyChainTip(ctx context.Context, blockPair *protocol.BlockPairContainer, prevBlockPair *protocol.BlockPairContainer) error { + if err := s.blockProvider.validateBlockCommittee(ctx, blockPair, prevBlockPair); err != nil { + s.logger.Info("HandleBlockConsensus()::verifyChainTip - Failed to validate block committee", log.Error(err)) + return s.validateBlockConsensus(ctx, blockPair, nil, true) // prevBlock nil => use current ref time (committee) + } + return nil +} + +func (p *blockProvider) validateBlockCommittee(ctx context.Context, blockPair *protocol.BlockPairContainer, prevBlockPair *protocol.BlockPairContainer) error { + _, err := p.consensusContext.ValidateBlockCommittee(ctx, &services.ValidateBlockCommitteeInput{ + BlockHeight: getBlockHeight(blockPair), + PrevBlockReferenceTime: getBlockReferenceTime(prevBlockPair), + }) + return err +} + +func getBlockHeight(block *protocol.BlockPairContainer) primitives.BlockHeight { + if block == nil { + return 0 + } + return block.TransactionsBlock.Header.BlockHeight() +} + +func getBlockReferenceTime(block *protocol.BlockPairContainer) primitives.TimestampSeconds { + if block == nil { + return 0 + } + return block.TransactionsBlock.Header.ReferenceTime() +} diff --git a/services/consensusalgo/leanhelixconsensus/service.go b/services/consensusalgo/leanhelixconsensus/service.go index 8967b9293..b4c44266c 100644 --- a/services/consensusalgo/leanhelixconsensus/service.go +++ b/services/consensusalgo/leanhelixconsensus/service.go @@ -128,7 +128,7 @@ func (s *Service) HandleBlockConsensus(ctx context.Context, input *handlers.Hand blockType := input.BlockType blockPair := input.BlockPair - prevBlockPair := input.PrevCommittedBlockPair + prevBlockPair := input.PrevBlockPair var lhBlockProof []byte var lhBlock lh.Block @@ -136,12 +136,20 @@ func (s *Service) HandleBlockConsensus(ctx context.Context, input *handlers.Hand return nil, errors.Errorf("HandleBlockConsensus(): LeanHelix: received unsupported block type %s", blockType) } + if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_CHAIN_TIP { + err := s.verifyChainTip(ctx, blockPair, prevBlockPair) + if err != nil { + s.logger.Info("HandleBlockConsensus()::VERIFY_CHAIN_TIP - Failed to verify chain tip with LeanHelix", log.Error(err)) + return nil, err + } + } + // validate the lhBlock consensus (lhBlock and proof) if shouldValidateBlockConsensusWithLeanHelix(input.Mode) { //Validate no matter what Should be changed with the full implementation of audit nodes. s.validateBlockExecutionIfYoung(ctx, blockPair, prevBlockPair) - err := s.validateBlockConsensus(ctx, blockPair, prevBlockPair) + err := s.validateBlockConsensus(ctx, blockPair, prevBlockPair, false) if err != nil { s.logger.Info("HandleBlockConsensus(): Failed validating block consensus with LeanHelix", log.Error(err)) return nil, err diff --git a/services/consensusalgo/leanhelixconsensus/test/audit_mode_test.go b/services/consensusalgo/leanhelixconsensus/test/audit_mode_test.go index 7957e15dd..e54cfa62a 100644 --- a/services/consensusalgo/leanhelixconsensus/test/audit_mode_test.go +++ b/services/consensusalgo/leanhelixconsensus/test/audit_mode_test.go @@ -42,10 +42,10 @@ func TestHandleBlockConsensus_ExecutesBlocksYoungerThanThreshold_AndModeIsVerify Return(&services.RequestNewResultsBlockOutput{}, nil).Times(1) h.consensus.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: block, - PrevCommittedBlockPair: prevBlock, + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: block, + PrevBlockPair: prevBlock, }) _, err := h.consensusContext.Verify() @@ -62,10 +62,10 @@ func TestHandleBlockConsensus_DoesNotExecuteBlocksOlderThanThreshold_AndModeIsVe h.consensusContext.Never("RequestNewResultsBlock", mock.Any, mock.Any) h.consensus.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: block, - PrevCommittedBlockPair: nil, + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: block, + PrevBlockPair: nil, }) _, err := h.consensusContext.Verify() diff --git a/services/consensusalgo/leanhelixconsensus/test/does_not_propose_new_blocks_while_syncing_test.go b/services/consensusalgo/leanhelixconsensus/test/does_not_propose_new_blocks_while_syncing_test.go index 3747d7ef0..3be72a8d4 100644 --- a/services/consensusalgo/leanhelixconsensus/test/does_not_propose_new_blocks_while_syncing_test.go +++ b/services/consensusalgo/leanhelixconsensus/test/does_not_propose_new_blocks_while_syncing_test.go @@ -40,10 +40,10 @@ func TestService_DoesNotProposeNewBlocksWhileSyncingBlocksSequentially(t *testin block := builders.BlockPair().WithHeight(currentHeight).WithEmptyLeanHelixBlockProof().Build() _, _ = h.consensus.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: block, - PrevCommittedBlockPair: nil, + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: block, + PrevBlockPair: nil, }) require.NoError(t, test.EventuallyVerify(test.EVENTUALLY_ACCEPTANCE_TIMEOUT, h.consensusContext), "expected ordering committee to be requested to determine next leader") diff --git a/services/consensusalgo/leanhelixconsensus/test/harness.go b/services/consensusalgo/leanhelixconsensus/test/harness.go index c4facd740..7e1697c2b 100644 --- a/services/consensusalgo/leanhelixconsensus/test/harness.go +++ b/services/consensusalgo/leanhelixconsensus/test/harness.go @@ -146,7 +146,7 @@ func (h *singleLhcNodeHarness) beFirstInCommittee() { func (h *singleLhcNodeHarness) expectConsensusContextRequestOrderingCommittee(leaderNodeIndex int) { h.consensusContext.When("RequestOrderingCommittee", mock.Any, mock.Any).Return(&services.RequestCommitteeOutput{ NodeAddresses: h.getCommitteeWithNodeIndexAsLeader(leaderNodeIndex), - Weights: h.getCommitteeWeights(), + Weights: h.getCommitteeWeights(), }, nil).Times(1) } @@ -184,10 +184,10 @@ func (h *singleLhcNodeHarness) handleBlockSync(ctx context.Context, blockHeight blockPair := builders.BlockPair().WithHeight(blockHeight).WithEmptyLeanHelixBlockProof().Build() _, err := h.consensus.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: blockPair, - PrevCommittedBlockPair: nil, + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: blockPair, + PrevBlockPair: nil, }) require.NoError(h.t, err, "expected HandleBlockConsensus to succeed") require.NoError(h.t, test.EventuallyVerify(test.EVENTUALLY_ACCEPTANCE_TIMEOUT, h.consensusContext)) diff --git a/services/consensusalgo/leanhelixconsensus/test/memory_leak_on_sync_test.go b/services/consensusalgo/leanhelixconsensus/test/memory_leak_on_sync_test.go index 94b5c4373..71e8a2415 100644 --- a/services/consensusalgo/leanhelixconsensus/test/memory_leak_on_sync_test.go +++ b/services/consensusalgo/leanhelixconsensus/test/memory_leak_on_sync_test.go @@ -37,10 +37,10 @@ func TestService_MemoryLeakOnBlockSync(t *testing.T) { b5 := builders.BlockPair().WithHeight(5).WithEmptyLeanHelixBlockProof().Build() h.consensus.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: b5, - PrevCommittedBlockPair: nil, + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: b5, + PrevBlockPair: nil, }) require.NoError(t, test.EventuallyVerify(test.EVENTUALLY_ACCEPTANCE_TIMEOUT, h.consensusContext)) diff --git a/services/consensusalgo/leanhelixconsensus/test/service_init_test.go b/services/consensusalgo/leanhelixconsensus/test/service_init_test.go index 5ca8c21b8..71f722f94 100644 --- a/services/consensusalgo/leanhelixconsensus/test/service_init_test.go +++ b/services/consensusalgo/leanhelixconsensus/test/service_init_test.go @@ -36,10 +36,10 @@ func TestService_StartsActivityOnlyAfterHandleBlockConsensus(t *testing.T) { h.dontBeFirstInCommitee() _, _ = h.consensus.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: nil, - PrevCommittedBlockPair: nil, + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: nil, + PrevBlockPair: nil, }) require.NoError(t, test.EventuallyVerify(test.EVENTUALLY_ACCEPTANCE_TIMEOUT, h.consensusContext)) @@ -56,10 +56,10 @@ func TestService_LeaderProposesBlock(t *testing.T) { h.expectGossipSendLeanHelixMessage() _, _ = h.consensus.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: nil, - PrevCommittedBlockPair: nil, + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_UPDATE_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: nil, + PrevBlockPair: nil, }) require.NoError(t, test.EventuallyVerify(test.EVENTUALLY_ACCEPTANCE_TIMEOUT, h.consensusContext, h.gossip)) diff --git a/services/consensuscontext/committee.go b/services/consensuscontext/committee.go index 1571a5ebd..eace0012c 100644 --- a/services/consensuscontext/committee.go +++ b/services/consensuscontext/committee.go @@ -13,6 +13,7 @@ import ( "github.com/orbs-network/orbs-spec/types/go/services" "github.com/pkg/errors" "strings" + "time" ) func (s *service) RequestOrderingCommittee(ctx context.Context, input *services.RequestCommitteeInput) (*services.RequestCommitteeOutput, error) { @@ -21,7 +22,7 @@ func (s *service) RequestOrderingCommittee(ctx context.Context, input *services. func (s *service) RequestValidationCommittee(ctx context.Context, input *services.RequestCommitteeInput) (*services.RequestCommitteeOutput, error) { // both committee and weights needs same block height and prevRefTime, and refTime might be adjusted to genesis if blockHeight is 1 - adjustedPrevBlockReferenceTime, err := s.prevReferenceOrGenesis(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) + adjustedPrevBlockReferenceTime, err := s.adjustPrevReference(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) if err != nil { return nil, errors.Wrap(err, "GetOrderedCommittee") } @@ -44,7 +45,7 @@ func (s *service) RequestValidationCommittee(ctx context.Context, input *service s.metrics.committeeSize.Update(int64(len(committee))) committeeStringArray := make([]string, len(committee)) for j, nodeAddress := range committee { - committeeStringArray[j] = fmt.Sprintf("{\"Address\": \"%v\", \"Weight\": %d}", nodeAddress, orderedWeights[j]) // %v is because NodeAddress has .String() + committeeStringArray[j] = fmt.Sprintf("{\"Address\": \"%v\", \"Weight\": %d}", nodeAddress, orderedWeights[j]) // %v is because NodeAddress has .String() } s.metrics.committeeMembers.Update("[" + strings.Join(committeeStringArray, ", ") + "]") s.metrics.committeeRefTime.Update(int64(input.PrevBlockReferenceTime)) @@ -80,14 +81,13 @@ func orderCommitteeWeights(orderedCommittee []primitives.NodeAddress, committeeM return orderedWeights, nil } - func (s *service) RequestBlockProofOrderingCommittee(ctx context.Context, input *services.RequestBlockProofCommitteeInput) (*services.RequestBlockProofCommitteeOutput, error) { return s.RequestBlockProofValidationCommittee(ctx, input) } func (s *service) RequestBlockProofValidationCommittee(ctx context.Context, input *services.RequestBlockProofCommitteeInput) (*services.RequestBlockProofCommitteeOutput, error) { // refTime might be adjusted to genesis if block height is 1 - adjustedPrevBlockReferenceTime, err := s.prevReferenceOrGenesis(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) + adjustedPrevBlockReferenceTime, err := s.adjustPrevReference(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) if err != nil { return nil, errors.Wrap(err, "GetOrderedCommittee") } @@ -102,3 +102,19 @@ func (s *service) RequestBlockProofValidationCommittee(ctx context.Context, inpu } return res, nil } + +func (s *service) ValidateBlockCommittee(ctx context.Context, input *services.ValidateBlockCommitteeInput) (*services.ValidateBlockCommitteeOutput, error) { + // refTime might be adjusted to genesis if block height is 1 + adjustedPrevBlockReferenceTime, err := s.adjustPrevReference(ctx, input.BlockHeight, input.PrevBlockReferenceTime) + if err != nil { + return nil, errors.Wrap(err, "ValidateBlockCommittee") + } + + now := time.Duration(time.Now().Unix()) * time.Second + if (time.Duration(adjustedPrevBlockReferenceTime)*time.Second)+s.config.ManagementConsensusGraceTimeout() < now { // prevRefTime-committee is too old + return nil, errors.New("ValidateBlockCommittee: block committee (:=prevBlock.RefTime) is outdated") + } + + return &services.ValidateBlockCommitteeOutput{}, nil + +} diff --git a/services/consensuscontext/create_results_block.go b/services/consensuscontext/create_results_block.go index 0346ce676..4ebd505f5 100644 --- a/services/consensuscontext/create_results_block.go +++ b/services/consensuscontext/create_results_block.go @@ -21,7 +21,7 @@ func (s *service) createResultsBlock(ctx context.Context, input *services.Reques txBlockHeader := input.TransactionsBlock.Header - prevBlockReferenceTime, err := s.prevReferenceOrGenesis(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) + prevBlockReferenceTime, err := s.adjustPrevReference(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) if err != nil { return nil, errors.Wrap(err, "RequestNewResultsBlock") } diff --git a/services/consensuscontext/create_transactions_block.go b/services/consensuscontext/create_transactions_block.go index 24aa35ade..8f0652b9f 100644 --- a/services/consensuscontext/create_transactions_block.go +++ b/services/consensuscontext/create_transactions_block.go @@ -22,7 +22,7 @@ func (s *service) createTransactionsBlock(ctx context.Context, input *services.R start := time.Now() defer s.metrics.createTxBlockTime.RecordSince(start) - prevBlockReferenceTime, err := s.prevReferenceOrGenesis(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) // For completeness, can't really fail + prevBlockReferenceTime, err := s.adjustPrevReference(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) // For completeness, can't really fail if err != nil { return nil, errors.Wrapf(ErrFailedGenesisRefTime, "CreateTransactionsBlock failed genesis time %s", err) } @@ -90,7 +90,7 @@ func (s *service) proposeBlockReferenceTime(ctx context.Context, prevReferenceTi return 0, errors.Errorf("leader reference time %d (before grace adjustment) is not upto date compared to previous block reference time %d", leaderReferenceTime.CurrentReference, prevReferenceTime) } - proposedReferenceTime := leaderReferenceTime.CurrentReference - primitives.TimestampSeconds(s.config.ManagementConsensusGraceTimeout() / time.Second) + proposedReferenceTime := leaderReferenceTime.CurrentReference - primitives.TimestampSeconds(s.config.ManagementConsensusGraceTimeout()/time.Second) if prevReferenceTime > proposedReferenceTime { proposedReferenceTime = prevReferenceTime } diff --git a/services/consensuscontext/helpers.go b/services/consensuscontext/helpers.go index b36ba945a..e4bf68a4a 100644 --- a/services/consensuscontext/helpers.go +++ b/services/consensuscontext/helpers.go @@ -17,7 +17,7 @@ import ( "github.com/pkg/errors" ) -func (s *service) prevReferenceOrGenesis(ctx context.Context, blockHeight primitives.BlockHeight, prevBlockReferenceTime primitives.TimestampSeconds) (primitives.TimestampSeconds, error) { +func (s *service) adjustPrevReference(ctx context.Context, blockHeight primitives.BlockHeight, prevBlockReferenceTime primitives.TimestampSeconds) (primitives.TimestampSeconds, error) { if blockHeight == 1 { // genesis block reference, err := s.management.GetGenesisReference(ctx, &services.GetGenesisReferenceInput{SystemTime: prevBlockReferenceTime}) if err != nil { @@ -28,6 +28,13 @@ func (s *service) prevReferenceOrGenesis(ctx context.Context, blockHeight primit return 0, errors.Errorf("failed genesis time reference (%d) cannot be after current time reference (%d)", reference.GenesisReference, reference.CurrentReference) } prevBlockReferenceTime = reference.GenesisReference + } else if prevBlockReferenceTime == 0 { // prev is not genesis - used for syncing against current committee (PoS) if prev reference is too old + reference, err := s.management.GetCurrentReference(ctx, &services.GetCurrentReferenceInput{}) + if err != nil { + s.logger.Error("management.GetCurrentReference should not return error", log.Error(err)) + return 0, err + } + prevBlockReferenceTime = reference.CurrentReference } return prevBlockReferenceTime, nil } diff --git a/services/consensuscontext/helpers_test.go b/services/consensuscontext/helpers_test.go index 505a6a51a..f2128bb54 100644 --- a/services/consensuscontext/helpers_test.go +++ b/services/consensuscontext/helpers_test.go @@ -34,7 +34,7 @@ func TestFixRefForGenesis_IsGenesisAndCorrectGenesis(t *testing.T) { prevRef := currRef - 10 genesis := currRef - 100 s := newHarnessWithManagement(currRef, genesis) - actualRef, err := s.prevReferenceOrGenesis(ctx, 1, prevRef) + actualRef, err := s.adjustPrevReference(ctx, 1, prevRef) require.NoError(t, err, "should not error, gensis correct") require.Equal(t, genesis, actualRef, "should return the management genesis reference") }) @@ -46,7 +46,7 @@ func TestFixRefForGenesis_IsGenesisAndGenesisInFuture(t *testing.T) { prevRef := currRef - 10 genesis := currRef + 1 s := newHarnessWithManagement(currRef, genesis) - actualRef, err := s.prevReferenceOrGenesis(ctx, 1, prevRef) + actualRef, err := s.adjustPrevReference(ctx, 1, prevRef) require.Error(t, err, "should error, genesis in future") require.Equal(t, primitives.TimestampSeconds(0), actualRef, "should return 0") }) @@ -58,7 +58,7 @@ func TestFixRefForGenesis_NotGenesisAndGenInPast(t *testing.T) { prevRef := currRef - 10 genesis := currRef - 100 s := newHarnessWithManagement(currRef, genesis) - actualRef, err := s.prevReferenceOrGenesis(ctx, 2, prevRef) + actualRef, err := s.adjustPrevReference(ctx, 2, prevRef) require.NoError(t, err, "should not error, block height not genesis") require.Equal(t, prevRef, actualRef, "should return the management genesis reference") }) @@ -70,9 +70,8 @@ func TestFixRefForGenesis_NotGenesisAndGensisInFuture_Ignore(t *testing.T) { prevRef := currRef - 10 genesis := currRef + 1 s := newHarnessWithManagement(currRef, genesis) - actualRef, err := s.prevReferenceOrGenesis(ctx, 2, prevRef) + actualRef, err := s.adjustPrevReference(ctx, 2, prevRef) require.NoError(t, err, "should not error, its not genesis block") require.Equal(t, prevRef, actualRef, "should return the management genesis reference") }) } - diff --git a/services/consensuscontext/validate_results_block.go b/services/consensuscontext/validate_results_block.go index 66bb5222f..65e48b59f 100644 --- a/services/consensuscontext/validate_results_block.go +++ b/services/consensuscontext/validate_results_block.go @@ -218,7 +218,7 @@ func compare(expectedDiffs []*protocol.ContractStateDiff, calculatedDiffs []*pro } func (s *service) ValidateResultsBlock(ctx context.Context, input *services.ValidateResultsBlockInput) (*services.ValidateResultsBlockOutput, error) { - prevBlockReferenceTime, err := s.prevReferenceOrGenesis(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) + prevBlockReferenceTime, err := s.adjustPrevReference(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) if err != nil { return &services.ValidateResultsBlockOutput{}, errors.Wrapf(ErrFailedGenesisRefTime, "ValidateResultsBlock failed genesis time %s", err) } diff --git a/services/consensuscontext/validate_transactions_block.go b/services/consensuscontext/validate_transactions_block.go index 550c52272..c1f259c34 100644 --- a/services/consensuscontext/validate_transactions_block.go +++ b/services/consensuscontext/validate_transactions_block.go @@ -136,11 +136,11 @@ func validateTxTransactionOrdering(ctx context.Context, cfg config.ConsensusCont } validationInput := &services.ValidateTransactionsForOrderingInput{ - BlockProtocolVersion: transactionBlock.Header.ProtocolVersion(), - CurrentBlockHeight: transactionBlock.Header.BlockHeight(), - CurrentBlockTimestamp: transactionBlock.Header.Timestamp(), - CurrentBlockReferenceTime: transactionBlock.Header.ReferenceTime(), - SignedTransactions: txs, + BlockProtocolVersion: transactionBlock.Header.ProtocolVersion(), + CurrentBlockHeight: transactionBlock.Header.BlockHeight(), + CurrentBlockTimestamp: transactionBlock.Header.Timestamp(), + CurrentBlockReferenceTime: transactionBlock.Header.ReferenceTime(), + SignedTransactions: txs, } _, err := validateForOrderingFunc(ctx, validationInput) if err != nil { @@ -242,7 +242,7 @@ func (s *service) ValidateTransactionsBlock(ctx context.Context, input *services return nil, err } - prevBlockReferenceTime, err := s.prevReferenceOrGenesis(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) // For completeness, can't really fail + prevBlockReferenceTime, err := s.adjustPrevReference(ctx, input.CurrentBlockHeight, input.PrevBlockReferenceTime) // For completeness, can't really fail if err != nil { return nil, errors.Wrapf(ErrFailedGenesisRefTime, "ValidateTransactionsBlock failed genesis time %s", err) } @@ -251,7 +251,7 @@ func (s *service) ValidateTransactionsBlock(ctx context.Context, input *services return nil, err2 } - pvOutput, err3 := s.management.GetProtocolVersion(ctx, &services.GetProtocolVersionInput{Reference:input.TransactionsBlock.Header.ReferenceTime()}) + pvOutput, err3 := s.management.GetProtocolVersion(ctx, &services.GetProtocolVersionInput{Reference: input.TransactionsBlock.Header.ReferenceTime()}) if err3 != nil { s.logger.Error("management.GetProtocolVersion should not return error", log.Error(err)) return nil, err3