Skip to content

Commit

Permalink
configurable number of shards
Browse files Browse the repository at this point in the history
  • Loading branch information
sstanculeanu committed Sep 4, 2024
1 parent 357efad commit c9cc5f5
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 73 deletions.
3 changes: 3 additions & 0 deletions cmd/proxy/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
# With this flag disabled, /transaction/pool route will return an error
AllowEntireTxPoolFetch = false

# NumberOfShards represents the total number of shards from the network (excluding metachain)
NumberOfShards = 3

[AddressPubkeyConverter]
#Length specifies the length in bytes of an address
Length = 32
Expand Down
21 changes: 2 additions & 19 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func createVersionsRegistryTestOrProduction(
ValStatsCacheValidityDurationSec: 60,
EconomicsMetricsCacheValidityDurationSec: 6,
FaucetValue: "10000000000",
NumberOfShards: 2,
},
ApiLogging: config.ApiLoggingConfig{
LoggingEnabled: true,
Expand Down Expand Up @@ -408,7 +409,7 @@ func createVersionsRegistry(
return nil, err
}

shardCoord, err := getShardCoordinator(cfg)
shardCoord, err := sharding.NewMultiShardCoordinator(uint32(cfg.GeneralSettings.NumberOfShards)+1, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -556,24 +557,6 @@ func createVersionsRegistry(
return versionsFactory.CreateVersionsRegistry(facadeArgs, apiConfigParser)
}

// getShardCoordinator builds a shard coordinator sized by the highest
// non-metachain shard ID found among the configured observers (shard IDs
// are 0-based, so the coordinator spans highest+1 shards).
func getShardCoordinator(cfg *config.Config) (common.Coordinator, error) {
	var highestShardID uint32
	for _, observer := range cfg.Observers {
		observerShard := observer.ShardId
		if observerShard == core.MetachainShardId {
			// metachain does not count towards the number of regular shards
			continue
		}
		if observerShard > highestShardID {
			highestShardID = observerShard
		}
	}

	return sharding.NewMultiShardCoordinator(highestShardID+1, 0)
}

func startWebServer(
versionsRegistry data.VersionsRegistryHandler,
generalConfig *config.Config,
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type GeneralSettingsConfig struct {
BalancedObservers bool
BalancedFullHistoryNodes bool
AllowEntireTxPoolFetch bool
NumberOfShards uint32
}

// Config will hold the whole config file's data
Expand Down
37 changes: 24 additions & 13 deletions observer/baseNodeProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type baseNodeProvider struct {
mutNodes sync.RWMutex
shardIds []uint32
numOfShards uint32
configurationFilePath string
regularNodes NodesHolder
snapshotlessNodes NodesHolder
Expand All @@ -28,6 +29,19 @@ func (bnp *baseNodeProvider) initNodes(nodes []*data.NodeData) error {
for _, observer := range nodes {
shardId := observer.ShardId
newNodes[shardId] = append(newNodes[shardId], observer)
isMeta := shardId == core.MetachainShardId
if isMeta {
continue
}

if shardId > bnp.numOfShards {
return fmt.Errorf("%w for observer %s, provided shard %d, number of shards configured %d",
ErrInvalidShard,
observer.Address,
observer.ShardId,
bnp.numOfShards,
)
}
}

err := checkNodesInShards(newNodes)
Expand Down Expand Up @@ -116,10 +130,6 @@ func splitNodesByDataAvailability(nodes []*data.NodeData) ([]*data.NodeData, []*

// ReloadNodes will reload the observers or the full history observers
func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesReloadResponse {
bnp.mutNodes.RLock()
numOldShardsCount := len(bnp.shardIds)
bnp.mutNodes.RUnlock()

newConfig, err := loadMainConfig(bnp.configurationFilePath)
if err != nil {
return data.NodesReloadResponse{
Expand All @@ -129,21 +139,22 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo
}
}

numOldShards := bnp.numOfShards
numNewShards := newConfig.GeneralSettings.NumberOfShards
if numOldShards != numNewShards {
return data.NodesReloadResponse{
OkRequest: false,
Description: "not reloaded",
Error: fmt.Sprintf("different number of shards. before: %d, now: %d", numOldShards, numNewShards),
}
}

nodes := newConfig.Observers
if nodesType == data.FullHistoryNode {
nodes = newConfig.FullHistoryNodes
}

newNodes := nodesSliceToShardedMap(nodes)
numNewShardsCount := len(newNodes)

if numOldShardsCount != numNewShardsCount {
return data.NodesReloadResponse{
OkRequest: false,
Description: "not reloaded",
Error: fmt.Sprintf("different number of shards. before: %d, now: %d", numOldShardsCount, numNewShardsCount),
}
}

bnp.mutNodes.Lock()
defer bnp.mutNodes.Unlock()
Expand Down
65 changes: 57 additions & 8 deletions observer/baseNodeProvider_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package observer

import (
"errors"
"strings"
"testing"

"github.com/multiversx/mx-chain-core-go/core"
Expand Down Expand Up @@ -40,15 +42,42 @@ func TestBaseNodeProvider_InvalidNodesConfiguration(t *testing.T) {
},
}

bnp := baseNodeProvider{}
bnp := baseNodeProvider{
numOfShards: 1,
}
err := bnp.initNodes(nodes)
require.Contains(t, err.Error(), "observers for shard 1 must include at least one historical (non-snapshotless) observer")
}

func TestBaseNodeProvider_InvalidShardForObserver(t *testing.T) {
	t.Parallel()

	// one observer in shard 0 (valid) and one in shard 2, which exceeds
	// the configured number of shards and must make initNodes fail
	validNode := &data.NodeData{
		Address:        "addr0",
		ShardId:        0,
		IsSnapshotless: false,
	}
	outOfRangeNode := &data.NodeData{
		Address:        "addr1",
		ShardId:        2,
		IsSnapshotless: true,
	}

	provider := baseNodeProvider{
		numOfShards: 1,
	}

	err := provider.initNodes([]*data.NodeData{validNode, outOfRangeNode})
	require.True(t, errors.Is(err, ErrInvalidShard))
	require.True(t, strings.Contains(err.Error(), "addr1"))
}

func TestBaseNodeProvider_ReloadNodesDifferentNumberOfNewShard(t *testing.T) {
bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1},
numOfShards: 2,
}

response := bnp.ReloadNodes(data.Observer)
Expand All @@ -66,14 +95,34 @@ func TestBaseNodeProvider_ReloadNodesConfigurationFileNotFound(t *testing.T) {
}

func TestBaseNodeProvider_ReloadNodesShouldWork(t *testing.T) {
bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1, core.MetachainShardId},
}
t.Parallel()

response := bnp.ReloadNodes(data.Observer)
require.True(t, response.OkRequest)
require.Empty(t, response.Error)
t.Run("same number of observer shards provided", func(t *testing.T) {
t.Parallel()

bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1, core.MetachainShardId},
numOfShards: 3,
}

response := bnp.ReloadNodes(data.Observer)
require.True(t, response.OkRequest)
require.Empty(t, response.Error)
})
t.Run("more observer shards provided", func(t *testing.T) {
t.Parallel()

bnp := &baseNodeProvider{
configurationFilePath: configurationPath,
shardIds: []uint32{0, 1, core.MetachainShardId}, // no observer for shard 2, will come after reload
numOfShards: 3, // same as in configurationPath
}

response := bnp.ReloadNodes(data.Observer)
require.True(t, response.OkRequest)
require.Empty(t, response.Error)
})
}

func TestBaseNodeProvider_prepareReloadResponseMessage(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion observer/circularQueueNodesProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ type circularQueueNodesProvider struct {
}

// NewCircularQueueNodesProvider returns a new instance of circularQueueNodesProvider
func NewCircularQueueNodesProvider(observers []*data.NodeData, configurationFilePath string) (*circularQueueNodesProvider, error) {
func NewCircularQueueNodesProvider(
observers []*data.NodeData,
configurationFilePath string,
numberOfShards uint32,
) (*circularQueueNodesProvider, error) {
bop := &baseNodeProvider{
configurationFilePath: configurationFilePath,
numOfShards: numberOfShards,
}

err := bop.initNodes(observers)
Expand Down
19 changes: 11 additions & 8 deletions observer/circularQueueNodesProvider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func getDummyConfig() config.Config {
ShardId: 1,
},
},
GeneralSettings: config.GeneralSettingsConfig{
NumberOfShards: 2,
},
}
}

Expand All @@ -31,7 +34,7 @@ func TestNewCircularQueueObserverProvider_EmptyObserversListShouldErr(t *testing

cfg := getDummyConfig()
cfg.Observers = make([]*data.NodeData, 0)
cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
assert.Nil(t, cqop)
assert.Equal(t, ErrEmptyObserversList, err)
}
Expand All @@ -40,7 +43,7 @@ func TestNewCircularQueueObserverProvider_ShouldWork(t *testing.T) {
t.Parallel()

cfg := getDummyConfig()
cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, err := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))
assert.Nil(t, err)
assert.False(t, check.IfNil(cqop))
}
Expand All @@ -50,7 +53,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardIdShouldWork(t *testi

shardId := uint32(0)
cfg := getDummyConfig()
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

res, err := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
assert.Nil(t, err)
Expand All @@ -77,7 +80,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardIdShouldBalanceObserv
},
},
}
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

