Skip to content

Commit

Permalink
fixes after system tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdan-rosianu committed Sep 14, 2023
1 parent e1b2a6f commit 29ad567
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 36 deletions.
43 changes: 29 additions & 14 deletions observer/baseNodeProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,34 +178,38 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo

func (bnp *baseNodeProvider) getSyncedNodesForShardUnprotected(shardId uint32, dataAvailability data.ObserverDataAvailabilityType) ([]*data.NodeData, error) {
var syncedNodesSource []*data.NodeData
if dataAvailability == data.AvailabilityRecent && len(bnp.snapshotlessNodes.GetSyncedNodes()) > 0 {
if dataAvailability == data.AvailabilityRecent {
syncedNodesSource = bnp.snapshotlessNodes.GetSyncedNodes()
} else {
syncedNodesSource = bnp.regularNodes.GetSyncedNodes()
}
syncedNodes := make([]*data.NodeData, 0)
for _, node := range syncedNodesSource {
if node.ShardId != shardId {
continue
}

syncedNodes = append(syncedNodes, node)
}
syncedNodes := filterNodesInShard(syncedNodesSource, shardId)
if len(syncedNodes) != 0 {
return syncedNodes, nil
}

if dataAvailability == data.AvailabilityRecent {
regularNodes := filterNodesInShard(bnp.regularNodes.GetSyncedNodes(), shardId)
if len(regularNodes) > 0 {
return regularNodes, nil
}
}

var fallbackNodesSource []*data.NodeData
if dataAvailability == data.AvailabilityRecent {
fallbackNodesSource = bnp.snapshotlessNodes.GetSyncedNodes()
fallbackNodesSource = bnp.snapshotlessNodes.GetSyncedFallbackNodes()
} else {
fallbackNodesSource = bnp.regularNodes.GetSyncedNodes()
fallbackNodesSource = bnp.regularNodes.GetSyncedFallbackNodes()
}
for _, node := range fallbackNodesSource {
if node.ShardId == shardId {
syncedNodes = append(syncedNodes, node)

if dataAvailability == data.AvailabilityRecent {
regularNodes := filterNodesInShard(bnp.regularNodes.GetSyncedNodes(), shardId)
if len(regularNodes) > 0 {
return regularNodes, nil
}
}

syncedNodes = filterNodesInShard(fallbackNodesSource, shardId)
if len(syncedNodes) != 0 {
return syncedNodes, nil
}
Expand All @@ -224,6 +228,17 @@ func (bnp *baseNodeProvider) getSyncedNodesForShardUnprotected(shardId uint32, d
return nil, ErrShardNotAvailable
}

func filterNodesInShard(nodes []*data.NodeData, shardID uint32) []*data.NodeData {
filteredSlice := make([]*data.NodeData, 0)
for _, node := range nodes {
if node.ShardId == shardID {
filteredSlice = append(filteredSlice, node)
}
}

return filteredSlice
}

func (bnp *baseNodeProvider) getSyncedNodesUnprotected(dataAvailability data.ObserverDataAvailabilityType) ([]*data.NodeData, error) {
syncedNodes := make([]*data.NodeData, 0)
for _, shardId := range bnp.shardIds {
Expand Down
26 changes: 18 additions & 8 deletions observer/holder/nodesHolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (nh *nodesHolder) UpdateNodes(nodesWithSyncStatus []*data.NodeData) {
if len(nodesWithSyncStatus) == 0 {
return
}
syncedNodes, syncedFallbackNodes, outOfSyncNodes, err := computeSyncedAndOutOfSyncNodes(nodesWithSyncStatus, nh.shardIDs)
syncedNodes, syncedFallbackNodes, outOfSyncNodes, err := computeSyncedAndOutOfSyncNodes(nodesWithSyncStatus, nh.shardIDs, nh.availability)
if err != nil {
log.Error("cannot update nodes based on sync state", "error", err)
return
Expand Down Expand Up @@ -148,14 +148,18 @@ func (nh *nodesHolder) printSyncedNodesInShardsUnprotected() {
totalNumOfActiveNodes++
inSyncAddresses[shardID] = append(inSyncAddresses[shardID], nh.lastSyncedNodes[shardID].Address)
}
log.Info(fmt.Sprintf("shard %d active nodes", shardID),
nodesType := "regular active nodes"
if nh.availability == data.AvailabilityRecent {
nodesType = "snapshotless active nodes"
}
log.Info(fmt.Sprintf("shard %d %s", shardID, nodesType),
"observers count", totalNumOfActiveNodes,
"addresses", strings.Join(inSyncAddresses[shardID], ", "),
"fallback addresses", strings.Join(inSyncFallbackAddresses[shardID], ", "))
}
}

func computeSyncedAndOutOfSyncNodes(nodes []*data.NodeData, shardIDs []uint32) ([]*data.NodeData, []*data.NodeData, []*data.NodeData, error) {
func computeSyncedAndOutOfSyncNodes(nodes []*data.NodeData, shardIDs []uint32, availability data.ObserverDataAvailabilityType) ([]*data.NodeData, []*data.NodeData, []*data.NodeData, error) {
tempSyncedNodesMap := make(map[uint32][]*data.NodeData)
tempSyncedFallbackNodesMap := make(map[uint32][]*data.NodeData)
tempNotSyncedNodesMap := make(map[uint32][]*data.NodeData)
Expand Down Expand Up @@ -183,7 +187,9 @@ func computeSyncedAndOutOfSyncNodes(nodes []*data.NodeData, shardIDs []uint32) (

totalLen := len(tempSyncedNodesMap[shardID]) + len(tempSyncedFallbackNodesMap[shardID]) + len(tempNotSyncedNodesMap[shardID])
if totalLen == 0 {
return nil, nil, nil, fmt.Errorf("%w for shard %d - no synced or not synced node", errWrongConfiguration, shardID)
if availability != data.AvailabilityRecent {
return nil, nil, nil, fmt.Errorf("%w for shard %d - no synced or not synced node", errWrongConfiguration, shardID)
}
}
}

Expand Down Expand Up @@ -282,15 +288,19 @@ func (nh *nodesHolder) removeOutOfSyncNodesUnprotected(
syncedNodesMap map[uint32][]*data.NodeData,
syncedFallbackNodesMap map[uint32][]*data.NodeData,
) {
minSyncedNodes := 1
if nh.availability == data.AvailabilityRecent {
minSyncedNodes = 0 // allow the snapshotless list to be empty so regular observers can be used
}
if len(outOfSyncNodes) == 0 {
nh.outOfSyncNodes = make([]*data.NodeData, 0)
nh.outOfSyncFallbackNodes = make([]*data.NodeData, 0)
return
}

for _, outOfSyncNode := range outOfSyncNodes {
hasOneSyncedNode := len(syncedNodesMap[outOfSyncNode.ShardId]) >= 1
hasEnoughSyncedFallbackNodes := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) > 1
hasOneSyncedNode := len(syncedNodesMap[outOfSyncNode.ShardId]) >= minSyncedNodes
hasEnoughSyncedFallbackNodes := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) > minSyncedNodes
canDeleteFallbackNode := hasOneSyncedNode || hasEnoughSyncedFallbackNodes
if outOfSyncNode.IsFallback && canDeleteFallbackNode {
nh.removeNodeUnprotected(outOfSyncNode)
Expand All @@ -305,7 +315,7 @@ func (nh *nodesHolder) removeOutOfSyncNodesUnprotected(
continue
}

hasEnoughSyncedNodes := len(syncedNodesMap[outOfSyncNode.ShardId]) >= 1
hasEnoughSyncedNodes := len(syncedNodesMap[outOfSyncNode.ShardId]) >= minSyncedNodes
if hasEnoughSyncedNodes {
nh.removeNodeUnprotected(outOfSyncNode)
continue
Expand All @@ -326,7 +336,7 @@ func (nh *nodesHolder) removeOutOfSyncNodesUnprotected(
"shard", outOfSyncNode.ShardId)
nh.lastSyncedNodes[outOfSyncNode.ShardId] = outOfSyncNode
}
hasOneSyncedFallbackNode := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) >= 1
hasOneSyncedFallbackNode := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) >= minSyncedNodes
if hasOneSyncedFallbackNode {
nh.removeNodeUnprotected(outOfSyncNode)
continue
Expand Down
47 changes: 41 additions & 6 deletions observer/holder/nodesHolder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,28 @@ import (
"sort"
"testing"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-proxy-go/data"
"github.com/stretchr/testify/require"
)

func TestNodesHolder_GetLastSyncedNodes(t *testing.T) {
t.Parallel()

syncedNodes := []*data.NodeData{{Address: "addr0", ShardId: core.MetachainShardId}, {Address: "addr1", ShardId: 0}}
fallbackNodes := []*data.NodeData{{Address: "fallback-addr0", ShardId: core.MetachainShardId}, {Address: "fallback-addr1", ShardId: 0}}
shardIds := []uint32{0, core.MetachainShardId}

nodesHolder, err := NewNodesHolder(syncedNodes, fallbackNodes, shardIds, data.AvailabilityAll)
require.NoError(t, err)

require.Equal(t, syncedNodes, nodesHolder.GetSyncedNodes())
require.Equal(t, fallbackNodes, nodesHolder.GetSyncedFallbackNodes())
require.Empty(t, nodesHolder.GetOutOfSyncFallbackNodes())
require.Empty(t, nodesHolder.GetOutOfSyncNodes())
require.Empty(t, nodesHolder.GetLastSyncedNodes())
}

func TestComputeSyncAndOutOfSyncNodes(t *testing.T) {
t.Parallel()

Expand All @@ -18,6 +36,7 @@ func TestComputeSyncAndOutOfSyncNodes(t *testing.T) {
t.Run("all nodes are out of sync", testComputeSyncedAndOutOfSyncNodesAllNodesNotSynced)
t.Run("invalid config - no node", testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll)
t.Run("invalid config - no node in a shard", testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeInAShard)
t.Run("snapshotless nodes should work with no node in a shard", testSnapshotlessNodesShouldWorkIfNoNodeInShardExists)
t.Run("edge case - address should not exist in both sync and not-synced lists", testEdgeCaseAddressShouldNotExistInBothLists)
}

Expand All @@ -32,7 +51,7 @@ func testComputeSyncedAndOutOfSyncNodesAllNodesSynced(t *testing.T) {
{Address: "3", ShardId: 1, IsSynced: true, IsFallback: true},
}

synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs)
synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll)
require.Equal(t, []*data.NodeData{
{Address: "0", ShardId: 0, IsSynced: true},
{Address: "2", ShardId: 1, IsSynced: true},
Expand All @@ -57,7 +76,7 @@ func testComputeSyncedAndOutOfSyncNodesEnoughSyncedObservers(t *testing.T) {
{Address: "5", ShardId: 1, IsSynced: true, IsFallback: true},
}

synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs)
synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll)
require.Equal(t, []*data.NodeData{
{Address: "0", ShardId: 0, IsSynced: true},
{Address: "3", ShardId: 1, IsSynced: true},
Expand All @@ -83,7 +102,7 @@ func testComputeSyncedAndOutOfSyncNodesAllNodesNotSynced(t *testing.T) {
{Address: "3", ShardId: 1, IsSynced: false, IsFallback: true},
}

synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs)
synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll)
require.Equal(t, []*data.NodeData{}, synced)
require.Equal(t, []*data.NodeData{}, syncedFb)
require.Equal(t, input, notSynced)
Expand Down Expand Up @@ -143,7 +162,7 @@ func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll(t *testin

shardIDs := []uint32{0, 1}
var input []*data.NodeData
synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs)
synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll)
require.Error(t, err)
require.Nil(t, synced)
require.Nil(t, syncedFb)
Expand All @@ -156,7 +175,7 @@ func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll(t *testin
Address: "0", ShardId: 0, IsSynced: true,
},
}
synced, syncedFb, notSynced, err = computeSyncedAndOutOfSyncNodes(input, shardIDs)
synced, syncedFb, notSynced, err = computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll)
require.True(t, errors.Is(err, errWrongConfiguration))
require.Nil(t, synced)
require.Nil(t, syncedFb)
Expand All @@ -173,13 +192,29 @@ func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeInAShard(t *tes
Address: "0", ShardId: 0, IsSynced: true,
},
}
synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs)
synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll)
require.True(t, errors.Is(err, errWrongConfiguration))
require.Nil(t, synced)
require.Nil(t, syncedFb)
require.Nil(t, notSynced)
}

func testSnapshotlessNodesShouldWorkIfNoNodeInShardExists(t *testing.T) {
t.Parallel()

shardIDs := []uint32{0, core.MetachainShardId}
input := []*data.NodeData{
{
Address: "m", ShardId: core.MetachainShardId, IsSynced: true, IsSnapshotless: true,
},
}
synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityRecent)
require.NoError(t, err)
require.Empty(t, notSynced)
require.Empty(t, syncedFb)
require.Equal(t, input, synced)
}

