Skip to content

Commit

Permalink
Merge pull request #1543 from orbs-network/bugfix/node-sync-committee…
Browse files Browse the repository at this point in the history
…-polling

Bugfix/node sync committee polling
  • Loading branch information
gadcl authored Mar 22, 2020
2 parents b95a82c + 6cb4d34 commit 87c6492
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 41 deletions.
12 changes: 6 additions & 6 deletions config/system_factory_presets.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ func defaultProductionConfig() mutableNodeConfig {
cfg.SetDuration(MANAGEMENT_UPDATE_INTERVAL, 1*time.Minute)
cfg.SetUint32(MANAGEMENT_MAX_FILE_SIZE, 50 * (1<<20)) // 50 MB

// 2*slow_network_latency + avg_network_latency + 2*execution_time = 450ms
cfg.SetDuration(LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, 4*time.Second)
// 2*slow_network_latency + avg_network_latency + 2*execution_time \ + empty block time
cfg.SetDuration(LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, 14*time.Second)
cfg.SetDuration(BENCHMARK_CONSENSUS_RETRY_INTERVAL, 2*time.Second)

cfg.SetUint32(LEAN_HELIX_CONSENSUS_MINIMUM_COMMITTEE_SIZE, 4)
cfg.SetUint32(LEAN_HELIX_CONSENSUS_MAXIMUM_COMMITTEE_SIZE, 22)
cfg.SetBool(LEAN_HELIX_SHOW_DEBUG, false)

// if above round time, we'll have leader changes when no traffic
cfg.SetDuration(TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS, 5*time.Second)
cfg.SetDuration(TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS, 9*time.Second)

cfg.SetUint32(BENCHMARK_CONSENSUS_REQUIRED_QUORUM_PERCENTAGE, 66)

// 1MB blocks, 1KB per tx
cfg.SetUint32(CONSENSUS_CONTEXT_MAXIMUM_TRANSACTIONS_IN_BLOCK, 1000)

// max execution time (time validators allow until they get the executed block)
cfg.SetDuration(CONSENSUS_CONTEXT_SYSTEM_TIMESTAMP_ALLOWED_JITTER, 30*time.Second)
cfg.SetDuration(CONSENSUS_CONTEXT_SYSTEM_TIMESTAMP_ALLOWED_JITTER, 60*time.Second)

// have triggers transactions by default
cfg.SetBool(CONSENSUS_CONTEXT_TRIGGERS_ENABLED, true)
Expand All @@ -55,7 +55,7 @@ func defaultProductionConfig() mutableNodeConfig {
cfg.SetUint32(BLOCK_SYNC_NUM_BLOCKS_IN_BATCH, 100)

// 4*LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, if below TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS we'll constantly have syncs
cfg.SetDuration(BLOCK_SYNC_NO_COMMIT_INTERVAL, 6*time.Second)
cfg.SetDuration(BLOCK_SYNC_NO_COMMIT_INTERVAL, 18*time.Second)

// makes sync slower, 4*slow_network_latency
cfg.SetDuration(BLOCK_SYNC_COLLECT_RESPONSE_TIMEOUT, 1*time.Second)
Expand Down Expand Up @@ -209,7 +209,7 @@ func ForAcceptanceTestNetwork(
emptyBlockTime = 50 * time.Millisecond
}

cfg.SetDuration(MANAGEMENT_UPDATE_INTERVAL, 3*time.Millisecond)
cfg.SetDuration(MANAGEMENT_UPDATE_INTERVAL, 2*time.Millisecond)
cfg.SetDuration(BENCHMARK_CONSENSUS_RETRY_INTERVAL, 50*time.Millisecond)
cfg.SetDuration(LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, 200*time.Millisecond)
cfg.SetBool(LEAN_HELIX_SHOW_DEBUG, true)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/olekukonko/tablewriter v0.0.2 // indirect
github.com/orbs-network/go-mock v1.1.0
github.com/orbs-network/govnr v0.2.0
github.com/orbs-network/lean-helix-go v0.2.6
github.com/orbs-network/lean-helix-go v0.2.7
github.com/orbs-network/membuffers v0.4.0
github.com/orbs-network/orbs-client-sdk-go v0.15.0
github.com/orbs-network/orbs-contract-sdk v1.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ github.com/orbs-network/lean-helix-go v0.2.4 h1:n3e3PBM86ylQiiJUzOEFf00k6BWRWKCf
github.com/orbs-network/lean-helix-go v0.2.4/go.mod h1:9E/1sZEMZvNLHrP+nif36bio2zKbCkueji4R9e7vJnI=
github.com/orbs-network/lean-helix-go v0.2.6 h1:9b7eGChry3Z18xtlvYHO0wTaj5HBOEwLTVYr61Xpa/Q=
github.com/orbs-network/lean-helix-go v0.2.6/go.mod h1:9E/1sZEMZvNLHrP+nif36bio2zKbCkueji4R9e7vJnI=
github.com/orbs-network/lean-helix-go v0.2.7 h1:d7k67YUIMqXihIl5x/S9p7VIpBGpBzI1D/xR1x2Y/Ro=
github.com/orbs-network/lean-helix-go v0.2.7/go.mod h1:9E/1sZEMZvNLHrP+nif36bio2zKbCkueji4R9e7vJnI=
github.com/orbs-network/membuffers v0.3.2/go.mod h1:M5ABv0m0XBGoJbX+7UKVY02hLF4XhS2SlZVEVABMc6M=
github.com/orbs-network/membuffers v0.3.6 h1:SEfVtDhzi8YHR4qY8ouu8G57sKGsT4vfgdBC5bjG8TE=
github.com/orbs-network/membuffers v0.3.6/go.mod h1:M5ABv0m0XBGoJbX+7UKVY02hLF4XhS2SlZVEVABMc6M=
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2019 the orbs-network-go authors
// This file is part of the orbs-network-go library in the Orbs project.
//
// This source code is licensed under the MIT license found in the LICENSE file in the root directory of this source tree.
// The above notice should be included in all copies or substantial portions of the software.

package test

import (
"context"
"errors"
"fmt"
"github.com/orbs-network/go-mock"
"github.com/orbs-network/orbs-network-go/config"
"github.com/orbs-network/orbs-network-go/instrumentation/metric"
"github.com/orbs-network/orbs-network-go/services/consensuscontext"
"github.com/orbs-network/orbs-network-go/test/builders"
"github.com/orbs-network/orbs-network-go/test/with"
"github.com/orbs-network/orbs-spec/types/go/primitives"
"github.com/orbs-network/orbs-spec/types/go/protocol"
"github.com/orbs-network/orbs-spec/types/go/services"
"github.com/orbs-network/orbs-spec/types/go/services/gossiptopics"
"github.com/orbs-network/orbs-spec/types/go/services/handlers"
"testing"
"time"
)


// Node Sync assumes consensus verifies block without blocking
// Start syncing - in parallel to consensus service progressing
// During HandleBlockConsensus and before block proof verification, commit blocks from consensus:
// Example: during block execution check (audit mode) receive "numOfStateRevisionsToRetain" commits from consensus
// Calling old state for committee fails - too far back (out of stateStorage cache reach)
// Recover from "Old State" query (consensusContext does not poll forever)
func TestSyncPetitioner_ConsensusVerify_NonBlocking(t *testing.T) {
with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) {
harness := newBlockStorageHarness(parent).
withSyncBroadcast(1).
withSyncNoCommitTimeout(10 * time.Millisecond).
withSyncCollectResponsesTimeout(10 * time.Millisecond).
withSyncCollectChunksTimeout(50 * time.Millisecond).
allowingErrorsMatching("FORK!! block already in storage, timestamp mismatch")


blocks := []*protocol.BlockPairContainer{
builders.BlockPair().WithHeight(primitives.BlockHeight(1)).WithBlockCreated(time.Now()).Build(),
builders.BlockPair().WithHeight(primitives.BlockHeight(2)).WithBlockCreated(time.Now()).Build(),
builders.BlockPair().WithHeight(primitives.BlockHeight(3)).WithBlockCreated(time.Now()).Build(),
builders.BlockPair().WithHeight(primitives.BlockHeight(4)).WithBlockCreated(time.Now()).Build(),
}

numOfStateRevisionsToRetain := 2
virtualMachine := &services.MockVirtualMachine{}
consensusContext := consensuscontext.NewConsensusContext(harness.txPool, virtualMachine, harness.stateStorage, config.ForConsensusContextTests(false), harness.Logger, metric.NewRegistry())
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
committedBlockHeights := make(chan primitives.BlockHeight, 10)
done := make(chan struct{})
simulatedCommitsTarget := numOfStateRevisionsToRetain + 1

harness.gossip.When("BroadcastBlockAvailabilityRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockAvailabilityRequestInput) (*gossiptopics.EmptyOutput, error) {
respondToBroadcastAvailabilityRequest(ctx, harness, input, 4, 1)
return nil, nil
})

harness.gossip.When("SendBlockSyncRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockSyncRequestInput) (*gossiptopics.EmptyOutput, error) {
response := builders.BlockSyncResponseInput().
WithFirstBlockHeight(input.Message.SignedChunkRange.FirstBlockHeight()).
WithLastBlockHeight(input.Message.SignedChunkRange.LastBlockHeight()).
WithLastCommittedBlockHeight(primitives.BlockHeight(4)).
WithSenderNodeAddress(input.RecipientNodeAddress).Build()

go func() {
harness.blockStorage.HandleBlockSyncResponse(ctx, response)
}()
return nil, nil
})

harness.stateStorage.When("GetLastCommittedBlockInfo", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.GetLastCommittedBlockInfoInput) (*services.GetLastCommittedBlockInfoOutput, error) {
output := harness.getLastBlockHeight(ctx, t)
return &services.GetLastCommittedBlockInfoOutput{
LastCommittedBlockHeight: output.LastCommittedBlockHeight,
}, nil
})

