Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

View change: Support for checking if validators belongs to the same key. #4802

Open
wants to merge 14 commits into
base: dev
Choose a base branch
from
5 changes: 0 additions & 5 deletions consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,11 +556,6 @@ func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
return consensus.setCurBlockViewID(viewID)
}

// SetCurBlockViewID set the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}

// SetViewChangingID set the current view change ID
func (consensus *Consensus) SetViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
Expand Down
2 changes: 1 addition & 1 deletion consensus/quorum/quorom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func TestCIdentities_NthNextValidatorFailedEdgeCase2(t *testing.T) {
case <-done:
t.Error("Expected a timeout, but successfully calculated next leader")

case <-time.After(5 * time.Second):
case <-time.After(1 * time.Second):
t.Log("Test timed out, possible infinite loop")
}
}
Expand Down
15 changes: 7 additions & 8 deletions consensus/quorum/quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type ParticipantTracker interface {
NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
FirstParticipant(shardingconfig.Instance) *bls.PublicKeyWrapper
NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error)
FirstParticipant() *bls.PublicKeyWrapper
UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper)
}

Expand Down Expand Up @@ -202,20 +203,18 @@ func (s *cIdentities) IndexOf(pubKey bls.SerializedPublicKey) int {
}

// NthNext return the Nth next pubkey, next can be negative number
func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
found := false

func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error) {
Frozen marked this conversation as resolved.
Show resolved Hide resolved
idx := s.IndexOf(pubKey.Bytes)
if idx != -1 {
found = true
if idx == -1 {
return nil, errors.Errorf("pubKey not found %x", pubKey.Bytes)
}
numNodes := int(s.ParticipantsCount())
// sanity check to avoid out of bound access
if numNodes <= 0 || numNodes > len(s.publicKeys) {
numNodes = len(s.publicKeys)
}
idx = (idx + next) % numNodes
return found, &s.publicKeys[idx]
return &s.publicKeys[idx], nil
}

// NthNextValidatorV2 returns the Nth next pubkey nodes, but from another validator.
Expand Down Expand Up @@ -314,7 +313,7 @@ func (s *cIdentities) NthNextHmy(instance shardingconfig.Instance, pubKey *bls.P
}

// FirstParticipant returns the first participant of the shard
func (s *cIdentities) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper {
func (s *cIdentities) FirstParticipant() *bls.PublicKeyWrapper {
return &s.publicKeys[0]
}

Expand Down
10 changes: 8 additions & 2 deletions consensus/quorum/thread_safe_decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey
return a.decider.NthNextValidator(slotList, pubKey, next)
}

func (a threadSafeDeciderImpl) NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNext(pubKey, next)
}

func (a threadSafeDeciderImpl) NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
Expand All @@ -68,10 +74,10 @@ func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubk
return a.decider.NthNextHmy(instance, pubkey, next)
}

func (a threadSafeDeciderImpl) FirstParticipant(instance shardingconfig.Instance) *bls.PublicKeyWrapper {
func (a threadSafeDeciderImpl) FirstParticipant() *bls.PublicKeyWrapper {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.FirstParticipant(instance)
return a.decider.FirstParticipant()
}

func (a threadSafeDeciderImpl) UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper) {
Expand Down
78 changes: 73 additions & 5 deletions consensus/view_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (pm *State) SetCurBlockViewID(viewID uint64) uint64 {
return viewID
}

// SetCurBlockViewID set the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}

// GetViewChangingID return the current view changing id
// It is meaningful during view change mode
func (pm *State) GetViewChangingID() uint64 {
Expand Down Expand Up @@ -140,10 +145,27 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) {
return nextViewID, viewChangeDuration
}

// getNextLeaderKey uniquely determine who is the leader for given viewID
// It reads the current leader's pubkey based on the blockchain data and returns
// getNextLeaderKeySkipSameAddress uniquely determine who is the leader for given viewID
// It receives the committee and returns
// the next leader based on the gap of the viewID of the view change and the last
// know view id of the block.
func (consensus *Consensus) getNextLeaderKeySkipSameAddress(viewID uint64, committee *shard.Committee) *bls.PublicKeyWrapper {
Frozen marked this conversation as resolved.
Show resolved Hide resolved
gap := 1

cur := consensus.getCurBlockViewID()
if viewID > cur {
gap = int(viewID - cur)
}
// use pubkey as default key as well
leaderPubKey := consensus.getLeaderPubKey()
rs, err := viewChangeNextValidator(consensus.decider, gap, committee.Slots, leaderPubKey)
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[getNextLeaderKeySkipSameAddress] viewChangeNextValidator failed")
return leaderPubKey
}
return rs
}