res1, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
res2, _ := cqop.GetNodesByShardId(shardId, data.AvailabilityAll)
Expand All @@ -94,7 +97,7 @@ func TestCircularQueueObserversProvider_GetAllObserversShouldWork(t *testing.T)
t.Parallel()

cfg := getDummyConfig()
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

res, err := cqop.GetAllNodes(data.AvailabilityAll)
assert.NoError(t, err)
Expand All @@ -120,7 +123,7 @@ func TestCircularQueueObserversProvider_GetAllObserversShouldWorkAndBalanceObser
},
},
}
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

res1, _ := cqop.GetAllNodes(data.AvailabilityAll)
res2, _ := cqop.GetAllNodes(data.AvailabilityAll)
Expand Down Expand Up @@ -166,7 +169,7 @@ func TestCircularQueueObserversProvider_GetAllObservers_ConcurrentSafe(t *testin

expectedNumOfTimesAnObserverIsCalled := (numOfTimesToCallForEachRoutine * numOfGoRoutinesToStart) / len(observers)

cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

for i := 0; i < numOfGoRoutinesToStart; i++ {
for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
Expand Down Expand Up @@ -225,7 +228,7 @@ func TestCircularQueueObserversProvider_GetObserversByShardId_ConcurrentSafe(t *

expectedNumOfTimesAnObserverIsCalled := 2 * ((numOfTimesToCallForEachRoutine * numOfGoRoutinesToStart) / len(observers))

cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path")
cqop, _ := NewCircularQueueNodesProvider(cfg.Observers, "path", uint32(len(cfg.Observers)))

for i := 0; i < numOfGoRoutinesToStart; i++ {
for j := 0; j < numOfTimesToCallForEachRoutine; j++ {
Expand Down
3 changes: 3 additions & 0 deletions observer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ var ErrEmptyObserversList = errors.New("empty observers list")

// ErrShardNotAvailable signals that the specified shard ID cannot be found in the internal nodes maps
var ErrShardNotAvailable = errors.New("the specified shard ID does not exist in proxy's configuration")

// ErrInvalidShard signals that an observer was configured with a shard ID outside the configured shard range
var ErrInvalidShard = errors.New("invalid shard")
20 changes: 16 additions & 4 deletions observer/nodesProviderFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,36 @@ func NewNodesProviderFactory(cfg config.Config, configurationFilePath string) (*
// CreateObservers will create and return an object of type NodesProviderHandler based on a flag:
// a circular-queue (load-balancing) provider when BalancedObservers is enabled,
// otherwise a simple provider. Both receive the configured number of shards so
// that observer shard IDs can be validated against it.
// NOTE: the original span contained both the pre-change two-argument calls and
// the post-change three-argument calls (diff-rendering duplication); only the
// three-argument form is kept here.
func (npf *nodesProviderFactory) CreateObservers() (NodesProviderHandler, error) {
	if npf.cfg.GeneralSettings.BalancedObservers {
		return NewCircularQueueNodesProvider(
			npf.cfg.Observers,
			npf.configurationFilePath,
			npf.cfg.GeneralSettings.NumberOfShards)
	}

	return NewSimpleNodesProvider(
		npf.cfg.Observers,
		npf.configurationFilePath,
		npf.cfg.GeneralSettings.NumberOfShards)
}

// CreateFullHistoryNodes will create and return an object of type NodesProviderHandler based on a flag
func (npf *nodesProviderFactory) CreateFullHistoryNodes() (NodesProviderHandler, error) {
if npf.cfg.GeneralSettings.BalancedFullHistoryNodes {
nodesProviderHandler, err := NewCircularQueueNodesProvider(npf.cfg.FullHistoryNodes, npf.configurationFilePath)
nodesProviderHandler, err := NewCircularQueueNodesProvider(
npf.cfg.FullHistoryNodes,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
if err != nil {
return getDisabledFullHistoryNodesProviderIfNeeded(err)
}

return nodesProviderHandler, nil
}

nodesProviderHandler, err := NewSimpleNodesProvider(npf.cfg.FullHistoryNodes, npf.configurationFilePath)
nodesProviderHandler, err := NewSimpleNodesProvider(
npf.cfg.FullHistoryNodes,
npf.configurationFilePath,
npf.cfg.GeneralSettings.NumberOfShards)
if err != nil {
return getDisabledFullHistoryNodesProviderIfNeeded(err)
}
Expand Down
7 changes: 6 additions & 1 deletion observer/simpleNodesProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ type simpleNodesProvider struct {
}

// NewSimpleNodesProvider will return a new instance of simpleNodesProvider
func NewSimpleNodesProvider(observers []*data.NodeData, configurationFilePath string) (*simpleNodesProvider, error) {
func NewSimpleNodesProvider(
observers []*data.NodeData,
configurationFilePath string,
numberOfShards uint32,
) (*simpleNodesProvider, error) {
bop := &baseNodeProvider{
configurationFilePath: configurationFilePath,
numOfShards: numberOfShards,
}

err := bop.initNodes(observers)
Expand Down
Loading

0 comments on commit c9cc5f5

Please sign in to comment.