func slicesHaveCommonObjects(firstSlice []*data.NodeData, secondSlice []*data.NodeData) bool {
nodeDataToStr := func(nd *data.NodeData) string {
return fmt.Sprintf("%s%d", nd.Address, nd.ShardId)
Expand Down
15 changes: 7 additions & 8 deletions process/baseProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -202,12 +202,11 @@ func (bp *BaseProcessor) CallGetRestEndPoint(

resp, err := bp.httpClient.Do(req)
if err != nil {
bp.triggerNodesSyncCheck(address)
if isTimeoutError(err) {
bp.triggerNodesSyncCheck(address)
return http.StatusRequestTimeout, err
}

bp.triggerNodesSyncCheck(address)
return http.StatusNotFound, err
}

Expand All @@ -218,7 +217,7 @@ func (bp *BaseProcessor) CallGetRestEndPoint(
}
}()

responseBodyBytes, err := ioutil.ReadAll(resp.Body)
responseBodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return http.StatusInternalServerError, err
}
Expand Down Expand Up @@ -262,12 +261,11 @@ func (bp *BaseProcessor) CallPostRestEndPoint(

resp, err := bp.httpClient.Do(req)
if err != nil {
bp.triggerNodesSyncCheck(address)
if isTimeoutError(err) {
bp.triggerNodesSyncCheck(address)
return http.StatusRequestTimeout, err
}

bp.triggerNodesSyncCheck(address)
return http.StatusNotFound, err
}

Expand All @@ -278,7 +276,7 @@ func (bp *BaseProcessor) CallPostRestEndPoint(
}
}()

responseBodyBytes, err := ioutil.ReadAll(resp.Body)
responseBodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return http.StatusInternalServerError, err
}
Expand Down Expand Up @@ -421,6 +419,7 @@ func (bp *BaseProcessor) isNodeSynced(node *proxyData.NodeData) (bool, error) {
"probable highest nonce", probableHighestNonce,
"is synced", isNodeSynced,
"is ready for VM Queries", isReadyForVMQueries,
"is snapshotless", node.IsSnapshotless,
"is fallback", node.IsFallback)

if !isReadyForVMQueries {
Expand Down Expand Up @@ -454,7 +453,7 @@ func (bp *BaseProcessor) getNodeStatusResponseFromAPI(url string) (*proxyData.No
return nil, resp.StatusCode, nil
}

responseBodyBytes, err := ioutil.ReadAll(resp.Body)
responseBodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, http.StatusInternalServerError, err
}
Expand Down

0 comments on commit 29ad567

Please sign in to comment.