func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Committee) *bls.PublicKeyWrapper {
gap := 1

Expand Down Expand Up @@ -182,7 +204,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
// it can still sync with other validators.
if curHeader.IsLastBlockInEpoch() {
consensus.getLogger().Info().Msg("[getNextLeaderKey] view change in the first block of new epoch")
lastLeaderPubKey = consensus.decider.FirstParticipant(shard.Schedule.InstanceForEpoch(epoch))
lastLeaderPubKey = consensus.decider.FirstParticipant()
}
}
}
Expand Down Expand Up @@ -231,6 +253,46 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
return next
}

type nthNext interface {
NthNext(pubKey *bls.PublicKeyWrapper, next int) (*bls.PublicKeyWrapper, error)
}

func viewChangeNextValidator(decider nthNext, gap int, slots shard.SlotList, lastLeaderPubKey *bls.PublicKeyWrapper) (*bls.PublicKeyWrapper, error) {
if gap > 1 {
current, err := decider.NthNext(
lastLeaderPubKey,
gap-1)
if err != nil {
return nil, errors.WithMessagef(err, "NthNext failed, gap %d", gap)
}

publicToAddress := make(map[bls.SerializedPublicKey]common.Address)
for _, slot := range slots {
publicToAddress[slot.BLSPublicKey] = slot.EcdsaAddress
}

for i := 0; i < len(slots); i++ {
gap = gap + i
next, err := decider.NthNext(
lastLeaderPubKey,
gap)
if err != nil {
return nil, errors.New("current leader not found")
}

if publicToAddress[current.Bytes] != publicToAddress[next.Bytes] {
return next, nil
}
}
} else {
next, err := decider.NthNext(
lastLeaderPubKey,
gap)
return next, errors.WithMessagef(err, "NthNext failed, gap %d", gap)
}
return nil, errors.New("current leader not found")
}

