From da878ca1d41a449dc648a848afa66c28bc9a6090 Mon Sep 17 00:00:00 2001 From: David Terpay Date: Mon, 18 Mar 2024 18:31:24 -0400 Subject: [PATCH] init --- oracle/orchestrator/lifecycle.go | 71 ++++++++++------------------ oracle/orchestrator/market_mapper.go | 9 ++-- oracle/orchestrator/orchestrator.go | 4 +- oracle/orchestrator/update.go | 15 +++++- providers/base/provider.go | 4 ++ 5 files changed, 50 insertions(+), 53 deletions(-) diff --git a/oracle/orchestrator/lifecycle.go b/oracle/orchestrator/lifecycle.go index 4d8dbcb0c..703c723cb 100644 --- a/oracle/orchestrator/lifecycle.go +++ b/oracle/orchestrator/lifecycle.go @@ -5,17 +5,8 @@ import ( "fmt" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) -// ctxErrors is a map of context errors that we check for when starting the provider. -// We only want cancel the main error group if the context is canceled or the deadline -// is exceeded. Otherwise, failures should be handled gracefully. -var ctxErrors = map[error]struct{}{ - context.Canceled: {}, - context.DeadlineExceeded: {}, -} - // generalProvider is a interface for a provider that implements the base provider. type generalProvider interface { // Start starts the provider. @@ -33,21 +24,31 @@ func (o *ProviderOrchestrator) Start(ctx context.Context) error { return err } - // Create a new error group for the provider orchestrator. - o.errGroup, ctx = errgroup.WithContext(ctx) - // Set tthe main context for the provider orchestrator. ctx, _ = o.setMainCtx(ctx) // Start all of the price providers. for _, state := range o.providers { - o.errGroup.Go(o.execProviderFn(ctx, state.Provider)) + o.wg.Add(1) + go func() { + defer o.wg.Done() + o.execProviderFn(ctx, state.Provider) + }() } // Start the market map provider. if o.mmProvider != nil { - o.errGroup.Go(o.execProviderFn(ctx, o.mmProvider)) - o.errGroup.Go(o.listenForMarketMapUpdates(ctx)) + o.wg.Add(1) + go func() { + defer o.wg.Done() + o.execProviderFn(ctx, o.mmProvider) + }() + + o.wg.Add(1) + go func() { + defer o.wg.Done() + o.listenForMarketMapUpdates(ctx) + }() } return nil @@ -55,25 +56,15 @@ func (o *ProviderOrchestrator) Start(ctx context.Context) error { // Stop stops the provider orchestrator. This is a synchronous operation that will // wait for all of the providers to exit. -func (o *ProviderOrchestrator) Stop() error { +func (o *ProviderOrchestrator) Stop() { o.logger.Info("stopping provider orchestrator") if _, cancel := o.getMainCtx(); cancel != nil { cancel() - if o.errGroup == nil { - return nil - } - - // Wait for all of the price providers to exit. - if err := o.errGroup.Wait(); err != nil { - o.logger.Error("provider orchestrator exited with error", zap.Error(err)) - return err - } - - o.logger.Info("provider orchestrator exited successfully") } - return nil + o.wg.Wait() + o.logger.Info("provider orchestrator exited successfully") } // execProviderFn returns a function that starts the provider. This function is used @@ -81,25 +72,15 @@ func (o *ProviderOrchestrator) Stop() error { func (o *ProviderOrchestrator) execProviderFn( ctx context.Context, p generalProvider, -) func() error { - return func() error { - defer func() { - if r := recover(); r != nil { - o.logger.Error("recovered from panic", zap.Error(fmt.Errorf("%v", r))) - } - }() - - // If the context is canceled, or the deadline is exceeded, - // we want to exit the provider and trigger the error group - // to exit for all providers. - err := p.Start(ctx) - if _, ok := ctxErrors[err]; ok { - return err +) { + defer func() { + if r := recover(); r != nil { + o.logger.Error("recovered from panic", zap.Error(fmt.Errorf("%v", r))) } + }() - // Otherwise, we gracefully exit the go routine. - return nil - } + err := p.Start(ctx) + o.logger.Error("provider exited with error", zap.String("provider", p.Name()), zap.Error(err)) } // setMainCtx sets the main context for the provider orchestrator. diff --git a/oracle/orchestrator/market_mapper.go b/oracle/orchestrator/market_mapper.go index a836cf0b4..24fc775be 100644 --- a/oracle/orchestrator/market_mapper.go +++ b/oracle/orchestrator/market_mapper.go @@ -11,13 +11,13 @@ import ( // listenForMarketMapUpdates is a goroutine that listens for market map updates and // updates the orchestrated providers with the new market map. -func (o *ProviderOrchestrator) listenForMarketMapUpdates(ctx context.Context) func() error { - return func() error { +func (o *ProviderOrchestrator) listenForMarketMapUpdates(ctx context.Context) func() { + return func() { mapper := o.GetMarketMapProvider() ids := mapper.GetIDs() if len(ids) != 1 { o.logger.Error("market mapper can only be responsible for one chain", zap.Any("ids", ids)) - return nil + return } apiCfg := mapper.GetAPIConfig() @@ -26,7 +26,8 @@ func (o *ProviderOrchestrator) listenForMarketMapUpdates(ctx context.Context) fu for { select { case <-ctx.Done(): - return ctx.Err() + o.logger.Info("stopping market map listener", zap.Error(ctx.Err())) + return case <-ticker.C: // Fetch the latest market map. response := mapper.GetData() diff --git a/oracle/orchestrator/orchestrator.go b/oracle/orchestrator/orchestrator.go index fc7f5ed6b..11241ed81 100644 --- a/oracle/orchestrator/orchestrator.go +++ b/oracle/orchestrator/orchestrator.go @@ -5,7 +5,6 @@ import ( "sync" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/skip-mev/slinky/oracle/config" "github.com/skip-mev/slinky/oracle/types" @@ -32,7 +31,7 @@ type ProviderOrchestrator struct { // mainCancel is the main context cancel function. mainCancel context.CancelFunc // errGroup is the error group for the provider orchestrator. - errGroup *errgroup.Group + wg sync.WaitGroup // -------------------Stateful Fields-------------------// // @@ -99,6 +98,7 @@ func NewProviderOrchestrator( wsMetrics: wsmetrics.NewWebSocketMetricsFromConfig(cfg.Metrics), apiMetrics: apimetrics.NewAPIMetricsFromConfig(cfg.Metrics), providerMetrics: providermetrics.NewProviderMetricsFromConfig(cfg.Metrics), + wg: sync.WaitGroup{}, } for _, opt := range opts { diff --git a/oracle/orchestrator/update.go b/oracle/orchestrator/update.go index 201ed18a5..691eb2365 100644 --- a/oracle/orchestrator/update.go +++ b/oracle/orchestrator/update.go @@ -54,6 +54,7 @@ func (o *ProviderOrchestrator) UpdateWithMarketMap(marketMap mmtypes.MarketMap) func (o *ProviderOrchestrator) UpdateProviderState(marketMap types.ProviderMarketMap, state ProviderState) (ProviderState, error) { provider := state.Provider + tickers := marketMap.GetTickers() o.logger.Info("updating provider state", zap.String("provider_state", provider.Name())) switch provider.Type() { case providertypes.API: @@ -65,7 +66,7 @@ func (o *ProviderOrchestrator) UpdateProviderState(marketMap types.ProviderMarke provider.Update( base.WithNewAPIHandler(handler), - base.WithNewIDs[mmtypes.Ticker, *big.Int](marketMap.GetTickers()), + base.WithNewIDs[mmtypes.Ticker, *big.Int](tickers), ) case providertypes.WebSockets: // Create and update the WebSocket query handler. @@ -76,10 +77,20 @@ func (o *ProviderOrchestrator) UpdateProviderState(marketMap types.ProviderMarke provider.Update( base.WithNewWebSocketHandler(handler), - base.WithNewIDs[mmtypes.Ticker, *big.Int](marketMap.GetTickers()), + base.WithNewIDs[mmtypes.Ticker, *big.Int](tickers), ) } + if len(tickers) == 0 { + provider.Stop() + } else if !provider.IsRunning() { + o.wg.Add(1) + go func() { + defer o.wg.Done() + o.execProviderFn(o.mainCtx, provider) + }() + } + // Update the provider's state. o.logger.Info("updated provider state", zap.String("provider_state", provider.Name())) return state, nil diff --git a/providers/base/provider.go b/providers/base/provider.go index 29a9f7068..2326be9de 100644 --- a/providers/base/provider.go +++ b/providers/base/provider.go @@ -101,6 +101,10 @@ func NewProvider[K providertypes.ResponseKey, V providertypes.ResponseValue](opt // and continuously update the data. This blocks until the provider is stopped. func (p *Provider[K, V]) Start(ctx context.Context) error { p.logger.Info("starting provider") + if len(p.ids) == 0 { + p.logger.Info("no ids to fetch; provider is exiting") + return nil + } mainCtx, mainCancel := p.setMainCtx(ctx) defer func() {