Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance Leader Rotation Logic to Address Edge Cases in Leader Selection #4798

Merged
merged 11 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,11 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK
return defaultKey
}
const blocksCountAliveness = 4
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v", epoch.Uint64(), bc.Config().IsLeaderRotationInternalValidators(epoch), bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch))
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v rotation v2: %v",
epoch.Uint64(),
bc.Config().IsLeaderRotationInternalValidators(epoch),
bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch),
bc.Config().IsLeaderRotationV2Epoch(epoch))
ss, err := bc.ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state")
Expand Down Expand Up @@ -758,7 +762,9 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK
)

for i := 0; i < len(committee.Slots); i++ {
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
if bc.Config().IsLeaderRotationV2Epoch(epoch) {
wasFound, next = consensus.decider.NthNextValidatorV2(committee.Slots, leader, offset)
} else if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.decider.NthNextValidator(committee.Slots, leader, offset)
} else {
wasFound, next = consensus.decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset)
Expand Down
219 changes: 211 additions & 8 deletions consensus/quorum/quorom_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package quorum

import (
"fmt"
"math/big"
"strings"
"testing"
"time"

bls_core "github.com/harmony-one/bls/ffi/go/bls"
harmony_bls "github.com/harmony-one/harmony/crypto/bls"
Expand Down Expand Up @@ -549,32 +551,233 @@ func TestInvalidAggregateSig(test *testing.T) {
}
}

func TestCIdentities_NthNextValidatorHmy(t *testing.T) {
address := []common.Address{
common.HexToAddress("0x1"),
common.HexToAddress("0x2"),
common.HexToAddress("0x3"),
// createTestCIdentities builds a fixture for the participant-rotation tests.
// It derives numAddresses ECDSA addresses from the integers 0..numAddresses-1,
// generates a random BLS key per slot, and records every key both as a
// shard.Slot and as a public key wrapper. It returns a fresh cIdentities whose
// participants were set from those wrappers (with an empty allowlist), the
// slot list, and the wrapper list in insertion order.
func createTestCIdentities(numAddresses int, keysPerAddress int) (*cIdentities, shard.SlotList, []harmony_bls.PublicKeyWrapper) {
// NOTE(review): the capacity requested here is numAddresses*numAddresses, but
// the loop below appends only numAddresses entries.
testAddresses := make([]common.Address, 0, numAddresses*numAddresses)
for i := int(0); i < numAddresses; i++ {
// Zero-padded 40-hex-digit rendering of i, parsed into an address.
h := fmt.Sprintf("0x%040x", i)
addr := common.HexToAddress(h)
testAddresses = append(testAddresses, addr)
}
slots := shard.SlotList{}
list := []harmony_bls.PublicKeyWrapper{}
// NOTE(review): the two fixed-bound loop headers below and the
// `address[i%3]` line further down look like pre-change lines carried over
// from the diff view; the numAddresses/keysPerAddress versions appear to
// supersede them. Confirm against the merged file.
for i := 0; i < 3; i++ {
for j := 0; j < 3; j++ {
// generate slots and public keys
for i := 0; i < numAddresses; i++ {
for j := 0; j < keysPerAddress; j++ { // keys per address
blsKey := harmony_bls.RandPrivateKey()
wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}
// Fill the serialized form so the slot and the wrapper carry the same bytes.
wrapper.Bytes.FromLibBLSPublicKey(wrapper.Object)

slots = append(slots, shard.Slot{
EcdsaAddress: address[i%3],
EcdsaAddress: testAddresses[i],
BLSPublicKey: wrapper.Bytes,
EffectiveStake: nil,
})
list = append(list, wrapper)
}
}
// initialize and update cIdentities
c := newCIdentities()
c.UpdateParticipants(list, []bls.PublicKeyWrapper{})
return c, slots, list
}

// TestCIdentities_NthNextValidatorHmy runs NthNextValidator on a fixture of
// three addresses with three keys each, starting from the very first key and
// asking for the next validator.
func TestCIdentities_NthNextValidatorHmy(t *testing.T) {
	identities, slotList, wrappers := createTestCIdentities(3, 3)
	current := &wrappers[0]

	ok, nextKey := identities.NthNextValidator(slotList, current, 1)
	require.Equal(t, true, ok)

	// The starting validator owns key indexes 0-2, so the next validator's
	// first key is expected at index 3.
	nextIndex := identities.IndexOf(nextKey.Bytes)
	require.Equal(t, 3, nextIndex)
}

// TestCIdentities_NthNextValidatorFailedEdgeCase1 documents a known failure of
// NthNextValidator: calling it with next=0 and a key that is not part of the
// identities is expected to panic. The test passes only if a panic is
// recovered.
func TestCIdentities_NthNextValidatorFailedEdgeCase1(t *testing.T) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			t.Errorf("Expected a panic when next is 0, but no panic occurred")
			return
		}
		t.Logf("Recovered from panic as expected: %v", recovered)
	}()

	// Fixture: three addresses, three keys each.
	identities, slotList, _ := createTestCIdentities(3, 3)

	// A freshly generated key that the fixture does not contain. Only Object
	// is set; the serialized Bytes field is left at its zero value.
	t.Log("creating a random public key wrapper not present in test identities")
	unknown := harmony_bls.PublicKeyWrapper{Object: harmony_bls.RandPrivateKey().GetPublicKey()}

	// Edge case under test: next=0 with an unknown key.
	t.Log("Calling NthNextValidator with next=0 to test panic handling")
	identities.NthNextValidator(slotList, &unknown, 0)
}