func createTimeout() map[TimeoutType]*utils.Timeout {
timeouts := make(map[TimeoutType]*utils.Timeout)
timeouts[timeoutConsensus] = utils.NewTimeout(phaseDuration)
Expand All @@ -250,7 +312,8 @@ func (consensus *Consensus) startViewChange() {
consensus.current.SetMode(ViewChanging)
nextViewID, duration := consensus.getNextViewID()
consensus.setViewChangingID(nextViewID)
epoch := consensus.Blockchain().CurrentHeader().Epoch()
currentHeader := consensus.Blockchain().CurrentHeader()
epoch := currentHeader.Epoch()
ss, err := consensus.Blockchain().ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state")
Expand All @@ -267,7 +330,12 @@ func (consensus *Consensus) startViewChange() {
// aganist the consensus.LeaderPubKey variable.
// Ideally, we shall use another variable to keep track of the
// leader pubkey in viewchange mode
consensus.setLeaderPubKey(consensus.getNextLeaderKey(nextViewID, committee))
c := consensus.Blockchain().Config()
if c.IsLeaderRotationV2Epoch(currentHeader.Epoch()) {
consensus.setLeaderPubKey(consensus.getNextLeaderKeySkipSameAddress(nextViewID, committee))
} else {
consensus.setLeaderPubKey(consensus.getNextLeaderKey(nextViewID, committee))
}

consensus.getLogger().Warn().
Uint64("nextViewID", nextViewID).
Expand Down
140 changes: 138 additions & 2 deletions consensus/view_change_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package consensus

import (
"math/big"
"testing"

"github.com/harmony-one/harmony/crypto/bls"

"github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/crypto/bls"
harmony_bls "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/shard"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBasicViewChanging(t *testing.T) {
Expand Down Expand Up @@ -118,3 +122,135 @@ func TestGetNextLeaderKeyShouldSucceed(t *testing.T) {

assert.Equal(t, nextKey, &wrappedBLSKeys[1])
}

func TestViewChangeNextValidator(t *testing.T) {
decider := quorum.NewDecider(quorum.SuperMajorityVote, shard.BeaconChainShardID)
assert.Equal(t, int64(0), decider.ParticipantsCount())
wrappedBLSKeys := []bls.PublicKeyWrapper{}

const keyCount = 5
for i := 0; i < keyCount; i++ {
blsKey := harmony_bls.RandPrivateKey()
blsPubKey := harmony_bls.WrapperFromPrivateKey(blsKey)
wrappedBLSKeys = append(wrappedBLSKeys, *blsPubKey.Pub)
}

decider.UpdateParticipants(wrappedBLSKeys, []bls.PublicKeyWrapper{})
assert.EqualValues(t, keyCount, decider.ParticipantsCount())

t.Run("check_different_address_for_validators", func(t *testing.T) {
var (
rs *bls.PublicKeyWrapper
err error
slots []shard.Slot
)
for i := 0; i < keyCount; i++ {
slot := shard.Slot{
EcdsaAddress: common.BigToAddress(big.NewInt(int64(i))),
BLSPublicKey: wrappedBLSKeys[i].Bytes,
}
slots = append(slots, slot)
}

rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[0], rs)

rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[1], rs)

rs, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[2], rs)

// and no panic or error for future 1k gaps
for i := 0; i < 1000; i++ {
_, err = viewChangeNextValidator(decider, i, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
}
})

// we can't find next validator, because all validators have the same address
t.Run("same_address_for_all_validators", func(t *testing.T) {
var (
rs *bls.PublicKeyWrapper
err error
slots []shard.Slot
)
for i := 0; i < keyCount; i++ {
slot := shard.Slot{
EcdsaAddress: common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z3")),
BLSPublicKey: wrappedBLSKeys[i].Bytes,
}
slots = append(slots, slot)
}

rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[0], rs)

rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[1], rs)

// error because all validators belong same address
_, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
require.Error(t, err)

// all of them return error, no way to recover
for i := 2; i < 1000; i++ {
_, err = viewChangeNextValidator(decider, i, slots, &wrappedBLSKeys[0])
require.Errorf(t, err, "error because all validators belong same address %d", i)
}
})

// we can't find next validator, because all validators have the same address
t.Run("check_5_validators_2_addrs", func(t *testing.T) {
// Slot represents node id (BLS address)
var (
addr1 = common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z3"))
addr2 = common.BytesToAddress([]byte("one1ay37rp2pc3kjarg7a322vu3sa8j9puahg679z4"))
rs *bls.PublicKeyWrapper
err error
)
slots := []shard.Slot{
{
EcdsaAddress: addr1,
BLSPublicKey: wrappedBLSKeys[0].Bytes,
},
{
EcdsaAddress: addr1,
BLSPublicKey: wrappedBLSKeys[1].Bytes,
},
{
EcdsaAddress: addr2,
BLSPublicKey: wrappedBLSKeys[2].Bytes,
},
{
EcdsaAddress: addr2,
BLSPublicKey: wrappedBLSKeys[3].Bytes,
},
{
EcdsaAddress: addr2,
BLSPublicKey: wrappedBLSKeys[4].Bytes,
},
}

rs, err = viewChangeNextValidator(decider, 0, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[0], rs)

rs, err = viewChangeNextValidator(decider, 1, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[1], rs)

rs, err = viewChangeNextValidator(decider, 2, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[2], rs)

rs, err = viewChangeNextValidator(decider, 3, slots, &wrappedBLSKeys[0])
require.NoError(t, err)
require.Equal(t, &wrappedBLSKeys[1], rs)
})
}
2 changes: 1 addition & 1 deletion internal/chain/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"sort"
"time"

bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/numeric"

bls2 "github.com/harmony-one/bls/ffi/go/bls"
blsvrf "github.com/harmony-one/harmony/crypto/vrf/bls"

"github.com/ethereum/go-ethereum/common"
Expand Down