Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/core: rate fetcher tweaks #2512

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/asset/eth/multirpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (
// HTTPS, but not for other requests.
// TODO: Keep a file mapping provider URL to retrieved chain IDs, and skip
// the eth_chainId request after verified for the first time?
defaultRequestTimeout = time.Second * 20
defaultRequestTimeout = time.Second * 10
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had set this to 20 seconds a while back, but I've come to realize that I had a misconfiguration on my system that brought some glibc bug to the surface that was causing hella long DNS lookup times in some cases.

)

var (
Expand Down
71 changes: 23 additions & 48 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,9 +1454,8 @@ type Core struct {

ratesMtx sync.RWMutex
fiatRateSources map[string]*commonRateSource
// stopFiatRateFetching will be used to shutdown fetchFiatExchangeRates
// goroutine when all rate sources have been disabled.
stopFiatRateFetching context.CancelFunc

reFiat chan struct{}

pendingWalletsMtx sync.RWMutex
pendingWallets map[uint32]bool
Expand Down Expand Up @@ -1571,6 +1570,7 @@ func New(cfg *Config) (*Core, error) {
seedGenerationTime: seedGenerationTime,

fiatRateSources: make(map[string]*commonRateSource),
reFiat: make(chan struct{}, 1),
pendingWallets: make(map[uint32]bool),

notes: make(chan asset.WalletNotification, 128),
Expand Down Expand Up @@ -1613,7 +1613,6 @@ func (c *Core) Run(ctx context.Context) {
// Skip rate fetch setup if on simnet. Rate fetching maybe enabled if
// desired.
if c.cfg.Net != dex.Simnet || c.cfg.SimnetFiatRates {
c.ratesMtx.Lock()
// Retrieve disabled fiat rate sources from database.
disabledSources, err := c.db.DisabledRateSources()
if err != nil {
Expand All @@ -1630,15 +1629,8 @@ func (c *Core) Run(ctx context.Context) {
}
c.fiatRateSources[token] = newCommonRateSource(rateFetcher)
}

// Start goroutine for fiat rate fetcher's if we have at least one source.
if len(c.fiatRateSources) != 0 {
c.fetchFiatExchangeRates()
} else {
c.log.Debug("no fiat rate source initialized")
}
c.ratesMtx.Unlock()
}
c.fetchFiatExchangeRates(ctx)

// Start bond supervisor.
c.wg.Add(1)
Expand Down Expand Up @@ -10267,14 +10259,7 @@ func (c *Core) findActiveOrder(oid order.OrderID) (*trackedTrade, error) {

// fetchFiatExchangeRates starts the fiat rate fetcher goroutine and schedules
// refresh cycles. Use under ratesMtx lock.
func (c *Core) fetchFiatExchangeRates() {
if c.stopFiatRateFetching != nil {
c.log.Debug("Fiat exchange rate fetching is already enabled")
return
}
ctx, cancel := context.WithCancel(c.ctx)
c.stopFiatRateFetching = cancel

func (c *Core) fetchFiatExchangeRates(ctx context.Context) {
c.log.Debug("starting fiat rate fetching")

c.wg.Add(1)
Expand All @@ -10286,32 +10271,39 @@ func (c *Core) fetchFiatExchangeRates() {
c.refreshFiatRates(ctx)

select {
case <-tick.C:
case <-c.reFiat:
case <-ctx.Done():
return
case <-tick.C:

}
}
}()
}

func (c *Core) fiatSources() []*commonRateSource {
c.ratesMtx.RLock()
defer c.ratesMtx.RUnlock()
sources := make([]*commonRateSource, 0, len(c.fiatRateSources))
for _, s := range c.fiatRateSources {
sources = append(sources, s)
}
return sources
}

// refreshFiatRates refreshes the fiat rates for rate sources whose values have
// not been updated since fiatRateRequestInterval. It also checks if fiat rates
// are expired and does some clean-up.
func (c *Core) refreshFiatRates(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

var wg sync.WaitGroup
supportedAssets := c.SupportedAssets()
c.ratesMtx.RLock()
for _, source := range c.fiatRateSources {
for _, source := range c.fiatSources() {
wg.Add(1)
go func(source *commonRateSource) {
defer wg.Done()
source.refreshRates(ctx, c.log, supportedAssets)
}(source)
}
c.ratesMtx.RUnlock()
wg.Wait()

// Remove expired rate source if any.
Expand Down Expand Up @@ -10347,13 +10339,11 @@ func (c *Core) fiatConversions() map[uint32]float64 {
}
}

c.ratesMtx.RLock()
defer c.ratesMtx.RUnlock()
fiatRatesMap := make(map[uint32]float64, len(supportedAssets))
for assetID := range assetIDs {
var rateSum float64
var sources int
for _, source := range c.fiatRateSources {
for _, source := range c.fiatSources() {
rateInfo := source.assetRate(assetID)
if rateInfo != nil && time.Since(rateInfo.lastUpdate) < fiatRateDataExpiry && rateInfo.rate > 0 {
sources++
Expand Down Expand Up @@ -10394,17 +10384,9 @@ func (c *Core) enableRateSource(source string) error {
rateSource := newCommonRateSource(rateFetcher)
c.fiatRateSources[source] = rateSource

// If this is our first fiat rate source, start fiat rate fetcher goroutine,
// else fetch rates.
if len(c.fiatRateSources) == 1 {
c.fetchFiatExchangeRates()
} else {
go func() {
supportedAssets := c.SupportedAssets() // not with ratesMtx locked!
ctx, cancel := context.WithTimeout(c.ctx, 4*time.Second)
defer cancel()
rateSource.refreshRates(ctx, c.log, supportedAssets)
}()
select {
case c.reFiat <- struct{}{}:
default:
}

// Update disabled fiat rate source.
Expand Down Expand Up @@ -10472,13 +10454,6 @@ func (c *Core) saveDisabledRateSources() {
}
}

// Shutdown rate fetching if there are no exchange rate source.
if len(c.fiatRateSources) == 0 && c.stopFiatRateFetching != nil {
c.stopFiatRateFetching()
c.stopFiatRateFetching = nil
c.log.Debug("shutting down rate fetching")
}

err := c.db.SaveDisabledRateSources(disabled)
if err != nil {
c.log.Errorf("Unable to save disabled fiat rate source to database: %v", err)
Expand Down
4 changes: 1 addition & 3 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10209,8 +10209,6 @@ func TestFiatConversions(t *testing.T) {
rig := newTestRig()
defer rig.shutdown()
tCore := rig.core
ctx, cancel := context.WithCancel(tCore.ctx)
tCore.stopFiatRateFetching = cancel

// No fiat rate source initialized
fiatRates := tCore.fiatConversions()
Expand All @@ -10227,7 +10225,7 @@ func TestFiatConversions(t *testing.T) {
tCore.wg.Add(1)
go func() {
defer tCore.wg.Done()
tCore.refreshFiatRates(ctx)
tCore.refreshFiatRates(tCtx)
}()
tCore.wg.Wait()

Expand Down
89 changes: 65 additions & 24 deletions client/core/exchangeratefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
fiatRateRequestInterval = 12 * time.Minute
// fiatRateDataExpiry : Any data older than fiatRateDataExpiry will be discarded.
fiatRateDataExpiry = 60 * time.Minute
fiatRequestTimeout = time.Second * 5

// Tokens. Used to identify fiat rate source, source name must not contain a
// comma.
Expand All @@ -32,11 +33,33 @@ const (
)

var (
dcrDataURL = "https://explorer.dcrdata.org/api/exchangerate"
dcrDataURL = "https://explorer.dcrdata.org/api/exchangerate"
// coinpaprika has two options. /tickers is for the top 2500 assets all in
// one request. /ticker/[slug] is for a single ticker. From testing
// Single ticker request took 274.626125ms
// Size of single ticker response: 0.733 kB
// All tickers request took 47.651851ms
// Size of all tickers response: 1828.863 kB
// Single ticker requests were < 1 kB, while all tickers were 1.8 MB, but
// the larger request was faster (without json decoding). Coinpaprika's,
// free tier allows up to 25k requests per month. For a
// fiatRateRequestInterval of 12 minutes, that 3600 requests per
// month for the all tickers request, and 3600 * N requests for N assets.
// So any more than 25000 / 3600 = 6.9 assets, and we can expect to run into
// rate limits. But the bandwidth of the full tickers request is kinda
// ridiculous too. Solution needed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could keep track of how many requests are made, and if it gets to point where it can only make it through the rest of the month if only the all assets request is used, then switch to the all assets request. For most people it won't make it to that point because they don't keep dexc on all the time.

coinpaprikaURL = "https://api.coinpaprika.com/v1/tickers/%s"
messariURL = "https://data.messari.io/api/v1/assets/%s/metrics/market-data"
btcBipID, _ = dex.BipSymbolID("btc")
dcrBipID, _ = dex.BipSymbolID("dcr")
// The best info I can find on Messari says
// Without an API key requests are rate limited to 20 requests per minute
// and 1000 requests per day.
// For a
// fiatRateRequestInterval of 12 minutes, to hit 20 requests per minute, we
// would need to have 20 * 12 = 480 assets. To hit 1000 requests per day,
// we would need 12 * 60 / (86,400 / 1000) = 8.33 assets. Very likely. So
// we're in a similar position to coinpaprika here too.
messariURL = "https://data.messari.io/api/v1/assets/%s/metrics/market-data"
btcBipID, _ = dex.BipSymbolID("btc")
dcrBipID, _ = dex.BipSymbolID("dcr")
)

// fiatRateFetchers is the list of all supported fiat rate fetchers.
Expand Down Expand Up @@ -119,10 +142,11 @@ func newCommonRateSource(fetcher rateFetcher) *commonRateSource {
// for sample request and response information.
func fetchCoinpaprikaRates(ctx context.Context, log dex.Logger, assets map[uint32]*SupportedAsset) map[uint32]float64 {
fiatRates := make(map[uint32]float64)
for assetID, sa := range assets {
fetchRate := func(sa *SupportedAsset) {
assetID := sa.ID
if sa.Wallet == nil {
// we don't want to fetch rates for assets with no wallet.
continue
return
}

res := new(struct {
Expand All @@ -133,24 +157,36 @@ func fetchCoinpaprikaRates(ctx context.Context, log dex.Logger, assets map[uint3
} `json:"quotes"`
})

symbol := sa.Symbol
symbol := dex.TokenSymbol(sa.Symbol)
if symbol == "dextt" {
return
}

name := sa.Name
// TODO: Store these within the *SupportedAsset.
switch assetID {
case 60001: // usdc
symbol = "usdc"
switch symbol {
case "usdc":
name = "usd-coin"
case "polygon":
symbol = "matic"
name = "polygon"
}

reqStr := fmt.Sprintf(coinpaprikaURL, coinpapSlug(symbol, name))

ctx, cancel := context.WithTimeout(ctx, fiatRequestTimeout)
defer cancel()

if err := getRates(ctx, reqStr, res); err != nil {
log.Error(err)
continue
log.Errorf("Error getting fiat exchange rates from coinpaprika: %v", err)
return
}

fiatRates[assetID] = res.Quotes.Currency.Price
}
for _, sa := range assets {
fetchRate(sa)
}
return fiatRates
}

Expand Down Expand Up @@ -191,10 +227,11 @@ func fetchDcrdataRates(ctx context.Context, log dex.Logger, assets map[uint32]*S
// sample request and response information.
func fetchMessariRates(ctx context.Context, log dex.Logger, assets map[uint32]*SupportedAsset) map[uint32]float64 {
fiatRates := make(map[uint32]float64)
for assetID, asset := range assets {
if asset.Wallet == nil {
fetchRate := func(sa *SupportedAsset) {
assetID := sa.ID
if sa.Wallet == nil {
// we don't want to fetch rate for assets with no wallet.
continue
return
}

res := new(struct {
Expand All @@ -205,23 +242,27 @@ func fetchMessariRates(ctx context.Context, log dex.Logger, assets map[uint32]*S
} `json:"data"`
})

slug := strings.ToLower(asset.Symbol)

// TODO: Store these within the *SupportedAsset.
switch assetID {
case 60001: // usdc
slug = "usdc"
slug := dex.TokenSymbol(sa.Symbol)
if slug == "dextt" {
return
}

reqStr := fmt.Sprintf(messariURL, slug)

ctx, cancel := context.WithTimeout(ctx, fiatRequestTimeout)
defer cancel()

if err := getRates(ctx, reqStr, res); err != nil {
log.Error(err)
continue
log.Errorf("Error getting fiat exchange rates from messari: %v", err)
return
}

fiatRates[assetID] = res.Data.MarketData.Price
}

for _, sa := range assets {
fetchRate(sa)
}
return fiatRates
}

Expand All @@ -244,7 +285,7 @@ func getRates(ctx context.Context, url string, thing interface{}) error {
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected response, got status code %d", resp.StatusCode)
return fmt.Errorf("error %d fetching %q", resp.StatusCode, url)
}

reader := io.LimitReader(resp.Body, 1<<20)
Expand Down
13 changes: 13 additions & 0 deletions dex/bip-id.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package dex

import "strings"

var symbolBipIDs map[string]uint32

// BipSymbolID returns the asset ID associated with a given ticker symbol.
Expand All @@ -26,6 +28,17 @@ func BipIDSymbol(id uint32) string {
return bipIDs[id]
}

// TokenSymbol returns the tokens raw symbol if this is compound symbol that
// encodes the blockchain, or else the input is returned unaltered.
// e.g. usdc.eth => usdc
func TokenSymbol(symbol string) string {
parts := strings.Split(symbol, ".")
if len(parts) > 1 {
return parts[0]
}
return symbol
}

var bipIDs = map[uint32]string{
0: "btc",
1: "testnet",
Expand Down