diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 05295c1f..4acd6083 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -160,6 +160,12 @@ VERSION: Name: "start-swagger-ui", Usage: "If set to true, will start a Swagger UI on the root", } + // noStatusCheck defines a flag that specifies if the status checks for the observers should be skipped + noStatusCheck = cli.BoolFlag{ + Name: "no-status-check", + Usage: "If set to true, will skip the status check for observers, treating them as always synced. ⚠️ This relies on proper " + + "observers management on the provider side.", + } testServer *testing.TestHttpServer ) @@ -184,6 +190,7 @@ func main() { workingDirectory, memBallast, startSwaggerUI, + noStatusCheck, } app.Authors = []cli.Author{ { @@ -273,7 +280,8 @@ func startProxy(ctx *cli.Context) error { statusMetricsProvider := metrics.NewStatusMetrics() shouldStartSwaggerUI := ctx.GlobalBool(startSwaggerUI.Name) - versionsRegistry, err := createVersionsRegistryTestOrProduction(ctx, generalConfig, configurationFileName, statusMetricsProvider, closableComponents) + skipStatusCheck := ctx.GlobalBool(noStatusCheck.Name) + versionsRegistry, err := createVersionsRegistryTestOrProduction(ctx, generalConfig, configurationFileName, statusMetricsProvider, closableComponents, skipStatusCheck) if err != nil { return err } @@ -309,6 +317,7 @@ func createVersionsRegistryTestOrProduction( configurationFilePath string, statusMetricsHandler data.StatusMetricsProvider, closableComponents *data.ClosableComponentsHandler, + skipStatusCheck bool, ) (data.VersionsRegistryHandler, error) { var testHTTPServerEnabled bool @@ -373,6 +382,7 @@ func createVersionsRegistryTestOrProduction( ctx.GlobalString(walletKeyPemFile.Name), ctx.GlobalString(apiConfigDirectory.Name), closableComponents, + skipStatusCheck, ) } @@ -383,6 +393,7 @@ func createVersionsRegistryTestOrProduction( ctx.GlobalString(walletKeyPemFile.Name), ctx.GlobalString(apiConfigDirectory.Name), closableComponents, + skipStatusCheck, ) } @@ -393,6 +404,7 @@ func createVersionsRegistry( pemFileLocation string, apiConfigDirectoryPath string, closableComponents *data.ClosableComponentsHandler, + skipStatusCheck bool, ) (data.VersionsRegistryHandler, error) { pubKeyConverter, err := pubkeyConverter.NewBech32PubkeyConverter(cfg.AddressPubkeyConverter.Length, addressHRP) if err != nil { @@ -441,6 +453,7 @@ func createVersionsRegistry( observersProvider, fullHistoryNodesProvider, pubKeyConverter, + skipStatusCheck, ) if err != nil { return nil, err diff --git a/observer/baseNodeProvider.go b/observer/baseNodeProvider.go index 49b964ec..22fba4fe 100644 --- a/observer/baseNodeProvider.go +++ b/observer/baseNodeProvider.go @@ -114,6 +114,15 @@ func (bnp *baseNodeProvider) UpdateNodesBasedOnSyncState(nodesWithSyncStatus []* bnp.snapshotlessNodes.UpdateNodes(snapshotlessNodes) } +// PrintNodesInShards will only print the nodes in shards +func (bnp *baseNodeProvider) PrintNodesInShards() { + bnp.mutNodes.RLock() + defer bnp.mutNodes.RUnlock() + + bnp.regularNodes.PrintNodesInShards() + bnp.snapshotlessNodes.PrintNodesInShards() +} + func splitNodesByDataAvailability(nodes []*data.NodeData) ([]*data.NodeData, []*data.NodeData) { regularNodes := make([]*data.NodeData, 0) snapshotlessNodes := make([]*data.NodeData, 0) diff --git a/observer/disabledNodesProvider.go b/observer/disabledNodesProvider.go index 5256fe07..fde4c107 100644 --- a/observer/disabledNodesProvider.go +++ b/observer/disabledNodesProvider.go @@ -44,6 +44,10 @@ func (d *disabledNodesProvider) ReloadNodes(_ data.NodeType) data.NodesReloadRes return data.NodesReloadResponse{Description: "disabled nodes provider", Error: d.returnMessage} } +// PrintNodesInShards does nothing as it is disabled +func (d *disabledNodesProvider) PrintNodesInShards() { +} + // IsInterfaceNil returns true if there is no value under the interface func (d *disabledNodesProvider) IsInterfaceNil() bool { return d == nil diff --git a/observer/holder/nodesHolder.go b/observer/holder/nodesHolder.go index 41d418b8..5742a335 100644 --- a/observer/holder/nodesHolder.go +++ b/observer/holder/nodesHolder.go @@ -67,6 +67,14 @@ func (nh *nodesHolder) UpdateNodes(nodesWithSyncStatus []*data.NodeData) { nh.printNodesInShardsUnprotected() } +// PrintNodesInShards will only print the nodes in shards +func (nh *nodesHolder) PrintNodesInShards() { + nh.mut.RLock() + defer nh.mut.RUnlock() + + nh.printNodesInShardsUnprotected() +} + // GetSyncedNodes returns all the synced nodes func (nh *nodesHolder) GetSyncedNodes(shardID uint32) []*data.NodeData { return nh.getObservers(syncedNodesCache, shardID) diff --git a/observer/interface.go b/observer/interface.go index d41cb70c..6821f6bb 100644 --- a/observer/interface.go +++ b/observer/interface.go @@ -9,12 +9,14 @@ type NodesProviderHandler interface { UpdateNodesBasedOnSyncState(nodesWithSyncStatus []*data.NodeData) GetAllNodesWithSyncState() []*data.NodeData ReloadNodes(nodesType data.NodeType) data.NodesReloadResponse + PrintNodesInShards() IsInterfaceNil() bool } // NodesHolder defines the actions of a component that is able to hold nodes type NodesHolder interface { UpdateNodes(nodesWithSyncStatus []*data.NodeData) + PrintNodesInShards() GetSyncedNodes(shardID uint32) []*data.NodeData GetSyncedFallbackNodes(shardID uint32) []*data.NodeData GetOutOfSyncNodes(shardID uint32) []*data.NodeData diff --git a/process/baseProcessor.go b/process/baseProcessor.go index 1124c720..f3573778 100644 --- a/process/baseProcessor.go +++ b/process/baseProcessor.go @@ -42,6 +42,7 @@ type BaseProcessor struct { chanTriggerNodesState chan struct{} delayForCheckingNodesSyncState time.Duration cancelFunc func() + noStatusCheck bool httpClient *http.Client } @@ -53,6 +54,7 @@ func NewBaseProcessor( observersProvider observer.NodesProviderHandler, fullHistoryNodesProvider observer.NodesProviderHandler, pubKeyConverter core.PubkeyConverter, + noStatusCheck bool, ) (*BaseProcessor, error) { if check.IfNil(shardCoord) { return nil, ErrNilShardCoordinator @@ -84,9 +86,14 @@ func NewBaseProcessor( shardIDs: computeShardIDs(shardCoord), delayForCheckingNodesSyncState: stepDelayForCheckingNodesSyncState, chanTriggerNodesState: make(chan struct{}), + noStatusCheck: noStatusCheck, } bp.nodeStatusFetcher = bp.getNodeStatusResponseFromAPI + if noStatusCheck { + log.Info("Proxy started with no status check! The provided observers will always be considered synced!") + } + return bp, nil } @@ -347,7 +354,7 @@ func (bp *BaseProcessor) handleOutOfSyncNodes(ctx context.Context) { timer := time.NewTimer(bp.delayForCheckingNodesSyncState) defer timer.Stop() - bp.updateNodesWithSync() + bp.handleNodes() for { timer.Reset(bp.delayForCheckingNodesSyncState) @@ -359,10 +366,22 @@ func (bp *BaseProcessor) handleOutOfSyncNodes(ctx context.Context) { return } - bp.updateNodesWithSync() + bp.handleNodes() } } +func (bp *BaseProcessor) handleNodes() { + // if proxy is started with no-status-check flag, only print the observers. + // they are already initialized by default as synced. + if bp.noStatusCheck { + bp.observersProvider.PrintNodesInShards() + bp.fullHistoryNodesProvider.PrintNodesInShards() + return + } + + bp.updateNodesWithSync() +} + func (bp *BaseProcessor) updateNodesWithSync() { observers := bp.observersProvider.GetAllNodesWithSyncState() observersWithSyncStatus := bp.getNodesWithSyncStatus(observers) diff --git a/process/baseProcessor_test.go b/process/baseProcessor_test.go index 1ca4bc9f..4708652e 100644 --- a/process/baseProcessor_test.go +++ b/process/baseProcessor_test.go @@ -53,6 +53,7 @@ func TestNewBaseProcessor_WithInvalidRequestTimeoutShouldErr(t *testing.T) { &mock.ObserversProviderStub{}, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) assert.Nil(t, bp) @@ -68,6 +69,7 @@ func TestNewBaseProcessor_WithNilShardCoordinatorShouldErr(t *testing.T) { &mock.ObserversProviderStub{}, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) assert.Nil(t, bp) @@ -83,6 +85,7 @@ func TestNewBaseProcessor_WithNilObserversProviderShouldErr(t *testing.T) { &mock.ObserversProviderStub{}, nil, &mock.PubKeyConverterMock{}, + false, ) assert.Nil(t, bp) @@ -98,6 +101,7 @@ func TestNewBaseProcessor_WithNilFullHistoryNodesProviderShouldErr(t *testing.T) nil, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) assert.Nil(t, bp) @@ -113,6 +117,7 @@ func TestNewBaseProcessor_WithOkValuesShouldWork(t *testing.T) { &mock.ObserversProviderStub{}, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) assert.NotNil(t, bp) @@ -135,6 +140,7 @@ func TestBaseProcessor_GetObserversEmptyListShouldWork(t *testing.T) { }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) observers, err := bp.GetObservers(0, data.AvailabilityAll) @@ -169,6 +175,7 @@ func TestBaseProcessor_ComputeShardId(t *testing.T) { }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) //there are 2 shards, compute ID should correctly process @@ -203,6 +210,7 @@ func TestBaseProcessor_CallGetRestEndPoint(t *testing.T) { &mock.ObserversProviderStub{}, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) _, err := bp.CallGetRestEndPoint(server.URL, "/some/path", tsRecovered) @@ -231,6 +239,7 @@ func TestBaseProcessor_CallGetRestEndPointShouldTimeout(t *testing.T) { &mock.ObserversProviderStub{}, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) _, err := bp.CallGetRestEndPoint(testServer.URL, "/some/path", tsRecovered) @@ -255,6 +264,7 @@ func TestBaseProcessor_CallPostRestEndPoint(t *testing.T) { &mock.ObserversProviderStub{}, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) rc, err := bp.CallPostRestEndPoint(server.URL, "/some/path", ts, tsRecv) @@ -285,6 +295,7 @@ func TestBaseProcessor_CallPostRestEndPointShouldTimeout(t *testing.T) { &mock.ObserversProviderStub{}, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) rc, err := bp.CallPostRestEndPoint(testServer.URL, "/some/path", ts, tsRecv) @@ -325,6 +336,7 @@ func TestBaseProcessor_GetAllObserversWithOkValuesShouldPass(t *testing.T) { }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) assert.Nil(t, err) @@ -375,6 +387,7 @@ func TestBaseProcessor_GetObserversOnePerShardShouldWork(t *testing.T) { }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) observers, err := bp.GetObserversOnePerShard(data.AvailabilityAll) @@ -423,6 +436,7 @@ func TestBaseProcessor_GetObserversOnePerShardOneShardHasNoObserverShouldWork(t }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) observers, err := bp.GetObserversOnePerShard(data.AvailabilityAll) @@ -471,6 +485,7 @@ func TestBaseProcessor_GetObserversOnePerShardMetachainHasNoObserverShouldWork(t }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) observers, err := bp.GetObserversOnePerShard(data.AvailabilityAll) @@ -523,6 +538,7 @@ func TestBaseProcessor_GetFullHistoryNodesOnePerShardShouldWork(t *testing.T) { }, }, &mock.PubKeyConverterMock{}, + false, ) observers, err := bp.GetFullHistoryNodesOnePerShard(data.AvailabilityAll) @@ -543,6 +559,7 @@ func TestBaseProcessor_GetShardIDs(t *testing.T) { &mock.ObserversProviderStub{}, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) expected := []uint32{0, 1, 2, core.MetachainShardId} @@ -570,6 +587,7 @@ func TestBaseProcessor_HandleNodesSyncStateShouldSetNodeOutOfSyncIfVMQueriesNotR }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) bp.SetNodeStatusFetcher(func(url string) (*data.NodeStatusAPIResponse, int, error) { @@ -630,6 +648,7 @@ func TestBaseProcessor_HandleNodesSyncStateShouldTreatObserverThatWasDown(t *tes }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) bp.SetNodeStatusFetcher(func(url string) (*data.NodeStatusAPIResponse, int, error) { @@ -691,6 +710,7 @@ func TestBaseProcessor_HandleNodesSyncStateShouldBeTriggeredEarlierIfANodeIsOffl }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) bp.SetNodeStatusFetcher(func(url string) (*data.NodeStatusAPIResponse, int, error) { @@ -739,6 +759,7 @@ func TestBaseProcessor_HandleNodesSyncStateShouldConsiderNodeAsOnlineIfProbableN }, &mock.ObserversProviderStub{}, &mock.PubKeyConverterMock{}, + false, ) bp.SetNodeStatusFetcher(func(url string) (*data.NodeStatusAPIResponse, int, error) { @@ -798,6 +819,7 @@ func TestBaseProcessor_HandleNodesSyncState(t *testing.T) { }, }, &mock.PubKeyConverterMock{}, + false, ) bp.SetNodeStatusFetcher(func(url string) (*data.NodeStatusAPIResponse, int, error) { @@ -827,6 +849,45 @@ func TestBaseProcessor_HandleNodesSyncState(t *testing.T) { time.Sleep(50 * time.Millisecond) } +func TestBaseProcessor_NoStatusCheck(t *testing.T) { + + numPrintNodesInShardsCalled := uint32(0) + bp, _ := process.NewBaseProcessor( + 5, + &mock.ShardCoordinatorMock{}, + &mock.ObserversProviderStub{ + GetAllNodesWithSyncStateCalled: func() []*data.NodeData { + require.Fail(t, "should have not been called") + return nil + }, + UpdateNodesBasedOnSyncStateCalled: func(nodesWithSyncStatus []*data.NodeData) { + require.Fail(t, "should have not been called") + }, + PrintNodesInShardsCalled: func() { + atomic.AddUint32(&numPrintNodesInShardsCalled, 1) + }, + }, + &mock.ObserversProviderStub{}, + &mock.PubKeyConverterMock{}, + true, + ) + + bp.SetNodeStatusFetcher(func(url string) (*data.NodeStatusAPIResponse, int, error) { + require.Fail(t, "should have not been called") + + return nil, 400, nil + }) + bp.SetDelayForCheckingNodesSyncState(5 * time.Millisecond) + bp.StartNodesSyncStateChecks() + + time.Sleep(50 * time.Millisecond) + + require.GreaterOrEqual(t, atomic.LoadUint32(&numPrintNodesInShardsCalled), uint32(1)) + + _ = bp.Close() + time.Sleep(50 * time.Millisecond) +} + func getResponseForNodeStatus(synced bool, vmQueriesReadyStr string) *data.NodeStatusAPIResponse { nonce, probableHighestNonce := uint64(10), uint64(11) if !synced { diff --git a/process/mock/observersProviderStub.go b/process/mock/observersProviderStub.go index acaa612d..2f9e5b9d 100644 --- a/process/mock/observersProviderStub.go +++ b/process/mock/observersProviderStub.go @@ -11,6 +11,7 @@ type ObserversProviderStub struct { ReloadNodesCalled func(nodesType data.NodeType) data.NodesReloadResponse UpdateNodesBasedOnSyncStateCalled func(nodesWithSyncStatus []*data.NodeData) GetAllNodesWithSyncStateCalled func() []*data.NodeData + PrintNodesInShardsCalled func() } // GetNodesByShardId - @@ -66,6 +67,13 @@ func (ops *ObserversProviderStub) ReloadNodes(nodesType data.NodeType) data.Node return data.NodesReloadResponse{} } +// PrintNodesInShards - +func (ops *ObserversProviderStub) PrintNodesInShards() { + if ops.PrintNodesInShardsCalled != nil { + ops.PrintNodesInShardsCalled() + } +} + // IsInterfaceNil - func (ops *ObserversProviderStub) IsInterfaceNil() bool { return ops == nil