virtualMachine.When("CallSystemContract", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.CallSystemContractInput) (*services.CallSystemContractOutput, error) {
output, _ := harness.stateStorage.GetLastCommittedBlockInfo(ctx, &services.GetLastCommittedBlockInfoInput{})
currentHeight := output.LastCommittedBlockHeight
if currentHeight >= input.BlockHeight + primitives.BlockHeight(numOfStateRevisionsToRetain) {
return nil, errors.New(fmt.Sprintf("unsupported block height: block %d too old. currently at %d. keeping %d back", input.BlockHeight, currentHeight, numOfStateRevisionsToRetain))
}
return &services.CallSystemContractOutput{
OutputArgumentArray: &protocol.ArgumentArray{},
CallResult: protocol.EXECUTION_RESULT_SUCCESS,
}, nil
})

harness.consensus.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) {
if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE {
simulateConsensusCommits(ctx, harness, blocks, committedBlockHeights, simulatedCommitsTarget)
simulateVerifyBlockConsensus(ctx, t, consensusContext, input.BlockPair.TransactionsBlock.Header.BlockHeight(), done)
}
return nil, nil
})

harness.start(ctx)

select {
case <-done:
// test passed
case <-timeoutCtx.Done():
t.Fatalf("timed out waiting for sync flow to recover")
}
})
}