// TestCIdentities_NthNextValidatorFailedEdgeCase2 documents a known failure of
// NthNextValidator: when every key belongs to one address, the call is
// expected not to return. The test passes when the call is still running after
// five seconds and fails if it completes.
func TestCIdentities_NthNextValidatorFailedEdgeCase2(t *testing.T) {
	// Fixture: a single address holding three keys.
	identities, slotList, wrappers := createTestCIdentities(1, 3)

	finished := make(chan bool)
	go func() {
		// Possibly never returns; the select below gives up after the timeout.
		identities.NthNextValidator(slotList, &wrappers[1], 1)
		finished <- true
	}()

	timeout := time.After(5 * time.Second)
	select {
	case <-timeout:
		t.Log("Test timed out, possible infinite loop")
	case <-finished:
		t.Error("Expected a timeout, but successfully calculated next leader")
	}
}

// TestCIdentities_NthNextValidatorFailedEdgeCase3 documents a known failure of
// NthNextValidator with unevenly distributed keys: the first address holds
// four keys and the two following addresses hold one key each. Starting from
// key 0, the call returns key index 5, skipping the second validator's key at
// index 4.
func TestCIdentities_NthNextValidatorFailedEdgeCase3(t *testing.T) {
	// Three addresses derived from the integers 0, 1 and 2.
	owners := make([]common.Address, 3)
	for n := range owners {
		owners[n] = common.HexToAddress(fmt.Sprintf("0x%040x", n))
	}

	slotList := shard.SlotList{}
	wrappers := []harmony_bls.PublicKeyWrapper{}

	// addKey generates one random BLS key owned by owner and records it in
	// both the slot list and the wrapper list.
	addKey := func(owner common.Address) {
		key := harmony_bls.PublicKeyWrapper{Object: harmony_bls.RandPrivateKey().GetPublicKey()}
		key.Bytes.FromLibBLSPublicKey(key.Object)
		slotList = append(slotList, shard.Slot{
			EcdsaAddress:   owner,
			BLSPublicKey:   key.Bytes,
			EffectiveStake: nil,
		})
		wrappers = append(wrappers, key)
	}

	// Four keys for the first address...
	for n := 0; n < 4; n++ {
		addKey(owners[0])
	}
	// ...then a single key for each of the remaining two addresses.
	for _, owner := range owners[1:] {
		addKey(owner)
	}

	identities := newCIdentities()
	identities.UpdateParticipants(wrappers, []bls.PublicKeyWrapper{})

	// Start from the very first key.
	ok, nextKey := identities.NthNextValidator(slotList, &wrappers[0], 1)
	require.Equal(t, true, ok)
	// The first validator's four keys are skipped, so the next validator's key
	// is at index 4 (zero-based), yet index 5 is returned and the second
	// validator is passed over.
	require.Equal(t, 5, identities.IndexOf(nextKey.Bytes))
	t.Log("second validator were skipped")
}

