diff --git a/observer/baseNodeProvider.go b/observer/baseNodeProvider.go index 5fbe5b22..a4d7c18c 100644 --- a/observer/baseNodeProvider.go +++ b/observer/baseNodeProvider.go @@ -178,34 +178,38 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo func (bnp *baseNodeProvider) getSyncedNodesForShardUnprotected(shardId uint32, dataAvailability data.ObserverDataAvailabilityType) ([]*data.NodeData, error) { var syncedNodesSource []*data.NodeData - if dataAvailability == data.AvailabilityRecent && len(bnp.snapshotlessNodes.GetSyncedNodes()) > 0 { + if dataAvailability == data.AvailabilityRecent { syncedNodesSource = bnp.snapshotlessNodes.GetSyncedNodes() } else { syncedNodesSource = bnp.regularNodes.GetSyncedNodes() } - syncedNodes := make([]*data.NodeData, 0) - for _, node := range syncedNodesSource { - if node.ShardId != shardId { - continue - } - - syncedNodes = append(syncedNodes, node) - } + syncedNodes := filterNodesInShard(syncedNodesSource, shardId) if len(syncedNodes) != 0 { return syncedNodes, nil } + if dataAvailability == data.AvailabilityRecent { + regularNodes := filterNodesInShard(bnp.regularNodes.GetSyncedNodes(), shardId) + if len(regularNodes) > 0 { + return regularNodes, nil + } + } + var fallbackNodesSource []*data.NodeData if dataAvailability == data.AvailabilityRecent { - fallbackNodesSource = bnp.snapshotlessNodes.GetSyncedNodes() + fallbackNodesSource = bnp.snapshotlessNodes.GetSyncedFallbackNodes() } else { - fallbackNodesSource = bnp.regularNodes.GetSyncedNodes() + fallbackNodesSource = bnp.regularNodes.GetSyncedFallbackNodes() } - for _, node := range fallbackNodesSource { - if node.ShardId == shardId { - syncedNodes = append(syncedNodes, node) + + if dataAvailability == data.AvailabilityRecent { + regularNodes := filterNodesInShard(bnp.regularNodes.GetSyncedNodes(), shardId) + if len(regularNodes) > 0 { + return regularNodes, nil } } + + syncedNodes = filterNodesInShard(fallbackNodesSource, shardId) if len(syncedNodes) != 0 { return syncedNodes, nil } @@ -224,6 +228,17 @@ func (bnp *baseNodeProvider) getSyncedNodesForShardUnprotected(shardId uint32, d return nil, ErrShardNotAvailable } +func filterNodesInShard(nodes []*data.NodeData, shardID uint32) []*data.NodeData { + filteredSlice := make([]*data.NodeData, 0) + for _, node := range nodes { + if node.ShardId == shardID { + filteredSlice = append(filteredSlice, node) + } + } + + return filteredSlice +} + func (bnp *baseNodeProvider) getSyncedNodesUnprotected(dataAvailability data.ObserverDataAvailabilityType) ([]*data.NodeData, error) { syncedNodes := make([]*data.NodeData, 0) for _, shardId := range bnp.shardIds { diff --git a/observer/holder/nodesHolder.go b/observer/holder/nodesHolder.go index b28f2625..a765feb5 100644 --- a/observer/holder/nodesHolder.go +++ b/observer/holder/nodesHolder.go @@ -49,7 +49,7 @@ func (nh *nodesHolder) UpdateNodes(nodesWithSyncStatus []*data.NodeData) { if len(nodesWithSyncStatus) == 0 { return } - syncedNodes, syncedFallbackNodes, outOfSyncNodes, err := computeSyncedAndOutOfSyncNodes(nodesWithSyncStatus, nh.shardIDs) + syncedNodes, syncedFallbackNodes, outOfSyncNodes, err := computeSyncedAndOutOfSyncNodes(nodesWithSyncStatus, nh.shardIDs, nh.availability) if err != nil { log.Error("cannot update nodes based on sync state", "error", err) return @@ -148,14 +148,18 @@ func (nh *nodesHolder) printSyncedNodesInShardsUnprotected() { totalNumOfActiveNodes++ inSyncAddresses[shardID] = append(inSyncAddresses[shardID], nh.lastSyncedNodes[shardID].Address) } - log.Info(fmt.Sprintf("shard %d active nodes", shardID), + nodesType := "regular active nodes" + if nh.availability == data.AvailabilityRecent { + nodesType = "snapshotless active nodes" + } + log.Info(fmt.Sprintf("shard %d %s", shardID, nodesType), "observers count", totalNumOfActiveNodes, "addresses", strings.Join(inSyncAddresses[shardID], ", "), "fallback addresses", strings.Join(inSyncFallbackAddresses[shardID], ", ")) } } -func computeSyncedAndOutOfSyncNodes(nodes []*data.NodeData, shardIDs []uint32) ([]*data.NodeData, []*data.NodeData, []*data.NodeData, error) { +func computeSyncedAndOutOfSyncNodes(nodes []*data.NodeData, shardIDs []uint32, availability data.ObserverDataAvailabilityType) ([]*data.NodeData, []*data.NodeData, []*data.NodeData, error) { tempSyncedNodesMap := make(map[uint32][]*data.NodeData) tempSyncedFallbackNodesMap := make(map[uint32][]*data.NodeData) tempNotSyncedNodesMap := make(map[uint32][]*data.NodeData) @@ -183,7 +187,9 @@ func computeSyncedAndOutOfSyncNodes(nodes []*data.NodeData, shardIDs []uint32) ( totalLen := len(tempSyncedNodesMap[shardID]) + len(tempSyncedFallbackNodesMap[shardID]) + len(tempNotSyncedNodesMap[shardID]) if totalLen == 0 { - return nil, nil, nil, fmt.Errorf("%w for shard %d - no synced or not synced node", errWrongConfiguration, shardID) + if availability != data.AvailabilityRecent { + return nil, nil, nil, fmt.Errorf("%w for shard %d - no synced or not synced node", errWrongConfiguration, shardID) + } } } @@ -282,6 +288,10 @@ func (nh *nodesHolder) removeOutOfSyncNodesUnprotected( syncedNodesMap map[uint32][]*data.NodeData, syncedFallbackNodesMap map[uint32][]*data.NodeData, ) { + minSyncedNodes := 1 + if nh.availability == data.AvailabilityRecent { + minSyncedNodes = 0 // allow the snapshotless list to be empty so regular observers can be used + } if len(outOfSyncNodes) == 0 { nh.outOfSyncNodes = make([]*data.NodeData, 0) nh.outOfSyncFallbackNodes = make([]*data.NodeData, 0) @@ -289,8 +299,8 @@ func (nh *nodesHolder) removeOutOfSyncNodesUnprotected( } for _, outOfSyncNode := range outOfSyncNodes { - hasOneSyncedNode := len(syncedNodesMap[outOfSyncNode.ShardId]) >= 1 - hasEnoughSyncedFallbackNodes := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) > 1 + hasOneSyncedNode := len(syncedNodesMap[outOfSyncNode.ShardId]) >= minSyncedNodes + hasEnoughSyncedFallbackNodes := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) > minSyncedNodes canDeleteFallbackNode := hasOneSyncedNode || hasEnoughSyncedFallbackNodes if outOfSyncNode.IsFallback && canDeleteFallbackNode { nh.removeNodeUnprotected(outOfSyncNode) @@ -305,7 +315,7 @@ func (nh *nodesHolder) removeOutOfSyncNodesUnprotected( continue } - hasEnoughSyncedNodes := len(syncedNodesMap[outOfSyncNode.ShardId]) >= 1 + hasEnoughSyncedNodes := len(syncedNodesMap[outOfSyncNode.ShardId]) >= minSyncedNodes if hasEnoughSyncedNodes { nh.removeNodeUnprotected(outOfSyncNode) continue @@ -326,7 +336,7 @@ func (nh *nodesHolder) removeOutOfSyncNodesUnprotected( "shard", outOfSyncNode.ShardId) nh.lastSyncedNodes[outOfSyncNode.ShardId] = outOfSyncNode } - hasOneSyncedFallbackNode := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) >= 1 + hasOneSyncedFallbackNode := len(syncedFallbackNodesMap[outOfSyncNode.ShardId]) >= minSyncedNodes if hasOneSyncedFallbackNode { nh.removeNodeUnprotected(outOfSyncNode) continue diff --git a/observer/holder/nodesHolder_test.go b/observer/holder/nodesHolder_test.go index 741ae0c1..0c392aa0 100644 --- a/observer/holder/nodesHolder_test.go +++ b/observer/holder/nodesHolder_test.go @@ -6,10 +6,28 @@ import ( "sort" "testing" + "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-proxy-go/data" "github.com/stretchr/testify/require" ) +func TestNodesHolder_GetLastSyncedNodes(t *testing.T) { + t.Parallel() + + syncedNodes := []*data.NodeData{{Address: "addr0", ShardId: core.MetachainShardId}, {Address: "addr1", ShardId: 0}} + fallbackNodes := []*data.NodeData{{Address: "fallback-addr0", ShardId: core.MetachainShardId}, {Address: "fallback-addr1", ShardId: 0}} + shardIds := []uint32{0, core.MetachainShardId} + + nodesHolder, err := NewNodesHolder(syncedNodes, fallbackNodes, shardIds, data.AvailabilityAll) + require.NoError(t, err) + + require.Equal(t, syncedNodes, nodesHolder.GetSyncedNodes()) + require.Equal(t, fallbackNodes, nodesHolder.GetSyncedFallbackNodes()) + require.Empty(t, nodesHolder.GetOutOfSyncFallbackNodes()) + require.Empty(t, nodesHolder.GetOutOfSyncNodes()) + require.Empty(t, nodesHolder.GetLastSyncedNodes()) +} + func TestComputeSyncAndOutOfSyncNodes(t *testing.T) { t.Parallel() @@ -18,6 +36,7 @@ func TestComputeSyncAndOutOfSyncNodes(t *testing.T) { t.Run("all nodes are out of sync", testComputeSyncedAndOutOfSyncNodesAllNodesNotSynced) t.Run("invalid config - no node", testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll) t.Run("invalid config - no node in a shard", testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeInAShard) + t.Run("snapshotless nodes should work with no node in a shard", testSnapshotlessNodesShouldWorkIfNoNodeInShardExists) t.Run("edge case - address should not exist in both sync and not-synced lists", testEdgeCaseAddressShouldNotExistInBothLists) } @@ -32,7 +51,7 @@ func testComputeSyncedAndOutOfSyncNodesAllNodesSynced(t *testing.T) { {Address: "3", ShardId: 1, IsSynced: true, IsFallback: true}, } - synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) + synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll) require.Equal(t, []*data.NodeData{ {Address: "0", ShardId: 0, IsSynced: true}, {Address: "2", ShardId: 1, IsSynced: true}, @@ -57,7 +76,7 @@ func testComputeSyncedAndOutOfSyncNodesEnoughSyncedObservers(t *testing.T) { {Address: "5", ShardId: 1, IsSynced: true, IsFallback: true}, } - synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) + synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll) require.Equal(t, []*data.NodeData{ {Address: "0", ShardId: 0, IsSynced: true}, {Address: "3", ShardId: 1, IsSynced: true}, @@ -83,7 +102,7 @@ func testComputeSyncedAndOutOfSyncNodesAllNodesNotSynced(t *testing.T) { {Address: "3", ShardId: 1, IsSynced: false, IsFallback: true}, } - synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs) + synced, syncedFb, notSynced, _ := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll) require.Equal(t, []*data.NodeData{}, synced) require.Equal(t, []*data.NodeData{}, syncedFb) require.Equal(t, input, notSynced) @@ -143,7 +162,7 @@ func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll(t *testin shardIDs := []uint32{0, 1} var input []*data.NodeData - synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs) + synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll) require.Error(t, err) require.Nil(t, synced) require.Nil(t, syncedFb) @@ -156,7 +175,7 @@ func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeAtAll(t *testin Address: "0", ShardId: 0, IsSynced: true, }, } - synced, syncedFb, notSynced, err = computeSyncedAndOutOfSyncNodes(input, shardIDs) + synced, syncedFb, notSynced, err = computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll) require.True(t, errors.Is(err, errWrongConfiguration)) require.Nil(t, synced) require.Nil(t, syncedFb) @@ -173,13 +192,29 @@ func testComputeSyncedAndOutOfSyncNodesInvalidConfigurationNoNodeInAShard(t *tes Address: "0", ShardId: 0, IsSynced: true, }, } - synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs) + synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityAll) require.True(t, errors.Is(err, errWrongConfiguration)) require.Nil(t, synced) require.Nil(t, syncedFb) require.Nil(t, notSynced) } +func testSnapshotlessNodesShouldWorkIfNoNodeInShardExists(t *testing.T) { + t.Parallel() + + shardIDs := []uint32{0, core.MetachainShardId} + input := []*data.NodeData{ + { + Address: "m", ShardId: core.MetachainShardId, IsSynced: true, IsSnapshotless: true, + }, + } + synced, syncedFb, notSynced, err := computeSyncedAndOutOfSyncNodes(input, shardIDs, data.AvailabilityRecent) + require.NoError(t, err) + require.Empty(t, notSynced) + require.Empty(t, syncedFb) + require.Equal(t, input, synced) +} + func slicesHaveCommonObjects(firstSlice []*data.NodeData, secondSlice []*data.NodeData) bool { nodeDataToStr := func(nd *data.NodeData) string { return fmt.Sprintf("%s%d", nd.Address, nd.ShardId) diff --git a/process/baseProcessor.go b/process/baseProcessor.go index 80fde998..332b3393 100644 --- a/process/baseProcessor.go +++ b/process/baseProcessor.go @@ -6,7 +6,7 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" + "io" "net" "net/http" "strconv" @@ -202,12 +202,11 @@ func (bp *BaseProcessor) CallGetRestEndPoint( resp, err := bp.httpClient.Do(req) if err != nil { + bp.triggerNodesSyncCheck(address) if isTimeoutError(err) { - bp.triggerNodesSyncCheck(address) return http.StatusRequestTimeout, err } - bp.triggerNodesSyncCheck(address) return http.StatusNotFound, err } @@ -218,7 +217,7 @@ func (bp *BaseProcessor) CallGetRestEndPoint( } }() - responseBodyBytes, err := ioutil.ReadAll(resp.Body) + responseBodyBytes, err := io.ReadAll(resp.Body) if err != nil { return http.StatusInternalServerError, err } @@ -262,12 +261,11 @@ func (bp *BaseProcessor) CallPostRestEndPoint( resp, err := bp.httpClient.Do(req) if err != nil { + bp.triggerNodesSyncCheck(address) if isTimeoutError(err) { - bp.triggerNodesSyncCheck(address) return http.StatusRequestTimeout, err } - bp.triggerNodesSyncCheck(address) return http.StatusNotFound, err } @@ -278,7 +276,7 @@ func (bp *BaseProcessor) CallPostRestEndPoint( } }() - responseBodyBytes, err := ioutil.ReadAll(resp.Body) + responseBodyBytes, err := io.ReadAll(resp.Body) if err != nil { return http.StatusInternalServerError, err } @@ -421,6 +419,7 @@ func (bp *BaseProcessor) isNodeSynced(node *proxyData.NodeData) (bool, error) { "probable highest nonce", probableHighestNonce, "is synced", isNodeSynced, "is ready for VM Queries", isReadyForVMQueries, + "is snapshotless", node.IsSnapshotless, "is fallback", node.IsFallback) if !isReadyForVMQueries { @@ -454,7 +453,7 @@ func (bp *BaseProcessor) getNodeStatusResponseFromAPI(url string) (*proxyData.No return nil, resp.StatusCode, nil } - responseBodyBytes, err := ioutil.ReadAll(resp.Body) + responseBodyBytes, err := io.ReadAll(resp.Body) if err != nil { return nil, http.StatusInternalServerError, err }