Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdan-rosianu committed Sep 21, 2023
1 parent 29ad567 commit a713a29
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 1,337 deletions.
120 changes: 57 additions & 63 deletions observer/baseNodeProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func (bnp *baseNodeProvider) initNodes(nodes []*data.NodeData) error {

bnp.shardIds = getSortedShardIDsSlice(newNodes)
syncedNodes, syncedFallbackNodes, syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes := initAllNodesSlice(newNodes)
bnp.regularNodes, err = holder.NewNodesHolder(syncedNodes, syncedFallbackNodes, bnp.shardIds, data.AvailabilityAll)
bnp.regularNodes, err = holder.NewNodesHolder(syncedNodes, syncedFallbackNodes, data.AvailabilityAll)
if err != nil {
return err
}
bnp.snapshotlessNodes, err = holder.NewNodesHolder(syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes, bnp.shardIds, data.AvailabilityRecent)
bnp.snapshotlessNodes, err = holder.NewNodesHolder(syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes, data.AvailabilityRecent)
if err != nil {
return err
}
Expand Down Expand Up @@ -75,23 +75,22 @@ func (bnp *baseNodeProvider) GetAllNodesWithSyncState() []*data.NodeData {
defer bnp.mutNodes.RUnlock()

nodesSlice := make([]*data.NodeData, 0)
nodesSlice = append(nodesSlice, bnp.regularNodes.GetSyncedNodes()...)
nodesSlice = append(nodesSlice, bnp.regularNodes.GetOutOfSyncNodes()...)
nodesSlice = append(nodesSlice, bnp.regularNodes.GetSyncedFallbackNodes()...)
nodesSlice = append(nodesSlice, bnp.regularNodes.GetOutOfSyncFallbackNodes()...)

nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetSyncedNodes()...)
nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetOutOfSyncNodes()...)
nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetSyncedFallbackNodes()...)
nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetOutOfSyncFallbackNodes()...)
for _, shardID := range bnp.shardIds {
nodesSlice = append(nodesSlice, bnp.regularNodes.GetSyncedNodes(shardID)...)
nodesSlice = append(nodesSlice, bnp.regularNodes.GetOutOfSyncNodes(shardID)...)
nodesSlice = append(nodesSlice, bnp.regularNodes.GetSyncedFallbackNodes(shardID)...)
nodesSlice = append(nodesSlice, bnp.regularNodes.GetOutOfSyncFallbackNodes(shardID)...)

nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetSyncedNodes(shardID)...)
nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetOutOfSyncNodes(shardID)...)
nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetSyncedFallbackNodes(shardID)...)
nodesSlice = append(nodesSlice, bnp.snapshotlessNodes.GetOutOfSyncFallbackNodes(shardID)...)
}

return nodesSlice
}