// TestCIdentities_NthNextValidatorV2Hmy runs NthNextValidatorV2 on a fixture of
// three addresses with three keys each, starting from the very first key and
// asking for the next validator.
func TestCIdentities_NthNextValidatorV2Hmy(t *testing.T) {
	identities, slotList, wrappers := createTestCIdentities(3, 3)
	current := &wrappers[0]

	ok, nextKey := identities.NthNextValidatorV2(slotList, current, 1)
	require.Equal(t, true, ok)

	// The starting validator owns key indexes 0-2, so the next validator's
	// first key is expected at index 3.
	nextIndex := identities.IndexOf(nextKey.Bytes)
	require.Equal(t, 3, nextIndex)
}

// TestCIdentities_NthNextValidatorV2EdgeCase1 checks the V2 behaviour for the
// input that makes NthNextValidator panic: an unknown key combined with
// next=0. NthNextValidatorV2 must not panic; it reports success and falls back
// to the first participant key.
func TestCIdentities_NthNextValidatorV2EdgeCase1(t *testing.T) {
	// create test identities and slots
	c, slots, _ := createTestCIdentities(3, 3)

	// create a public key wrapper that doesn't exist in the identities
	t.Log("creating a random public key wrapper not present in test identities")
	blsKey := harmony_bls.RandPrivateKey()
	wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}
	// Populate the serialized bytes so the lookup uses the random key itself
	// rather than an all-zero key.
	wrapper.Bytes.FromLibBLSPublicKey(wrapper.Object)

	// Edge case: unknown key with next=0. Unlike the V1 method, V2 is expected
	// to return the first participant key instead of panicking.
	t.Log("Calling NthNextValidatorV2 with next=0 and an unknown key, expecting fallback to the first key")
	found, key := c.NthNextValidatorV2(slots, &wrapper, 0)

	require.Equal(t, true, found)
	require.Equal(t, 0, c.IndexOf(key.Bytes))
}

// TestCIdentities_NthNextValidatorV2EdgeCase2 checks that NthNextValidatorV2
// terminates when every key belongs to a single address. In that case no key
// of a different validator exists, so the call must return promptly and report
// that nothing was found.
func TestCIdentities_NthNextValidatorV2EdgeCase2(t *testing.T) {
	// create test identities and slots: one address holding three keys
	c, slots, list := createTestCIdentities(1, 3)

	// Buffered so the worker can always deliver its result and exit, even if
	// the timeout branch below has already been taken.
	done := make(chan bool, 1)

	go func() {
		found, _ := c.NthNextValidatorV2(slots, &list[1], 1)
		done <- found
	}()

	select {
	case found := <-done:
		// All keys map to the same address, so no other validator can be found.
		require.Equal(t, false, found)
		t.Log("Test completed successfully ")
	case <-time.After(5 * time.Second):
		t.Error("timeout, possible infinite loop")
	}
}

// TestCIdentities_NthNextValidatorV2EdgeCase3 checks NthNextValidatorV2 with
// unevenly distributed keys: the first address holds four keys and the two
// following addresses hold one key each. Starting from key 0, the next
// validator's key must be the one at index 4.
func TestCIdentities_NthNextValidatorV2EdgeCase3(t *testing.T) {
	// Three addresses derived from the integers 0, 1 and 2.
	owners := make([]common.Address, 3)
	for n := range owners {
		owners[n] = common.HexToAddress(fmt.Sprintf("0x%040x", n))
	}

	slotList := shard.SlotList{}
	wrappers := []harmony_bls.PublicKeyWrapper{}

	// addKey generates one random BLS key owned by owner and records it in
	// both the slot list and the wrapper list.
	addKey := func(owner common.Address) {
		key := harmony_bls.PublicKeyWrapper{Object: harmony_bls.RandPrivateKey().GetPublicKey()}
		key.Bytes.FromLibBLSPublicKey(key.Object)
		slotList = append(slotList, shard.Slot{
			EcdsaAddress:   owner,
			BLSPublicKey:   key.Bytes,
			EffectiveStake: nil,
		})
		wrappers = append(wrappers, key)
	}

	// Four keys for the first address...
	for n := 0; n < 4; n++ {
		addKey(owners[0])
	}
	// ...then a single key for each of the remaining two addresses.
	for _, owner := range owners[1:] {
		addKey(owner)
	}

	identities := newCIdentities()
	identities.UpdateParticipants(wrappers, []bls.PublicKeyWrapper{})

	// Start from the very first key.
	ok, nextKey := identities.NthNextValidatorV2(slotList, &wrappers[0], 1)
	require.Equal(t, true, ok)
	// The first validator's four keys are skipped, so the next validator's key
	// is at index 4 (zero-based).
	require.Equal(t, 4, identities.IndexOf(nextKey.Bytes))
}
42 changes: 41 additions & 1 deletion consensus/quorum/quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ParticipantTracker interface {
ParticipantsCount() int64
// NthNextValidator returns key for next validator. It assumes external validators and leader rotation.
NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
FirstParticipant(shardingconfig.Instance) *bls.PublicKeyWrapper
UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper)
Expand Down Expand Up @@ -217,7 +218,46 @@ func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bl
return found, &s.publicKeys[idx]
}

