From c6450c519f561c99d7a21286e79a31b96e668ffc Mon Sep 17 00:00:00 2001 From: gadcl Date: Thu, 19 Mar 2020 09:59:59 +0200 Subject: [PATCH 1/7] test depicting the issue --- ...committee_does_not_block_node_sync_test.go | 241 ++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100644 services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go diff --git a/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go b/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go new file mode 100644 index 000000000..1c61ba5dd --- /dev/null +++ b/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go @@ -0,0 +1,241 @@ +// Copyright 2019 the orbs-network-go authors +// This file is part of the orbs-network-go library in the Orbs project. +// +// This source code is licensed under the MIT license found in the LICENSE file in the root directory of this source tree. +// The above notice should be included in all copies or substantial portions of the software. + +package test + +import ( + "context" + "errors" + "fmt" + "github.com/orbs-network/go-mock" + "github.com/orbs-network/orbs-network-go/test/builders" + "github.com/orbs-network/orbs-spec/types/go/primitives" + "github.com/orbs-network/orbs-spec/types/go/protocol" + + //"github.com/orbs-network/orbs-spec/types/go/protocol" + + //"github.com/orbs-network/orbs-network-go/test/builders" + "github.com/orbs-network/orbs-network-go/test/with" + //"github.com/orbs-network/orbs-spec/types/go/protocol" + "github.com/orbs-network/orbs-spec/types/go/services" + "github.com/orbs-network/orbs-spec/types/go/services/handlers" + "testing" +) + +func (h *harness) expectStateStorageNotRead() { + h.stateStorage.When("ReadKeys", mock.Any, mock.Any).Return(&services.ReadKeysOutput{ + StateRecords: []*protocol.StateRecord{ + (&protocol.StateRecordBuilder{ + Key: []byte{0x01}, + Value: []byte{0x02}, + }).Build(), + }, + }, nil).Times(0) +} + +// audit mode execute +// sync on 2 blocks +// stuck on execution +// receive 10 commit blocks from consensus +// release execution +// fail on validate consensus - retrieve committee from old state +// robust - does not loop forever on request committee +// Recover FromOldStateQuery in consensusContext + +const STATE_STORAGE_HISTORY_SNAPSHOT_NUM = 5 + +func TestRequestCommittee_NonBlocking_NodeSync(t *testing.T) { + with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { + harness := newHarness(parent.Logger, true) + done := make(chan struct{}) + blockStorage := &services.MockBlockStorage{} + consensusAlgo := &services.MockConsensusAlgoLeanHelix{} + blockStorageHeight := 0 + + harness.stateStorage.When("GetLastCommittedBlockInfo", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.GetLastCommittedBlockInfoInput) (*services.GetLastCommittedBlockInfoOutput, error) { + output, _ := blockStorage.GetLastCommittedBlockHeight(ctx, &services.GetLastCommittedBlockHeightInput{}) + return &services.GetLastCommittedBlockInfoOutput{ + LastCommittedBlockHeight: output.LastCommittedBlockHeight, + }, nil + }) + + harness.virtualMachine.When("CallSystemContract", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.CallSystemContractInput) (*services.CallSystemContractOutput, error) { + output, _ := harness.stateStorage.GetLastCommittedBlockInfo(ctx, &services.GetLastCommittedBlockInfoInput{}) + fmt.Println(input.BlockHeight) + currentHeight := output.LastCommittedBlockHeight + if currentHeight >= input.BlockHeight + STATE_STORAGE_HISTORY_SNAPSHOT_NUM { + return nil, errors.New(fmt.Sprintf("unsupported block height: block %d too old. currently at %d. keeping %d back", input.BlockHeight, currentHeight, STATE_STORAGE_HISTORY_SNAPSHOT_NUM)) + } + return &services.CallSystemContractOutput{ + OutputArgumentArray: &protocol.ArgumentArray{}, + CallResult: protocol.EXECUTION_RESULT_SUCCESS, + }, nil + }) + + consensusAlgo.MockConsensusBlocksHandler.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { + go func() { + harness.service.RequestOrderingCommittee(ctx, &services.RequestCommitteeInput{ + CurrentBlockHeight: 1, + RandomSeed: 0, + MaxCommitteeSize: 22, + }) + done <- struct{}{} + } () + return nil, nil + }) + + blockStorage.Mock.When("GetLastCommittedBlockHeight", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.GetLastCommittedBlockHeightInput) (*services.GetLastCommittedBlockHeightOutput, error) { + return &services.GetLastCommittedBlockHeightOutput{ + LastCommittedBlockHeight: primitives.BlockHeight(blockStorageHeight), + }, nil + }) + block := builders.BlockPair().WithHeight(1).WithEmptyLeanHelixBlockProof().Build() + prevBlock := builders.BlockPair().WithHeight(0).WithEmptyLeanHelixBlockProof().Build() + blockStorage.When("ValidateBlockForCommit", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.ValidateBlockForCommitInput) (*services.ValidateBlockForCommitOutput, error) { + consensusAlgo.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ + Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY, + BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, + BlockPair: block, + PrevCommittedBlockPair: prevBlock, + }) + + return nil, nil + }) + + // start of flow "after receiving blocks chunk from peer" + blockStorage.ValidateBlockForCommit(ctx, &services.ValidateBlockForCommitInput{BlockPair: block}) + + select { + case <-done: + // test passed + case <-ctx.Done(): + t.Fatalf("timed out waiting for sync flow to complete") + } + }) +} + + //_, err := s.storage.ValidateBlockForCommit(ctx, &services.ValidateBlockForCommitInput{BlockPair: blockPair}) + + //.MockBlockSyncHandler.HandleBlockSyncResponse().ValidateBlockForCommit() + + //consensusAlgo.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { + // + // + //}) + // + //txPool := &services.MockTransactionPool{} + //machine := &services.MockVirtualMachine{} +// harness := newHarness() +// with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { +// +// }) +// +// +//harness := test2.newSingleLhcNodeHarness(). +// withSyncNoCommitTimeout(10 * time.Millisecond). +// withSyncCollectResponsesTimeout(10 * time.Millisecond). +// withSyncCollectChunksTimeout(50 * time.Millisecond) + + + +//} + +// +////newSingleLhcNodeHarness +//func TestSyncPetitioner_Stress_CommitsDuringSync(t *testing.T) { +// with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { +// harness := test.newBlockStorageHarness(parent). +// withSyncNoCommitTimeout(10 * time.Millisecond). +// withSyncCollectResponsesTimeout(10 * time.Millisecond). +// withSyncCollectChunksTimeout(50 * time.Millisecond) +// +// const NUM_BLOCKS = 50 +// done := make(chan struct{}) +// +// harness.gossip.When("BroadcastBlockAvailabilityRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockAvailabilityRequestInput) (*gossiptopics.EmptyOutput, error) { +// test.respondToBroadcastAvailabilityRequest(ctx, harness, input, NUM_BLOCKS, 7) +// return nil, nil +// }) +// +// harness.gossip.When("SendBlockSyncRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockSyncRequestInput) (*gossiptopics.EmptyOutput, error) { +// if input.Message.SignedChunkRange.LastBlockHeight() >= NUM_BLOCKS { +// done <- struct{}{} +// } +// respondToBlockSyncRequestWithConcurrentCommit(t, ctx, harness, input, NUM_BLOCKS) +// return nil, nil +// }) +// +// machine := &services.MockVirtualMachine{} +// machine.When("CallSystemContract", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.CallSystemContractInput) (*services.CallSystemContractOutput, error) { +// fmt.Println("unsupported block height") +// return &services.CallSystemContractOutput{ +// }, errors.New("unsupported block height") +// }) +// consensusContextService := consensuscontext.NewConsensusContext(harness.txPool, machine, harness.stateStorage, config.ForConsensusContextTests(false), harness.Logger, metric.NewRegistry()) +// +// harness.consensus.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { +// +// if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE && input.PrevCommittedBlockPair != nil { +// fmt.Println("ADFDSFSDF") +// currHeight := input.BlockPair.TransactionsBlock.Header.BlockHeight() +// prevHeight := input.PrevCommittedBlockPair.TransactionsBlock.Header.BlockHeight() +// // audit mode + long execution -> +// // consensus algo concurrently commits multiple blocks > state storage cache threshold +// // no support for +// +// consensusContextService.RequestOrderingCommittee(ctx, &services.RequestCommitteeInput{ +// CurrentBlockHeight: currHeight, +// RandomSeed: 0, +// MaxCommitteeSize: 22, +// }) +// fmt.Println("123123") +// if currHeight != prevHeight+1 { +// done <- struct{}{} +// require.Failf(t, "HandleBlockConsensus given invalid args", "called with height %d and prev height %d", currHeight, prevHeight) +// } +// } +// return nil, nil +// }) +// +// harness.start(ctx) +// +// select { +// case <-done: +// // test passed +// case <-ctx.Done(): +// t.Fatalf("timed out waiting for sync flow to complete") +// } +// }) +//} +// +//// this would attempt to commit the same blocks at the same time from the sync flow and directly (simulating blocks arriving from consensus) +//func respondToBlockSyncRequestWithConcurrentCommit(t testing.TB, ctx context.Context, harness *test.harness, input *gossiptopics.BlockSyncRequestInput, availableBlocks int) { +// response := builders.BlockSyncResponseInput(). +// WithFirstBlockHeight(input.Message.SignedChunkRange.FirstBlockHeight()). +// WithLastBlockHeight(input.Message.SignedChunkRange.LastBlockHeight()). +// WithLastCommittedBlockHeight(primitives.BlockHeight(availableBlocks)). +// WithSenderNodeAddress(input.RecipientNodeAddress).Build() +// +// go func() { +// time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) +// _, err := harness.blockStorage.HandleBlockSyncResponse(ctx, response) +// require.NoError(t, err, "failed handling block sync response") +// +// }() +// +// go func() { +// time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) +// _, err := harness.blockStorage.CommitBlock(ctx, &services.CommitBlockInput{ +// BlockPair: response.Message.BlockPairs[0], +// }) +// require.NoError(t, err, "failed committing first block in parallel to sync") +// _, err = harness.blockStorage.CommitBlock(ctx, &services.CommitBlockInput{ +// BlockPair: response.Message.BlockPairs[1], +// }) +// require.NoError(t, err, "failed committing second block in parallel to sync") +// +// }() +//} From 626ecf70f60a4a29557b61350ed16dabeb940503 Mon Sep 17 00:00:00 2001 From: gadcl Date: Thu, 19 Mar 2020 10:08:34 +0200 Subject: [PATCH 2/7] test depicting the issue --- ...committee_does_not_block_node_sync_test.go | 140 +----------------- 1 file changed, 4 insertions(+), 136 deletions(-) diff --git a/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go b/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go index 1c61ba5dd..64f6166ab 100644 --- a/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go +++ b/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go @@ -25,16 +25,6 @@ import ( "testing" ) -func (h *harness) expectStateStorageNotRead() { - h.stateStorage.When("ReadKeys", mock.Any, mock.Any).Return(&services.ReadKeysOutput{ - StateRecords: []*protocol.StateRecord{ - (&protocol.StateRecordBuilder{ - Key: []byte{0x01}, - Value: []byte{0x02}, - }).Build(), - }, - }, nil).Times(0) -} // audit mode execute // sync on 2 blocks @@ -45,7 +35,7 @@ func (h *harness) expectStateStorageNotRead() { // robust - does not loop forever on request committee // Recover FromOldStateQuery in consensusContext -const STATE_STORAGE_HISTORY_SNAPSHOT_NUM = 5 +const stateStorageHistorySnapshotNum = 5 func TestRequestCommittee_NonBlocking_NodeSync(t *testing.T) { with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { @@ -64,10 +54,9 @@ func TestRequestCommittee_NonBlocking_NodeSync(t *testing.T) { harness.virtualMachine.When("CallSystemContract", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.CallSystemContractInput) (*services.CallSystemContractOutput, error) { output, _ := harness.stateStorage.GetLastCommittedBlockInfo(ctx, &services.GetLastCommittedBlockInfoInput{}) - fmt.Println(input.BlockHeight) currentHeight := output.LastCommittedBlockHeight - if currentHeight >= input.BlockHeight + STATE_STORAGE_HISTORY_SNAPSHOT_NUM { - return nil, errors.New(fmt.Sprintf("unsupported block height: block %d too old. currently at %d. keeping %d back", input.BlockHeight, currentHeight, STATE_STORAGE_HISTORY_SNAPSHOT_NUM)) + if currentHeight >= input.BlockHeight + stateStorageHistorySnapshotNum { + return nil, errors.New(fmt.Sprintf("unsupported block height: block %d too old. currently at %d. keeping %d back", input.BlockHeight, currentHeight, stateStorageHistorySnapshotNum)) } return &services.CallSystemContractOutput{ OutputArgumentArray: &protocol.ArgumentArray{}, @@ -76,6 +65,7 @@ func TestRequestCommittee_NonBlocking_NodeSync(t *testing.T) { }) consensusAlgo.MockConsensusBlocksHandler.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { + blockStorageHeight = 10 go func() { harness.service.RequestOrderingCommittee(ctx, &services.RequestCommitteeInput{ CurrentBlockHeight: 1, @@ -117,125 +107,3 @@ func TestRequestCommittee_NonBlocking_NodeSync(t *testing.T) { }) } - //_, err := s.storage.ValidateBlockForCommit(ctx, &services.ValidateBlockForCommitInput{BlockPair: blockPair}) - - //.MockBlockSyncHandler.HandleBlockSyncResponse().ValidateBlockForCommit() - - //consensusAlgo.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { - // - // - //}) - // - //txPool := &services.MockTransactionPool{} - //machine := &services.MockVirtualMachine{} -// harness := newHarness() -// with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { -// -// }) -// -// -//harness := test2.newSingleLhcNodeHarness(). -// withSyncNoCommitTimeout(10 * time.Millisecond). -// withSyncCollectResponsesTimeout(10 * time.Millisecond). -// withSyncCollectChunksTimeout(50 * time.Millisecond) - - - -//} - -// -////newSingleLhcNodeHarness -//func TestSyncPetitioner_Stress_CommitsDuringSync(t *testing.T) { -// with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { -// harness := test.newBlockStorageHarness(parent). -// withSyncNoCommitTimeout(10 * time.Millisecond). -// withSyncCollectResponsesTimeout(10 * time.Millisecond). -// withSyncCollectChunksTimeout(50 * time.Millisecond) -// -// const NUM_BLOCKS = 50 -// done := make(chan struct{}) -// -// harness.gossip.When("BroadcastBlockAvailabilityRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockAvailabilityRequestInput) (*gossiptopics.EmptyOutput, error) { -// test.respondToBroadcastAvailabilityRequest(ctx, harness, input, NUM_BLOCKS, 7) -// return nil, nil -// }) -// -// harness.gossip.When("SendBlockSyncRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockSyncRequestInput) (*gossiptopics.EmptyOutput, error) { -// if input.Message.SignedChunkRange.LastBlockHeight() >= NUM_BLOCKS { -// done <- struct{}{} -// } -// respondToBlockSyncRequestWithConcurrentCommit(t, ctx, harness, input, NUM_BLOCKS) -// return nil, nil -// }) -// -// machine := &services.MockVirtualMachine{} -// machine.When("CallSystemContract", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.CallSystemContractInput) (*services.CallSystemContractOutput, error) { -// fmt.Println("unsupported block height") -// return &services.CallSystemContractOutput{ -// }, errors.New("unsupported block height") -// }) -// consensusContextService := consensuscontext.NewConsensusContext(harness.txPool, machine, harness.stateStorage, config.ForConsensusContextTests(false), harness.Logger, metric.NewRegistry()) -// -// harness.consensus.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { -// -// if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE && input.PrevCommittedBlockPair != nil { -// fmt.Println("ADFDSFSDF") -// currHeight := input.BlockPair.TransactionsBlock.Header.BlockHeight() -// prevHeight := input.PrevCommittedBlockPair.TransactionsBlock.Header.BlockHeight() -// // audit mode + long execution -> -// // consensus algo concurrently commits multiple blocks > state storage cache threshold -// // no support for -// -// consensusContextService.RequestOrderingCommittee(ctx, &services.RequestCommitteeInput{ -// CurrentBlockHeight: currHeight, -// RandomSeed: 0, -// MaxCommitteeSize: 22, -// }) -// fmt.Println("123123") -// if currHeight != prevHeight+1 { -// done <- struct{}{} -// require.Failf(t, "HandleBlockConsensus given invalid args", "called with height %d and prev height %d", currHeight, prevHeight) -// } -// } -// return nil, nil -// }) -// -// harness.start(ctx) -// -// select { -// case <-done: -// // test passed -// case <-ctx.Done(): -// t.Fatalf("timed out waiting for sync flow to complete") -// } -// }) -//} -// -//// this would attempt to commit the same blocks at the same time from the sync flow and directly (simulating blocks arriving from consensus) -//func respondToBlockSyncRequestWithConcurrentCommit(t testing.TB, ctx context.Context, harness *test.harness, input *gossiptopics.BlockSyncRequestInput, availableBlocks int) { -// response := builders.BlockSyncResponseInput(). -// WithFirstBlockHeight(input.Message.SignedChunkRange.FirstBlockHeight()). -// WithLastBlockHeight(input.Message.SignedChunkRange.LastBlockHeight()). -// WithLastCommittedBlockHeight(primitives.BlockHeight(availableBlocks)). -// WithSenderNodeAddress(input.RecipientNodeAddress).Build() -// -// go func() { -// time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) -// _, err := harness.blockStorage.HandleBlockSyncResponse(ctx, response) -// require.NoError(t, err, "failed handling block sync response") -// -// }() -// -// go func() { -// time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) -// _, err := harness.blockStorage.CommitBlock(ctx, &services.CommitBlockInput{ -// BlockPair: response.Message.BlockPairs[0], -// }) -// require.NoError(t, err, "failed committing first block in parallel to sync") -// _, err = harness.blockStorage.CommitBlock(ctx, &services.CommitBlockInput{ -// BlockPair: response.Message.BlockPairs[1], -// }) -// require.NoError(t, err, "failed committing second block in parallel to sync") -// -// }() -//} From 03067486aa1110e7ac2d34ba696b3cf15bc9c87a Mon Sep 17 00:00:00 2001 From: gadcl Date: Sun, 22 Mar 2020 09:34:15 +0200 Subject: [PATCH 3/7] bug fix for node sync stuck in endless loop during block consensus verification --- ...sync_consensus_verify_non_blocking_test.go | 97 ++++++++++++++++ ...committee_does_not_block_node_sync_test.go | 109 ------------------ 2 files changed, 97 insertions(+), 109 deletions(-) create mode 100644 services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go delete mode 100644 services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go diff --git a/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go b/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go new file mode 100644 index 000000000..8c4e84447 --- /dev/null +++ b/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go @@ -0,0 +1,97 @@ +// Copyright 2019 the orbs-network-go authors +// This file is part of the orbs-network-go library in the Orbs project. +// +// This source code is licensed under the MIT license found in the LICENSE file in the root directory of this source tree. +// The above notice should be included in all copies or substantial portions of the software. + +package test + +import ( + "context" + "github.com/orbs-network/go-mock" + "github.com/orbs-network/orbs-network-go/test/builders" + "github.com/orbs-network/orbs-network-go/test/with" + "github.com/orbs-network/orbs-spec/types/go/primitives" + "github.com/orbs-network/orbs-spec/types/go/services" + "github.com/orbs-network/orbs-spec/types/go/services/gossiptopics" + "github.com/orbs-network/orbs-spec/types/go/services/handlers" + "github.com/stretchr/testify/require" + "math/rand" + "testing" + "time" +) + +func TestSyncPetitioner_Stress_CommitsDuringSync(t *testing.T) { + with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { + harness := newBlockStorageHarness(parent). + withSyncNoCommitTimeout(10 * time.Millisecond). + withSyncCollectResponsesTimeout(10 * time.Millisecond). + withSyncCollectChunksTimeout(50 * time.Millisecond) + + const NUM_BLOCKS = 50 + done := make(chan struct{}) + + harness.gossip.When("BroadcastBlockAvailabilityRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockAvailabilityRequestInput) (*gossiptopics.EmptyOutput, error) { + respondToBroadcastAvailabilityRequest(ctx, harness, input, NUM_BLOCKS, 7) + return nil, nil + }) + + harness.gossip.When("SendBlockSyncRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockSyncRequestInput) (*gossiptopics.EmptyOutput, error) { + if input.Message.SignedChunkRange.LastBlockHeight() >= NUM_BLOCKS { + done <- struct{}{} + } + respondToBlockSyncRequestWithConcurrentCommit(t, ctx, harness, input, NUM_BLOCKS) + return nil, nil + }) + + harness.consensus.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { + if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE && input.PrevCommittedBlockPair != nil { + currHeight := input.BlockPair.TransactionsBlock.Header.BlockHeight() + prevHeight := input.PrevCommittedBlockPair.TransactionsBlock.Header.BlockHeight() + if currHeight != prevHeight+1 { + done <- struct{}{} + require.Failf(t, "HandleBlockConsensus given invalid args", "called with height %d and prev height %d", currHeight, prevHeight) + } + } + return nil, nil + }) + + harness.start(ctx) + + select { + case <-done: + // test passed + case <-ctx.Done(): + t.Fatalf("timed out waiting for sync flow to complete") + } + }) +} + +// this would attempt to commit the same blocks at the same time from the sync flow and directly (simulating blocks arriving from consensus) +func respondToBlockSyncRequestWithConcurrentCommit(t testing.TB, ctx context.Context, harness *harness, input *gossiptopics.BlockSyncRequestInput, availableBlocks int) { + response := builders.BlockSyncResponseInput(). + WithFirstBlockHeight(input.Message.SignedChunkRange.FirstBlockHeight()). + WithLastBlockHeight(input.Message.SignedChunkRange.LastBlockHeight()). + WithLastCommittedBlockHeight(primitives.BlockHeight(availableBlocks)). + WithSenderNodeAddress(input.RecipientNodeAddress).Build() + + go func() { + time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) + _, err := harness.blockStorage.HandleBlockSyncResponse(ctx, response) + require.NoError(t, err, "failed handling block sync response") + + }() + + go func() { + time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) + _, err := harness.blockStorage.CommitBlock(ctx, &services.CommitBlockInput{ + BlockPair: response.Message.BlockPairs[0], + }) + require.NoError(t, err, "failed committing first block in parallel to sync") + _, err = harness.blockStorage.CommitBlock(ctx, &services.CommitBlockInput{ + BlockPair: response.Message.BlockPairs[1], + }) + require.NoError(t, err, "failed committing second block in parallel to sync") + + }() +} diff --git a/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go b/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go deleted file mode 100644 index 64f6166ab..000000000 --- a/services/consensuscontext/test/request_committee_does_not_block_node_sync_test.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2019 the orbs-network-go authors -// This file is part of the orbs-network-go library in the Orbs project. -// -// This source code is licensed under the MIT license found in the LICENSE file in the root directory of this source tree. -// The above notice should be included in all copies or substantial portions of the software. - -package test - -import ( - "context" - "errors" - "fmt" - "github.com/orbs-network/go-mock" - "github.com/orbs-network/orbs-network-go/test/builders" - "github.com/orbs-network/orbs-spec/types/go/primitives" - "github.com/orbs-network/orbs-spec/types/go/protocol" - - //"github.com/orbs-network/orbs-spec/types/go/protocol" - - //"github.com/orbs-network/orbs-network-go/test/builders" - "github.com/orbs-network/orbs-network-go/test/with" - //"github.com/orbs-network/orbs-spec/types/go/protocol" - "github.com/orbs-network/orbs-spec/types/go/services" - "github.com/orbs-network/orbs-spec/types/go/services/handlers" - "testing" -) - - -// audit mode execute -// sync on 2 blocks -// stuck on execution -// receive 10 commit blocks from consensus -// release execution -// fail on validate consensus - retrieve committee from old state -// robust - does not loop forever on request committee -// Recover FromOldStateQuery in consensusContext - -const stateStorageHistorySnapshotNum = 5 - -func TestRequestCommittee_NonBlocking_NodeSync(t *testing.T) { - with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { - harness := newHarness(parent.Logger, true) - done := make(chan struct{}) - blockStorage := &services.MockBlockStorage{} - consensusAlgo := &services.MockConsensusAlgoLeanHelix{} - blockStorageHeight := 0 - - harness.stateStorage.When("GetLastCommittedBlockInfo", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.GetLastCommittedBlockInfoInput) (*services.GetLastCommittedBlockInfoOutput, error) { - output, _ := blockStorage.GetLastCommittedBlockHeight(ctx, &services.GetLastCommittedBlockHeightInput{}) - return &services.GetLastCommittedBlockInfoOutput{ - LastCommittedBlockHeight: output.LastCommittedBlockHeight, - }, nil - }) - - harness.virtualMachine.When("CallSystemContract", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.CallSystemContractInput) (*services.CallSystemContractOutput, error) { - output, _ := harness.stateStorage.GetLastCommittedBlockInfo(ctx, &services.GetLastCommittedBlockInfoInput{}) - currentHeight := output.LastCommittedBlockHeight - if currentHeight >= input.BlockHeight + stateStorageHistorySnapshotNum { - return nil, errors.New(fmt.Sprintf("unsupported block height: block %d too old. currently at %d. keeping %d back", input.BlockHeight, currentHeight, stateStorageHistorySnapshotNum)) - } - return &services.CallSystemContractOutput{ - OutputArgumentArray: &protocol.ArgumentArray{}, - CallResult: protocol.EXECUTION_RESULT_SUCCESS, - }, nil - }) - - consensusAlgo.MockConsensusBlocksHandler.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { - blockStorageHeight = 10 - go func() { - harness.service.RequestOrderingCommittee(ctx, &services.RequestCommitteeInput{ - CurrentBlockHeight: 1, - RandomSeed: 0, - MaxCommitteeSize: 22, - }) - done <- struct{}{} - } () - return nil, nil - }) - - blockStorage.Mock.When("GetLastCommittedBlockHeight", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.GetLastCommittedBlockHeightInput) (*services.GetLastCommittedBlockHeightOutput, error) { - return &services.GetLastCommittedBlockHeightOutput{ - LastCommittedBlockHeight: primitives.BlockHeight(blockStorageHeight), - }, nil - }) - block := builders.BlockPair().WithHeight(1).WithEmptyLeanHelixBlockProof().Build() - prevBlock := builders.BlockPair().WithHeight(0).WithEmptyLeanHelixBlockProof().Build() - blockStorage.When("ValidateBlockForCommit", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.ValidateBlockForCommitInput) (*services.ValidateBlockForCommitOutput, error) { - consensusAlgo.HandleBlockConsensus(ctx, &handlers.HandleBlockConsensusInput{ - Mode: handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_ONLY, - BlockType: protocol.BLOCK_TYPE_BLOCK_PAIR, - BlockPair: block, - PrevCommittedBlockPair: prevBlock, - }) - - return nil, nil - }) - - // start of flow "after receiving blocks chunk from peer" - blockStorage.ValidateBlockForCommit(ctx, &services.ValidateBlockForCommitInput{BlockPair: block}) - - select { - case <-done: - // test passed - case <-ctx.Done(): - t.Fatalf("timed out waiting for sync flow to complete") - } - }) -} - From 6a732473baf731f5937692bcc47f70c576a3505b Mon Sep 17 00:00:00 2001 From: gadcl Date: Sun, 22 Mar 2020 11:03:43 +0200 Subject: [PATCH 4/7] updated lean-helix-go reference --- go.mod | 3 +- go.sum | 2 + ...sync_consensus_verify_non_blocking_test.go | 125 ++++++++++++------ .../call_contract_ordered_committee.go | 35 +---- 4 files changed, 88 insertions(+), 77 deletions(-) diff --git a/go.mod b/go.mod index 77ce1a474..ebf2b1fd0 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/ethereum/go-ethereum v1.9.6 github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect + github.com/go-stack/stack v1.8.0 // indirect github.com/google/go-cmp v0.3.1 github.com/huin/goupnp v1.0.0 // indirect github.com/jackpal/go-nat-pmp v1.0.1 // indirect @@ -26,7 +27,7 @@ require ( github.com/olekukonko/tablewriter v0.0.2 // indirect github.com/orbs-network/go-mock v1.1.0 github.com/orbs-network/govnr v0.2.0 - github.com/orbs-network/lean-helix-go v0.2.6 + github.com/orbs-network/lean-helix-go v0.2.7 github.com/orbs-network/membuffers v0.4.0 github.com/orbs-network/orbs-client-sdk-go v0.15.0 github.com/orbs-network/orbs-contract-sdk v1.5.0 diff --git a/go.sum b/go.sum index 60b07b694..53e937293 100644 --- a/go.sum +++ b/go.sum @@ -192,6 +192,8 @@ github.com/orbs-network/lean-helix-go v0.2.4 h1:n3e3PBM86ylQiiJUzOEFf00k6BWRWKCf github.com/orbs-network/lean-helix-go v0.2.4/go.mod h1:9E/1sZEMZvNLHrP+nif36bio2zKbCkueji4R9e7vJnI= github.com/orbs-network/lean-helix-go v0.2.6 h1:9b7eGChry3Z18xtlvYHO0wTaj5HBOEwLTVYr61Xpa/Q= github.com/orbs-network/lean-helix-go v0.2.6/go.mod h1:9E/1sZEMZvNLHrP+nif36bio2zKbCkueji4R9e7vJnI= +github.com/orbs-network/lean-helix-go v0.2.7 h1:d7k67YUIMqXihIl5x/S9p7VIpBGpBzI1D/xR1x2Y/Ro= +github.com/orbs-network/lean-helix-go v0.2.7/go.mod h1:9E/1sZEMZvNLHrP+nif36bio2zKbCkueji4R9e7vJnI= github.com/orbs-network/membuffers v0.3.2/go.mod h1:M5ABv0m0XBGoJbX+7UKVY02hLF4XhS2SlZVEVABMc6M= github.com/orbs-network/membuffers v0.3.6 h1:SEfVtDhzi8YHR4qY8ouu8G57sKGsT4vfgdBC5bjG8TE= github.com/orbs-network/membuffers v0.3.6/go.mod h1:M5ABv0m0XBGoJbX+7UKVY02hLF4XhS2SlZVEVABMc6M= diff --git a/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go b/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go index 8c4e84447..abb31a043 100644 --- a/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go +++ b/services/blockstorage/test/block_sync_consensus_verify_non_blocking_test.go @@ -8,50 +8,97 @@ package test import ( "context" + "errors" + "fmt" "github.com/orbs-network/go-mock" + "github.com/orbs-network/orbs-network-go/config" + "github.com/orbs-network/orbs-network-go/instrumentation/metric" + "github.com/orbs-network/orbs-network-go/services/consensuscontext" "github.com/orbs-network/orbs-network-go/test/builders" "github.com/orbs-network/orbs-network-go/test/with" "github.com/orbs-network/orbs-spec/types/go/primitives" + "github.com/orbs-network/orbs-spec/types/go/protocol" "github.com/orbs-network/orbs-spec/types/go/services" "github.com/orbs-network/orbs-spec/types/go/services/gossiptopics" "github.com/orbs-network/orbs-spec/types/go/services/handlers" - "github.com/stretchr/testify/require" - "math/rand" "testing" "time" ) -func TestSyncPetitioner_Stress_CommitsDuringSync(t *testing.T) { + +// Node Sync assumes consensus verifies block without blocking +// Start syncing - in parallel to consensus service progressing +// During HandleBlockConsensus and before block proof verification, commit blocks from consensus: +// Example: during block execution check (audit mode) receive "numOfStateRevisionsToRetain" commits from consensus +// Calling old state for committee fails - too far back (out of stateStorage cache reach) +// Recover from "Old State" query (consensusContext does not poll forever) +func TestSyncPetitioner_ConsensusVerify_NonBlocking(t *testing.T) { with.Concurrency(t, func(ctx context.Context, parent *with.ConcurrencyHarness) { harness := newBlockStorageHarness(parent). + withSyncBroadcast(1). withSyncNoCommitTimeout(10 * time.Millisecond). withSyncCollectResponsesTimeout(10 * time.Millisecond). - withSyncCollectChunksTimeout(50 * time.Millisecond) + withSyncCollectChunksTimeout(50 * time.Millisecond). + allowingErrorsMatching("FORK!! block already in storage, timestamp mismatch") + - const NUM_BLOCKS = 50 + blocks := []*protocol.BlockPairContainer{ + builders.BlockPair().WithHeight(primitives.BlockHeight(1)).WithBlockCreated(time.Now()).Build(), + builders.BlockPair().WithHeight(primitives.BlockHeight(2)).WithBlockCreated(time.Now()).Build(), + builders.BlockPair().WithHeight(primitives.BlockHeight(3)).WithBlockCreated(time.Now()).Build(), + builders.BlockPair().WithHeight(primitives.BlockHeight(4)).WithBlockCreated(time.Now()).Build(), + } + + numOfStateRevisionsToRetain := 2 + virtualMachine := &services.MockVirtualMachine{} + consensusContext := consensuscontext.NewConsensusContext(harness.txPool, virtualMachine, harness.stateStorage, config.ForConsensusContextTests(false), harness.Logger, metric.NewRegistry()) + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + committedBlockHeights := make(chan primitives.BlockHeight, 10) done := make(chan struct{}) + simulatedCommitsTarget := numOfStateRevisionsToRetain + 1 harness.gossip.When("BroadcastBlockAvailabilityRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockAvailabilityRequestInput) (*gossiptopics.EmptyOutput, error) { - respondToBroadcastAvailabilityRequest(ctx, harness, input, NUM_BLOCKS, 7) + respondToBroadcastAvailabilityRequest(ctx, harness, input, 4, 1) return nil, nil }) harness.gossip.When("SendBlockSyncRequest", mock.Any, mock.Any).Call(func(ctx context.Context, input *gossiptopics.BlockSyncRequestInput) (*gossiptopics.EmptyOutput, error) { - if input.Message.SignedChunkRange.LastBlockHeight() >= NUM_BLOCKS { - done <- struct{}{} - } - respondToBlockSyncRequestWithConcurrentCommit(t, ctx, harness, input, NUM_BLOCKS) + response := builders.BlockSyncResponseInput(). + WithFirstBlockHeight(input.Message.SignedChunkRange.FirstBlockHeight()). + WithLastBlockHeight(input.Message.SignedChunkRange.LastBlockHeight()). + WithLastCommittedBlockHeight(primitives.BlockHeight(4)). + WithSenderNodeAddress(input.RecipientNodeAddress).Build() + + go func() { + harness.blockStorage.HandleBlockSyncResponse(ctx, response) + }() return nil, nil }) + harness.stateStorage.When("GetLastCommittedBlockInfo", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.GetLastCommittedBlockInfoInput) (*services.GetLastCommittedBlockInfoOutput, error) { + output := harness.getLastBlockHeight(ctx, t) + return &services.GetLastCommittedBlockInfoOutput{ + LastCommittedBlockHeight: output.LastCommittedBlockHeight, + }, nil + }) + + virtualMachine.When("CallSystemContract", mock.Any, mock.Any).Call(func(ctx context.Context, input *services.CallSystemContractInput) (*services.CallSystemContractOutput, error) { + output, _ := harness.stateStorage.GetLastCommittedBlockInfo(ctx, &services.GetLastCommittedBlockInfoInput{}) + currentHeight := output.LastCommittedBlockHeight + if currentHeight >= input.BlockHeight + primitives.BlockHeight(numOfStateRevisionsToRetain) { + return nil, errors.New(fmt.Sprintf("unsupported block height: block %d too old. currently at %d. keeping %d back", input.BlockHeight, currentHeight, numOfStateRevisionsToRetain)) + } + return &services.CallSystemContractOutput{ + OutputArgumentArray: &protocol.ArgumentArray{}, + CallResult: protocol.EXECUTION_RESULT_SUCCESS, + }, nil + }) + harness.consensus.When("HandleBlockConsensus", mock.Any, mock.Any).Call(func(ctx context.Context, input *handlers.HandleBlockConsensusInput) (*handlers.HandleBlockConsensusOutput, error) { - if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE && input.PrevCommittedBlockPair != nil { - currHeight := input.BlockPair.TransactionsBlock.Header.BlockHeight() - prevHeight := input.PrevCommittedBlockPair.TransactionsBlock.Header.BlockHeight() - if currHeight != prevHeight+1 { - done <- struct{}{} - require.Failf(t, "HandleBlockConsensus given invalid args", "called with height %d and prev height %d", currHeight, prevHeight) - } + if input.Mode == handlers.HANDLE_BLOCK_CONSENSUS_MODE_VERIFY_AND_UPDATE { + simulateConsensusCommits(ctx, harness, blocks, committedBlockHeights, simulatedCommitsTarget) + simulateVerifyBlockConsensus(ctx, t, consensusContext, input.BlockPair.TransactionsBlock.Header.BlockHeight(), done) } return nil, nil }) @@ -59,39 +106,31 @@ func TestSyncPetitioner_Stress_CommitsDuringSync(t *testing.T) { harness.start(ctx) select { - case <-done: - // test passed - case <-ctx.Done(): - t.Fatalf("timed out waiting for sync flow to complete") + case <-done: + // test passed + case <-timeoutCtx.Done(): + t.Fatalf("timed out waiting for sync flow to recover") } }) } -// this would attempt to commit the same blocks at the same time from the sync flow and directly (simulating blocks arriving from consensus) -func respondToBlockSyncRequestWithConcurrentCommit(t testing.TB, ctx context.Context, harness *harness, input *gossiptopics.BlockSyncRequestInput, availableBlocks int) { - response := builders.BlockSyncResponseInput(). - WithFirstBlockHeight(input.Message.SignedChunkRange.FirstBlockHeight()). - WithLastBlockHeight(input.Message.SignedChunkRange.LastBlockHeight()). - WithLastCommittedBlockHeight(primitives.BlockHeight(availableBlocks)). - WithSenderNodeAddress(input.RecipientNodeAddress).Build() - go func() { - time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) - _, err := harness.blockStorage.HandleBlockSyncResponse(ctx, response) - require.NoError(t, err, "failed handling block sync response") - - }() +func simulateConsensusCommits(ctx context.Context, harness *harness, blocks []*protocol.BlockPairContainer, committedBlockHeights chan primitives.BlockHeight, target int) { + for i := 0; i < target; i++ { + _, err := harness.commitBlock(ctx, blocks[i]) + if err == nil { + committedBlockHeights <- blocks[i].ResultsBlock.Header.BlockHeight() + } + } +} +func simulateVerifyBlockConsensus(ctx context.Context, tb testing.TB, consensusContext services.ConsensusContext, currentBlockHeight primitives.BlockHeight, done chan struct{}) { go func() { - time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) - _, err := harness.blockStorage.CommitBlock(ctx, &services.CommitBlockInput{ - BlockPair: response.Message.BlockPairs[0], + consensusContext.RequestOrderingCommittee(ctx, &services.RequestCommitteeInput{ + CurrentBlockHeight: currentBlockHeight, + RandomSeed: 0, + MaxCommitteeSize: 4, }) - require.NoError(t, err, "failed committing first block in parallel to sync") - _, err = harness.blockStorage.CommitBlock(ctx, &services.CommitBlockInput{ - BlockPair: response.Message.BlockPairs[1], - }) - require.NoError(t, err, "failed committing second block in parallel to sync") - + done <- struct{}{} }() } diff --git a/services/consensuscontext/call_contract_ordered_committee.go b/services/consensuscontext/call_contract_ordered_committee.go index 621ad949f..b4e323d6a 100644 --- a/services/consensuscontext/call_contract_ordered_committee.go +++ b/services/consensuscontext/call_contract_ordered_committee.go @@ -19,14 +19,12 @@ import ( "time" ) -const CALL_COMMITTEE_CONTRACT_INTERVAL = 200 * time.Millisecond - func (s *service) getOrderedCommittee(ctx context.Context, currentBlockHeight primitives.BlockHeight) ([]primitives.NodeAddress, error) { logger := s.logger.WithTags(trace.LogFieldFrom(ctx)) // current block is used as seed and needs to be for the block being calculated Now. - logger.Info("system-call GetOrderedCommittee", logfields.BlockHeight(currentBlockHeight), log.Stringable("interval-between-attempts", CALL_COMMITTEE_CONTRACT_INTERVAL)) - orderedCommittee, err := s.callGetOrderedCommitteeSystemContractUntilSuccess(ctx, currentBlockHeight) + logger.Info("system-call GetOrderedCommittee", logfields.BlockHeight(currentBlockHeight)) + orderedCommittee, err := s.callGetOrderedCommitteeSystemContract(ctx, currentBlockHeight) if err != nil { return nil, err } @@ -35,35 +33,6 @@ func (s *service) getOrderedCommittee(ctx context.Context, currentBlockHeight pr return orderedCommittee, nil } -func (s *service) callGetOrderedCommitteeSystemContractUntilSuccess(ctx context.Context, blockHeight primitives.BlockHeight) ([]primitives.NodeAddress, error) { - attempts := 1 - for { - // exit on system shutdown - if ctx.Err() != nil { - return nil, errors.New("context terminated during callGetOrderedCommitteeSystemContractUntilSuccess") - } - - orderedCommittee, err := s.callGetOrderedCommitteeSystemContract(ctx, blockHeight) - if err == nil { - return orderedCommittee, nil - } - - // log every 500 failures - if attempts%500 == 1 { - if ctx.Err() == nil { // this may fail rightfully on graceful shutdown (ctx.Done), we don't want to report an error in this case - s.logger.Info("cannot get ordered committee from system contract", log.Error(err), logfields.BlockHeight(blockHeight), log.Int("attempts", attempts)) - } - } - - // sleep or wait for ctx done, whichever comes first - sleepOrShutdown, cancel := context.WithTimeout(ctx, CALL_COMMITTEE_CONTRACT_INTERVAL) - <-sleepOrShutdown.Done() - cancel() - - attempts++ - } -} - func (s *service) callGetOrderedCommitteeSystemContract(ctx context.Context, blockHeight primitives.BlockHeight) ([]primitives.NodeAddress, error) { systemContractName := primitives.ContractName(committee_systemcontract.CONTRACT_NAME) systemMethodName := primitives.MethodName(committee_systemcontract.METHOD_GET_ORDERED_COMMITTEE) From 4d54f92046355bb2c78153cbaf68718f1f8df0a6 Mon Sep 17 00:00:00 2001 From: gadcl Date: Sun, 22 Mar 2020 11:34:17 +0200 Subject: [PATCH 5/7] fixing test timing for race --- test/acceptance/committee_election_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/acceptance/committee_election_test.go b/test/acceptance/committee_election_test.go index 3f59480df..f6b93cdb7 100644 --- a/test/acceptance/committee_election_test.go +++ b/test/acceptance/committee_election_test.go @@ -175,8 +175,8 @@ func waitUntilCommitteeApplies(t testing.TB, ctx context.Context, network *Netwo lastBlock, err := network.BlockPersistence(0).GetLastBlockHeight() require.NoError(t, err) - network.committeeProvider.AddCommittee(uint64(lastBlock+3), committee) - network.WaitForBlock(ctx, lastBlock+4) + network.committeeProvider.AddCommittee(uint64(lastBlock+4), committee) + network.WaitForBlock(ctx, lastBlock+6) } func verifyTxSignersAreFromGroup(t testing.TB, ctx context.Context, api callcontract.CallContractAPI, txHash primitives.Sha256, nodeIndex int, allowedIndexes []int) { From ddd1d2ecd75621d057b0871a4996ec2f782e467a Mon Sep 17 00:00:00 2001 From: Noam Berg Date: Sun, 22 Mar 2020 12:10:00 +0200 Subject: [PATCH 6/7] fix that ref of committees is ordered in ascending order --- config/system_factory_presets.go | 2 +- go.mod | 1 - test/acceptance/committee_election_test.go | 5 ++++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/config/system_factory_presets.go b/config/system_factory_presets.go index d9cfeb45c..2c0d2adbb 100644 --- a/config/system_factory_presets.go +++ b/config/system_factory_presets.go @@ -209,7 +209,7 @@ func ForAcceptanceTestNetwork( emptyBlockTime = 50 * time.Millisecond } - cfg.SetDuration(MANAGEMENT_UPDATE_INTERVAL, 3*time.Millisecond) + cfg.SetDuration(MANAGEMENT_UPDATE_INTERVAL, 2*time.Millisecond) cfg.SetDuration(BENCHMARK_CONSENSUS_RETRY_INTERVAL, 50*time.Millisecond) cfg.SetDuration(LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, 200*time.Millisecond) cfg.SetBool(LEAN_HELIX_SHOW_DEBUG, true) diff --git a/go.mod b/go.mod index ebf2b1fd0..7995c535b 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/ethereum/go-ethereum v1.9.6 github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect - github.com/go-stack/stack v1.8.0 // indirect github.com/google/go-cmp v0.3.1 github.com/huin/goupnp v1.0.0 // indirect github.com/jackpal/go-nat-pmp v1.0.1 // indirect diff --git a/test/acceptance/committee_election_test.go b/test/acceptance/committee_election_test.go index f6b93cdb7..36fea9fae 100644 --- a/test/acceptance/committee_election_test.go +++ b/test/acceptance/committee_election_test.go @@ -21,6 +21,7 @@ import ( func TestLeanHelix_CommitTransactionToElected(t *testing.T) { NewHarness(). WithNumNodes(6). + WithLogFilters(log.DiscardAll()). WithConsensusAlgos(consensus.CONSENSUS_ALGO_TYPE_LEAN_HELIX). Start(t, func(t testing.TB, ctx context.Context, network *Network) { contract := callcontract.NewContractClient(network) @@ -61,6 +62,7 @@ func TestLeanHelix_CommitTransactionToElected(t *testing.T) { func TestLeanHelix_MultipleReElections(t *testing.T) { NewHarness(). WithNumNodes(6). + WithLogFilters(log.DiscardAll()). WithConsensusAlgos(consensus.CONSENSUS_ALGO_TYPE_LEAN_HELIX). Start(t, func(t testing.TB, ctx context.Context, network *Network) { contract := callcontract.NewContractClient(network) @@ -134,6 +136,7 @@ func TestLeanHelix_AllNodesLoseElectionButReturn(t *testing.T) { func TestLeanHelix_GrowingElectedAmount(t *testing.T) { NewHarness(). WithNumNodes(7). + WithLogFilters(log.DiscardAll()). WithConsensusAlgos(consensus.CONSENSUS_ALGO_TYPE_LEAN_HELIX). Start(t, func(t testing.TB, ctx context.Context, network *Network) { contract := callcontract.NewContractClient(network) @@ -175,7 +178,7 @@ func waitUntilCommitteeApplies(t testing.TB, ctx context.Context, network *Netwo lastBlock, err := network.BlockPersistence(0).GetLastBlockHeight() require.NoError(t, err) - network.committeeProvider.AddCommittee(uint64(lastBlock+4), committee) + network.committeeProvider.AddCommittee(uint64(lastBlock+3), committee) network.WaitForBlock(ctx, lastBlock+6) } From 6cb4d34ed8d2bded47851ba8b9eb425200737e37 Mon Sep 17 00:00:00 2001 From: gadcl Date: Sun, 22 Mar 2020 15:07:08 +0200 Subject: [PATCH 7/7] Update production config defaults - assumes reputation feature --- config/system_factory_presets.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/config/system_factory_presets.go b/config/system_factory_presets.go index 2c0d2adbb..df00812b2 100644 --- a/config/system_factory_presets.go +++ b/config/system_factory_presets.go @@ -25,8 +25,8 @@ func defaultProductionConfig() mutableNodeConfig { cfg.SetDuration(MANAGEMENT_UPDATE_INTERVAL, 1*time.Minute) cfg.SetUint32(MANAGEMENT_MAX_FILE_SIZE, 50 * (1<<20)) // 50 MB - // 2*slow_network_latency + avg_network_latency + 2*execution_time = 450ms - cfg.SetDuration(LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, 4*time.Second) + // 2*slow_network_latency + avg_network_latency + 2*execution_time \ + empty block time + cfg.SetDuration(LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, 14*time.Second) cfg.SetDuration(BENCHMARK_CONSENSUS_RETRY_INTERVAL, 2*time.Second) cfg.SetUint32(LEAN_HELIX_CONSENSUS_MINIMUM_COMMITTEE_SIZE, 4) @@ -34,7 +34,7 @@ func defaultProductionConfig() mutableNodeConfig { cfg.SetBool(LEAN_HELIX_SHOW_DEBUG, false) // if above round time, we'll have leader changes when no traffic - cfg.SetDuration(TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS, 5*time.Second) + cfg.SetDuration(TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS, 9*time.Second) cfg.SetUint32(BENCHMARK_CONSENSUS_REQUIRED_QUORUM_PERCENTAGE, 66) @@ -42,7 +42,7 @@ func defaultProductionConfig() mutableNodeConfig { cfg.SetUint32(CONSENSUS_CONTEXT_MAXIMUM_TRANSACTIONS_IN_BLOCK, 1000) // max execution time (time validators allow until they get the executed block) - cfg.SetDuration(CONSENSUS_CONTEXT_SYSTEM_TIMESTAMP_ALLOWED_JITTER, 30*time.Second) + cfg.SetDuration(CONSENSUS_CONTEXT_SYSTEM_TIMESTAMP_ALLOWED_JITTER, 60*time.Second) // have triggers transactions by default cfg.SetBool(CONSENSUS_CONTEXT_TRIGGERS_ENABLED, true) @@ -55,7 +55,7 @@ func defaultProductionConfig() mutableNodeConfig { cfg.SetUint32(BLOCK_SYNC_NUM_BLOCKS_IN_BATCH, 100) // 4*LEAN_HELIX_CONSENSUS_ROUND_TIMEOUT_INTERVAL, if below TRANSACTION_POOL_TIME_BETWEEN_EMPTY_BLOCKS we'll constantly have syncs - cfg.SetDuration(BLOCK_SYNC_NO_COMMIT_INTERVAL, 6*time.Second) + cfg.SetDuration(BLOCK_SYNC_NO_COMMIT_INTERVAL, 18*time.Second) // makes sync slower, 4*slow_network_latency cfg.SetDuration(BLOCK_SYNC_COLLECT_RESPONSE_TIMEOUT, 1*time.Second)