// UpdateNodesBasedOnSyncState will handle the nodes lists, by removing out of sync observers or by adding back observers
// that were previously removed because they were out of sync.
// If all observers are removed, the last one synced will be saved and the fallbacks will be used.
// If even the fallbacks are out of sync, the last regular observer synced will be used, even though it is out of sync.
// When one or more regular observers are back in sync, the fallbacks will not be used anymore.
// UpdateNodesBasedOnSyncState will simply call the corresponding function for both regular and snapshotless observers
func (bnp *baseNodeProvider) UpdateNodesBasedOnSyncState(nodesWithSyncStatus []*data.NodeData) {
bnp.mutNodes.Lock()
defer bnp.mutNodes.Unlock()
Expand Down Expand Up @@ -150,7 +149,7 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo
defer bnp.mutNodes.Unlock()
bnp.shardIds = getSortedShardIDsSlice(newNodes)
syncedNodes, syncedFallbackNodes, syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes := initAllNodesSlice(newNodes)
bnp.regularNodes, err = holder.NewNodesHolder(syncedNodes, syncedFallbackNodes, bnp.shardIds, data.AvailabilityAll)
bnp.regularNodes, err = holder.NewNodesHolder(syncedNodes, syncedFallbackNodes, data.AvailabilityAll)
if err != nil {
log.Error("cannot reload regular nodes: NewNodesHolder", "error", err)
return data.NodesReloadResponse{
Expand All @@ -159,7 +158,8 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo
Error: "cannot create the regular nodes holder: " + err.Error(),
}
}
bnp.snapshotlessNodes, err = holder.NewNodesHolder(syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes, bnp.shardIds, data.AvailabilityRecent)

bnp.snapshotlessNodes, err = holder.NewNodesHolder(syncedSnapshotlessNodes, syncedSnapshotlessFallbackNodes, data.AvailabilityRecent)
if err != nil {
log.Error("cannot reload snapshotless nodes: NewNodesHolder", "error", err)
return data.NodesReloadResponse{
Expand All @@ -176,67 +176,61 @@ func (bnp *baseNodeProvider) ReloadNodes(nodesType data.NodeType) data.NodesRelo
}
}

func (bnp *baseNodeProvider) getSyncedNodesForShardUnprotected(shardId uint32, dataAvailability data.ObserverDataAvailabilityType) ([]*data.NodeData, error) {
var syncedNodesSource []*data.NodeData
if dataAvailability == data.AvailabilityRecent {
syncedNodesSource = bnp.snapshotlessNodes.GetSyncedNodes()
} else {
syncedNodesSource = bnp.regularNodes.GetSyncedNodes()
}
syncedNodes := filterNodesInShard(syncedNodesSource, shardId)
func (bnp *baseNodeProvider) getSyncedNodesForShardUnprotected(shardID uint32, dataAvailability data.ObserverDataAvailabilityType) ([]*data.NodeData, error) {
var syncedNodes []*data.NodeData

syncedNodes = bnp.getSyncedNodes(dataAvailability, shardID)
if len(syncedNodes) != 0 {
return syncedNodes, nil
}

if dataAvailability == data.AvailabilityRecent {
regularNodes := filterNodesInShard(bnp.regularNodes.GetSyncedNodes(), shardId)
if len(regularNodes) > 0 {
return regularNodes, nil
}
}

var fallbackNodesSource []*data.NodeData
if dataAvailability == data.AvailabilityRecent {
fallbackNodesSource = bnp.snapshotlessNodes.GetSyncedFallbackNodes()
} else {
fallbackNodesSource = bnp.regularNodes.GetSyncedFallbackNodes()
fallbackNodesSource := bnp.getFallbackNodes(dataAvailability, shardID)
if len(fallbackNodesSource) != 0 {
return fallbackNodesSource, nil
}

if dataAvailability == data.AvailabilityRecent {
regularNodes := filterNodesInShard(bnp.regularNodes.GetSyncedNodes(), shardId)
if len(regularNodes) > 0 {
return regularNodes, nil
}
outOfSyncNodes := bnp.getOutOfSyncNodes(dataAvailability, shardID)
if len(outOfSyncNodes) > 0 {
return outOfSyncNodes, nil
}

syncedNodes = filterNodesInShard(fallbackNodesSource, shardId)
if len(syncedNodes) != 0 {
return syncedNodes, nil
}

var lastSyncedNodesMap map[uint32]*data.NodeData
if dataAvailability == data.AvailabilityAll {
lastSyncedNodesMap = bnp.regularNodes.GetLastSyncedNodes()
} else {
lastSyncedNodesMap = bnp.snapshotlessNodes.GetLastSyncedNodes()
}
backupNode, hasBackup := lastSyncedNodesMap[shardId]
if hasBackup {
return []*data.NodeData{backupNode}, nil
outOfSyncFallbackNodesSource := bnp.getOutOfSyncFallbackNodes(dataAvailability, shardID)
if len(outOfSyncFallbackNodesSource) != 0 {
return outOfSyncFallbackNodesSource, nil
}

return nil, ErrShardNotAvailable
}

func filterNodesInShard(nodes []*data.NodeData, shardID uint32) []*data.NodeData {
filteredSlice := make([]*data.NodeData, 0)
for _, node := range nodes {
if node.ShardId == shardID {
filteredSlice = append(filteredSlice, node)
func (bnp *baseNodeProvider) getNodesByType(
availabilityType data.ObserverDataAvailabilityType,
shardID uint32,
getSnapshotlessNodesFunc func(uint32) []*data.NodeData,
getRegularNodesFunc func(uint32) []*data.NodeData) []*data.NodeData {

if availabilityType == data.AvailabilityRecent {
nodes := getSnapshotlessNodesFunc(shardID)
if len(nodes) > 0 {
return nodes
}
}
return getRegularNodesFunc(shardID)
}

func (bnp *baseNodeProvider) getSyncedNodes(availabilityType data.ObserverDataAvailabilityType, shardID uint32) []*data.NodeData {
return bnp.getNodesByType(availabilityType, shardID, bnp.snapshotlessNodes.GetSyncedNodes, bnp.regularNodes.GetSyncedNodes)
}

func (bnp *baseNodeProvider) getFallbackNodes(availabilityType data.ObserverDataAvailabilityType, shardID uint32) []*data.NodeData {
return bnp.getNodesByType(availabilityType, shardID, bnp.snapshotlessNodes.GetSyncedFallbackNodes, bnp.regularNodes.GetSyncedFallbackNodes)
}

func (bnp *baseNodeProvider) getOutOfSyncNodes(availabilityType data.ObserverDataAvailabilityType, shardID uint32) []*data.NodeData {
return bnp.getNodesByType(availabilityType, shardID, bnp.snapshotlessNodes.GetOutOfSyncNodes, bnp.regularNodes.GetOutOfSyncNodes)
}

return filteredSlice
func (bnp *baseNodeProvider) getOutOfSyncFallbackNodes(availabilityType data.ObserverDataAvailabilityType, shardID uint32) []*data.NodeData {
return bnp.getNodesByType(availabilityType, shardID, bnp.snapshotlessNodes.GetOutOfSyncFallbackNodes, bnp.regularNodes.GetOutOfSyncFallbackNodes)
}

func (bnp *baseNodeProvider) getSyncedNodesUnprotected(dataAvailability data.ObserverDataAvailabilityType) ([]*data.NodeData, error) {
Expand Down
Loading

0 comments on commit a713a29

Please sign in to comment.