// NthNextValidator return the Nth next pubkey nodes, but from another validator.
// NthNextValidatorV2 returns the Nth next pubkey nodes, but from another validator.
//
// slotList supplies the BLS-key-to-address mapping used to tell validators
// apart, pubKey is the current key, and next is the offset from pubKey's
// position in s.publicKeys at which the search starts.
//
// It returns (false, pubKey) when there are no participant keys, when next is
// negative, or when a full pass over the keys finds none whose address differs
// from pubKey's. When pubKey is not a participant and next is 0, it returns
// the first participant key. Otherwise it returns (true, key) for the first
// key at or after the starting position, wrapping around, that maps to a
// different address than pubKey.
func (s *cIdentities) NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
	numKeys := len(s.publicKeys)
	if numKeys == 0 || next < 0 {
		return false, pubKey
	}

	publicToAddress := make(map[bls.SerializedPublicKey]common.Address, len(slotList))
	for _, slot := range slotList {
		publicToAddress[slot.BLSPublicKey] = slot.EcdsaAddress
	}

	pubKeyIndex := s.IndexOf(pubKey.Bytes)
	if pubKeyIndex == -1 {
		utils.Logger().Error().
			Str("key", pubKey.Bytes.Hex()).
			Msg("[NthNextValidatorV2] pubKey not found")
		// With an unknown key and no offset there is no position to start
		// from (the index would be -1), so fall back to the first key.
		if next == 0 {
			return true, &s.publicKeys[0]
		}
	}

	// Loop-invariant: the address owning the current key. A key missing from
	// slotList yields the zero address.
	currentAddress := publicToAddress[pubKey.Bytes]
	// Non-negative here: either pubKeyIndex >= 0, or pubKeyIndex == -1 and next >= 1.
	start := (pubKeyIndex + next) % numKeys

	// One full pass over the keys is enough; probing further would only
	// revisit positions that were already rejected.
	for attempts := 0; attempts < numKeys; attempts++ {
		idx := (start + attempts) % numKeys
		if publicToAddress[s.publicKeys[idx].Bytes] != currentAddress {
			return true, &s.publicKeys[idx]
		}
	}

	utils.Logger().Warn().
		Str("key", pubKey.Bytes.Hex()).
		Msg("[NthNextValidatorV2] Could not find a different validator within limit")
	return false, pubKey
}

func (s *cIdentities) NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
found := false

Expand Down
6 changes: 6 additions & 0 deletions consensus/quorum/thread_safe_decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey
return a.decider.NthNextValidator(slotList, pubKey, next)
}

// NthNextValidatorV2 forwards to the wrapped decider's NthNextValidatorV2
// while holding a.mu, and returns its result unchanged.
func (a threadSafeDeciderImpl) NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
	a.mu.Lock()
	defer a.mu.Unlock()
	// Must delegate to the V2 method: calling NthNextValidator here would
	// silently run the V1 rotation logic for callers that asked for V2.
	return a.decider.NthNextValidatorV2(slotList, pubKey, next)
}

func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
Expand Down
7 changes: 6 additions & 1 deletion consensus/view_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,12 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
var wasFound bool
var next *bls.PublicKeyWrapper
if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) {
if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
if blockchain.Config().IsLeaderRotationV2Epoch(epoch) {
wasFound, next = consensus.decider.NthNextValidatorV2(
committee.Slots,
lastLeaderPubKey,
gap)
} else if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.decider.NthNextValidator(
committee.Slots,
lastLeaderPubKey,
Expand Down
Loading