func simulateConsensusCommits(ctx context.Context, harness *harness, blocks []*protocol.BlockPairContainer, committedBlockHeights chan primitives.BlockHeight, target int) {
for i := 0; i < target; i++ {
_, err := harness.commitBlock(ctx, blocks[i])
if err == nil {
committedBlockHeights <- blocks[i].ResultsBlock.Header.BlockHeight()
}
}
}

func simulateVerifyBlockConsensus(ctx context.Context, tb testing.TB, consensusContext services.ConsensusContext, currentBlockHeight primitives.BlockHeight, done chan struct{}) {
go func() {
consensusContext.RequestOrderingCommittee(ctx, &services.RequestCommitteeInput{
CurrentBlockHeight: currentBlockHeight,
RandomSeed: 0,
MaxCommitteeSize: 4,
})
done <- struct{}{}
}()
}
35 changes: 2 additions & 33 deletions services/consensuscontext/call_contract_ordered_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ import (
"time"
)

const CALL_COMMITTEE_CONTRACT_INTERVAL = 200 * time.Millisecond

func (s *service) getOrderedCommittee(ctx context.Context, currentBlockHeight primitives.BlockHeight) ([]primitives.NodeAddress, error) {
logger := s.logger.WithTags(trace.LogFieldFrom(ctx))

// current block is used as seed and needs to be for the block being calculated Now.
logger.Info("system-call GetOrderedCommittee", logfields.BlockHeight(currentBlockHeight), log.Stringable("interval-between-attempts", CALL_COMMITTEE_CONTRACT_INTERVAL))
orderedCommittee, err := s.callGetOrderedCommitteeSystemContractUntilSuccess(ctx, currentBlockHeight)
logger.Info("system-call GetOrderedCommittee", logfields.BlockHeight(currentBlockHeight))
orderedCommittee, err := s.callGetOrderedCommitteeSystemContract(ctx, currentBlockHeight)
if err != nil {
return nil, err
}
Expand All @@ -35,35 +33,6 @@ func (s *service) getOrderedCommittee(ctx context.Context, currentBlockHeight pr
return orderedCommittee, nil
}

func (s *service) callGetOrderedCommitteeSystemContractUntilSuccess(ctx context.Context, blockHeight primitives.BlockHeight) ([]primitives.NodeAddress, error) {
attempts := 1
for {
// exit on system shutdown
if ctx.Err() != nil {
return nil, errors.New("context terminated during callGetOrderedCommitteeSystemContractUntilSuccess")
}

orderedCommittee, err := s.callGetOrderedCommitteeSystemContract(ctx, blockHeight)
if err == nil {
return orderedCommittee, nil
}

// log every 500 failures
if attempts%500 == 1 {
if ctx.Err() == nil { // this may fail rightfully on graceful shutdown (ctx.Done), we don't want to report an error in this case
s.logger.Info("cannot get ordered committee from system contract", log.Error(err), logfields.BlockHeight(blockHeight), log.Int("attempts", attempts))
}
}

// sleep or wait for ctx done, whichever comes first
sleepOrShutdown, cancel := context.WithTimeout(ctx, CALL_COMMITTEE_CONTRACT_INTERVAL)
<-sleepOrShutdown.Done()
cancel()

attempts++
}
}

func (s *service) callGetOrderedCommitteeSystemContract(ctx context.Context, blockHeight primitives.BlockHeight) ([]primitives.NodeAddress, error) {
systemContractName := primitives.ContractName(committee_systemcontract.CONTRACT_NAME)
systemMethodName := primitives.MethodName(committee_systemcontract.METHOD_GET_ORDERED_COMMITTEE)
Expand Down
5 changes: 4 additions & 1 deletion test/acceptance/committee_election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
func TestLeanHelix_CommitTransactionToElected(t *testing.T) {
NewHarness().
WithNumNodes(6).
WithLogFilters(log.DiscardAll()).
WithConsensusAlgos(consensus.CONSENSUS_ALGO_TYPE_LEAN_HELIX).
Start(t, func(t testing.TB, ctx context.Context, network *Network) {
contract := callcontract.NewContractClient(network)
Expand Down Expand Up @@ -61,6 +62,7 @@ func TestLeanHelix_CommitTransactionToElected(t *testing.T) {
func TestLeanHelix_MultipleReElections(t *testing.T) {
NewHarness().
WithNumNodes(6).
WithLogFilters(log.DiscardAll()).
WithConsensusAlgos(consensus.CONSENSUS_ALGO_TYPE_LEAN_HELIX).
Start(t, func(t testing.TB, ctx context.Context, network *Network) {
contract := callcontract.NewContractClient(network)
Expand Down Expand Up @@ -134,6 +136,7 @@ func TestLeanHelix_AllNodesLoseElectionButReturn(t *testing.T) {
func TestLeanHelix_GrowingElectedAmount(t *testing.T) {
NewHarness().
WithNumNodes(7).
WithLogFilters(log.DiscardAll()).
WithConsensusAlgos(consensus.CONSENSUS_ALGO_TYPE_LEAN_HELIX).
Start(t, func(t testing.TB, ctx context.Context, network *Network) {
contract := callcontract.NewContractClient(network)
Expand Down Expand Up @@ -176,7 +179,7 @@ func waitUntilCommitteeApplies(t testing.TB, ctx context.Context, network *Netwo
lastBlock, err := network.BlockPersistence(0).GetLastBlockHeight()
require.NoError(t, err)
network.committeeProvider.AddCommittee(uint64(lastBlock+3), committee)
network.WaitForBlock(ctx, lastBlock+4)
network.WaitForBlock(ctx, lastBlock+6)
}

func verifyTxSignersAreFromGroup(t testing.TB, ctx context.Context, api callcontract.CallContractAPI, txHash primitives.Sha256, nodeIndex int, allowedIndexes []int) {
Expand Down

0 comments on commit 87c6492

Please sign in to comment.