Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rc/spica patch relayedv3 #474

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ VERSION:
Name: "start-swagger-ui",
Usage: "If set to true, will start a Swagger UI on the root",
}
// noStatusCheck defines a flag that specifies if the status checks for the observers should be skipped
noStatusCheck = cli.BoolFlag{
Name: "no-status-check",
Usage: "If set to true, will skip the status check for observers, treating them as always synced. ⚠️ This relies on proper " +
"observers management on the provider side.",
}

testServer *testing.TestHttpServer
)
Expand All @@ -184,6 +190,7 @@ func main() {
workingDirectory,
memBallast,
startSwaggerUI,
noStatusCheck,
}
app.Authors = []cli.Author{
{
Expand Down Expand Up @@ -273,7 +280,8 @@ func startProxy(ctx *cli.Context) error {
statusMetricsProvider := metrics.NewStatusMetrics()

shouldStartSwaggerUI := ctx.GlobalBool(startSwaggerUI.Name)
versionsRegistry, err := createVersionsRegistryTestOrProduction(ctx, generalConfig, configurationFileName, statusMetricsProvider, closableComponents)
skipStatusCheck := ctx.GlobalBool(noStatusCheck.Name)
versionsRegistry, err := createVersionsRegistryTestOrProduction(ctx, generalConfig, configurationFileName, statusMetricsProvider, closableComponents, skipStatusCheck)
if err != nil {
return err
}
Expand Down Expand Up @@ -309,6 +317,7 @@ func createVersionsRegistryTestOrProduction(
configurationFilePath string,
statusMetricsHandler data.StatusMetricsProvider,
closableComponents *data.ClosableComponentsHandler,
skipStatusCheck bool,
) (data.VersionsRegistryHandler, error) {

var testHTTPServerEnabled bool
Expand Down Expand Up @@ -373,6 +382,7 @@ func createVersionsRegistryTestOrProduction(
ctx.GlobalString(walletKeyPemFile.Name),
ctx.GlobalString(apiConfigDirectory.Name),
closableComponents,
skipStatusCheck,
)
}

Expand All @@ -383,6 +393,7 @@ func createVersionsRegistryTestOrProduction(
ctx.GlobalString(walletKeyPemFile.Name),
ctx.GlobalString(apiConfigDirectory.Name),
closableComponents,
skipStatusCheck,
)
}

Expand All @@ -393,6 +404,7 @@ func createVersionsRegistry(
pemFileLocation string,
apiConfigDirectoryPath string,
closableComponents *data.ClosableComponentsHandler,
skipStatusCheck bool,
) (data.VersionsRegistryHandler, error) {
pubKeyConverter, err := pubkeyConverter.NewBech32PubkeyConverter(cfg.AddressPubkeyConverter.Length, addressHRP)
if err != nil {
Expand Down Expand Up @@ -441,6 +453,7 @@ func createVersionsRegistry(
observersProvider,
fullHistoryNodesProvider,
pubKeyConverter,
skipStatusCheck,
)
if err != nil {
return nil, err
Expand Down
9 changes: 9 additions & 0 deletions observer/baseNodeProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ func (bnp *baseNodeProvider) UpdateNodesBasedOnSyncState(nodesWithSyncStatus []*
bnp.snapshotlessNodes.UpdateNodes(snapshotlessNodes)
}

// PrintNodesInShards will only print the nodes in shards
func (bnp *baseNodeProvider) PrintNodesInShards() {
bnp.mutNodes.RLock()
defer bnp.mutNodes.RUnlock()

bnp.regularNodes.PrintNodesInShards()
bnp.snapshotlessNodes.PrintNodesInShards()
}

func splitNodesByDataAvailability(nodes []*data.NodeData) ([]*data.NodeData, []*data.NodeData) {
regularNodes := make([]*data.NodeData, 0)
snapshotlessNodes := make([]*data.NodeData, 0)
Expand Down
4 changes: 4 additions & 0 deletions observer/disabledNodesProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (d *disabledNodesProvider) ReloadNodes(_ data.NodeType) data.NodesReloadRes
return data.NodesReloadResponse{Description: "disabled nodes provider", Error: d.returnMessage}
}

// PrintNodesInShards does nothing as it is disabled
func (d *disabledNodesProvider) PrintNodesInShards() {
}

// IsInterfaceNil returns true if there is no value under the interface
func (d *disabledNodesProvider) IsInterfaceNil() bool {
return d == nil
Expand Down
8 changes: 8 additions & 0 deletions observer/holder/nodesHolder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ func (nh *nodesHolder) UpdateNodes(nodesWithSyncStatus []*data.NodeData) {
nh.printNodesInShardsUnprotected()
}

// PrintNodesInShards will only print the nodes in shards
func (nh *nodesHolder) PrintNodesInShards() {
nh.mut.RLock()
defer nh.mut.RUnlock()

nh.printNodesInShardsUnprotected()
}

// GetSyncedNodes returns all the synced nodes
func (nh *nodesHolder) GetSyncedNodes(shardID uint32) []*data.NodeData {
return nh.getObservers(syncedNodesCache, shardID)
Expand Down
2 changes: 2 additions & 0 deletions observer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ type NodesProviderHandler interface {
UpdateNodesBasedOnSyncState(nodesWithSyncStatus []*data.NodeData)
GetAllNodesWithSyncState() []*data.NodeData
ReloadNodes(nodesType data.NodeType) data.NodesReloadResponse
PrintNodesInShards()
IsInterfaceNil() bool
}

// NodesHolder defines the actions of a component that is able to hold nodes
type NodesHolder interface {
UpdateNodes(nodesWithSyncStatus []*data.NodeData)
PrintNodesInShards()
GetSyncedNodes(shardID uint32) []*data.NodeData
GetSyncedFallbackNodes(shardID uint32) []*data.NodeData
GetOutOfSyncNodes(shardID uint32) []*data.NodeData
Expand Down
23 changes: 21 additions & 2 deletions process/baseProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type BaseProcessor struct {
chanTriggerNodesState chan struct{}
delayForCheckingNodesSyncState time.Duration
cancelFunc func()
noStatusCheck bool

httpClient *http.Client
}
Expand All @@ -53,6 +54,7 @@ func NewBaseProcessor(
observersProvider observer.NodesProviderHandler,
fullHistoryNodesProvider observer.NodesProviderHandler,
pubKeyConverter core.PubkeyConverter,
noStatusCheck bool,
) (*BaseProcessor, error) {
if check.IfNil(shardCoord) {
return nil, ErrNilShardCoordinator
Expand Down Expand Up @@ -84,9 +86,14 @@ func NewBaseProcessor(
shardIDs: computeShardIDs(shardCoord),
delayForCheckingNodesSyncState: stepDelayForCheckingNodesSyncState,
chanTriggerNodesState: make(chan struct{}),
noStatusCheck: noStatusCheck,
}
bp.nodeStatusFetcher = bp.getNodeStatusResponseFromAPI

if noStatusCheck {
log.Info("Proxy started with no status check! The provided observers will always be considered synced!")
}

return bp, nil
}

Expand Down Expand Up @@ -347,7 +354,7 @@ func (bp *BaseProcessor) handleOutOfSyncNodes(ctx context.Context) {
timer := time.NewTimer(bp.delayForCheckingNodesSyncState)
defer timer.Stop()

bp.updateNodesWithSync()
bp.handleNodes()
for {
timer.Reset(bp.delayForCheckingNodesSyncState)

Expand All @@ -359,10 +366,22 @@ func (bp *BaseProcessor) handleOutOfSyncNodes(ctx context.Context) {
return
}

bp.updateNodesWithSync()
bp.handleNodes()
}
}

func (bp *BaseProcessor) handleNodes() {
// if proxy is started with no-status-check flag, only print the observers.
// they are already initialized by default as synced.
if bp.noStatusCheck {
bp.observersProvider.PrintNodesInShards()
bp.fullHistoryNodesProvider.PrintNodesInShards()
return
}

bp.updateNodesWithSync()
}

func (bp *BaseProcessor) updateNodesWithSync() {
observers := bp.observersProvider.GetAllNodesWithSyncState()
observersWithSyncStatus := bp.getNodesWithSyncStatus(observers)
Expand Down
Loading
Loading