Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
davidterpay committed Mar 18, 2024
1 parent c327453 commit da878ca
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 53 deletions.
71 changes: 26 additions & 45 deletions oracle/orchestrator/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,8 @@ import (
"fmt"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// ctxErrors is a map of context errors that we check for when starting the provider.
// We only want cancel the main error group if the context is canceled or the deadline
// is exceeded. Otherwise, failures should be handled gracefully.
var ctxErrors = map[error]struct{}{
context.Canceled: {},
context.DeadlineExceeded: {},
}

// generalProvider is a interface for a provider that implements the base provider.
type generalProvider interface {
// Start starts the provider.
Expand All @@ -33,73 +24,63 @@ func (o *ProviderOrchestrator) Start(ctx context.Context) error {
return err
}

// Create a new error group for the provider orchestrator.
o.errGroup, ctx = errgroup.WithContext(ctx)

// Set tthe main context for the provider orchestrator.
ctx, _ = o.setMainCtx(ctx)

// Start all of the price providers.
for _, state := range o.providers {
o.errGroup.Go(o.execProviderFn(ctx, state.Provider))
o.wg.Add(1)
go func() {
defer o.wg.Done()
o.execProviderFn(ctx, state.Provider)
}()
}

// Start the market map provider.
if o.mmProvider != nil {
o.errGroup.Go(o.execProviderFn(ctx, o.mmProvider))
o.errGroup.Go(o.listenForMarketMapUpdates(ctx))
o.wg.Add(1)
go func() {
defer o.wg.Done()
o.execProviderFn(ctx, o.mmProvider)
}()

o.wg.Add(1)
go func() {
defer o.wg.Done()
o.listenForMarketMapUpdates(ctx)
}()
}

return nil
}

// Stop stops the provider orchestrator. This is a synchronous operation that will
// wait for all of the providers to exit.
func (o *ProviderOrchestrator) Stop() error {
func (o *ProviderOrchestrator) Stop() {
o.logger.Info("stopping provider orchestrator")
if _, cancel := o.getMainCtx(); cancel != nil {
cancel()

if o.errGroup == nil {
return nil
}

// Wait for all of the price providers to exit.
if err := o.errGroup.Wait(); err != nil {
o.logger.Error("provider orchestrator exited with error", zap.Error(err))
return err
}

o.logger.Info("provider orchestrator exited successfully")
}

return nil
o.wg.Wait()
o.logger.Info("provider orchestrator exited successfully")
}

// execProviderFn returns a function that starts the provider. This function is used
// to start the provider in a go routine.
func (o *ProviderOrchestrator) execProviderFn(
ctx context.Context,
p generalProvider,
) func() error {
return func() error {
defer func() {
if r := recover(); r != nil {
o.logger.Error("recovered from panic", zap.Error(fmt.Errorf("%v", r)))
}
}()

// If the context is canceled, or the deadline is exceeded,
// we want to exit the provider and trigger the error group
// to exit for all providers.
err := p.Start(ctx)
if _, ok := ctxErrors[err]; ok {
return err
) {
defer func() {
if r := recover(); r != nil {
o.logger.Error("recovered from panic", zap.Error(fmt.Errorf("%v", r)))
}
}()

// Otherwise, we gracefully exit the go routine.
return nil
}
err := p.Start(ctx)
o.logger.Error("provider exited with error", zap.String("provider", p.Name()), zap.Error(err))
}

// setMainCtx sets the main context for the provider orchestrator.
Expand Down
9 changes: 5 additions & 4 deletions oracle/orchestrator/market_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (

// listenForMarketMapUpdates is a goroutine that listens for market map updates and
// updates the orchestrated providers with the new market map.
func (o *ProviderOrchestrator) listenForMarketMapUpdates(ctx context.Context) func() error {
return func() error {
func (o *ProviderOrchestrator) listenForMarketMapUpdates(ctx context.Context) func() {
return func() {
mapper := o.GetMarketMapProvider()
ids := mapper.GetIDs()
if len(ids) != 1 {
o.logger.Error("market mapper can only be responsible for one chain", zap.Any("ids", ids))
return nil
return
}

apiCfg := mapper.GetAPIConfig()
Expand All @@ -26,7 +26,8 @@ func (o *ProviderOrchestrator) listenForMarketMapUpdates(ctx context.Context) fu
for {
select {
case <-ctx.Done():
return ctx.Err()
o.logger.Info("stopping market map listener", zap.Error(ctx.Err()))
return
case <-ticker.C:
// Fetch the latest market map.
response := mapper.GetData()
Expand Down
4 changes: 2 additions & 2 deletions oracle/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/skip-mev/slinky/oracle/config"
"github.com/skip-mev/slinky/oracle/types"
Expand All @@ -32,7 +31,7 @@ type ProviderOrchestrator struct {
// mainCancel is the main context cancel function.
mainCancel context.CancelFunc
// errGroup is the error group for the provider orchestrator.
errGroup *errgroup.Group
wg sync.WaitGroup

// -------------------Stateful Fields-------------------//
//
Expand Down Expand Up @@ -99,6 +98,7 @@ func NewProviderOrchestrator(
wsMetrics: wsmetrics.NewWebSocketMetricsFromConfig(cfg.Metrics),
apiMetrics: apimetrics.NewAPIMetricsFromConfig(cfg.Metrics),
providerMetrics: providermetrics.NewProviderMetricsFromConfig(cfg.Metrics),
wg: sync.WaitGroup{},
}

for _, opt := range opts {
Expand Down
15 changes: 13 additions & 2 deletions oracle/orchestrator/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (o *ProviderOrchestrator) UpdateWithMarketMap(marketMap mmtypes.MarketMap)
func (o *ProviderOrchestrator) UpdateProviderState(marketMap types.ProviderMarketMap, state ProviderState) (ProviderState, error) {
provider := state.Provider

tickers := marketMap.GetTickers()
o.logger.Info("updating provider state", zap.String("provider_state", provider.Name()))
switch provider.Type() {
case providertypes.API:
Expand All @@ -65,7 +66,7 @@ func (o *ProviderOrchestrator) UpdateProviderState(marketMap types.ProviderMarke

provider.Update(
base.WithNewAPIHandler(handler),
base.WithNewIDs[mmtypes.Ticker, *big.Int](marketMap.GetTickers()),
base.WithNewIDs[mmtypes.Ticker, *big.Int](tickers),
)
case providertypes.WebSockets:
// Create and update the WebSocket query handler.
Expand All @@ -76,10 +77,20 @@ func (o *ProviderOrchestrator) UpdateProviderState(marketMap types.ProviderMarke

provider.Update(
base.WithNewWebSocketHandler(handler),
base.WithNewIDs[mmtypes.Ticker, *big.Int](marketMap.GetTickers()),
base.WithNewIDs[mmtypes.Ticker, *big.Int](tickers),
)
}

if len(tickers) == 0 {
provider.Stop()
} else if !provider.IsRunning() {
o.wg.Add(1)
go func() {
defer o.wg.Done()
o.execProviderFn(o.mainCtx, provider)
}()
}

// Update the provider's state.
o.logger.Info("updated provider state", zap.String("provider_state", provider.Name()))
return state, nil
Expand Down
4 changes: 4 additions & 0 deletions providers/base/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func NewProvider[K providertypes.ResponseKey, V providertypes.ResponseValue](opt
// and continuously update the data. This blocks until the provider is stopped.
func (p *Provider[K, V]) Start(ctx context.Context) error {
p.logger.Info("starting provider")
if len(p.ids) == 0 {
p.logger.Info("no ids to fetch; provider is exiting")
return nil
}

mainCtx, mainCancel := p.setMainCtx(ctx)
defer func() {
Expand Down

0 comments on commit da878ca

Please sign in to comment.