Skip to content

Commit

Permalink
feat: compare query height to the previous height (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
freak12techno authored May 15, 2024
1 parent ef18f3b commit fb8f8fd
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/coingecko/coingecko.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (c *Coingecko) FetchPrices(currencies []string) (map[string]float64, types.
url := fmt.Sprintf("https://api.coingecko.com/api/v3/simple/price?ids=%s&vs_currencies=usd", ids)

var response Response
queryInfo, err := c.Client.Get(url, &response)
queryInfo, _, err := c.Client.Get(url, &response, types.HTTPPredicateAlwaysPass())
if err != nil {
c.Logger.Error().Err(err).Msg("Could not get rate")
return nil, queryInfo
Expand Down
5 changes: 5 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package constants

const (
HeaderBlockHeight = "Grpc-Metadata-X-Cosmos-Block-Height"
)
21 changes: 16 additions & 5 deletions pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ func NewClient(logger zerolog.Logger, chain string) *Client {
}
}

func (c *Client) Get(url string, target interface{}) (types.QueryInfo, error) {
func (c *Client) Get(
url string,
target interface{},
predicate types.HTTPPredicate,
) (types.QueryInfo, http.Header, error) {
client := &http.Client{Timeout: 10 * 1000000000}
start := time.Now()

Expand All @@ -33,7 +37,7 @@ func (c *Client) Get(url string, target interface{}) (types.QueryInfo, error) {

req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return queryInfo, err
return queryInfo, nil, err
}

req.Header.Set("User-Agent", "cosmos-wallets-exporter")
Expand All @@ -44,14 +48,21 @@ func (c *Client) Get(url string, target interface{}) (types.QueryInfo, error) {
queryInfo.Duration = time.Since(start)
if err != nil {
c.logger.Warn().Str("url", url).Err(err).Msg("Query failed")
return queryInfo, err
return queryInfo, nil, err
}
defer res.Body.Close()

c.logger.Debug().Str("url", url).Dur("duration", time.Since(start)).Msg("Query is finished")
c.logger.Debug().
Str("url", url).
Dur("duration", time.Since(start)).
Msg("Query is finished")

if predicateErr := predicate(res); predicateErr != nil {
return queryInfo, res.Header, predicateErr
}

err = json.NewDecoder(res.Body).Decode(target)
queryInfo.Success = err == nil

return queryInfo, err
return queryInfo, res.Header, err
}
16 changes: 12 additions & 4 deletions pkg/queriers/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@ import (
type BalanceQuerier struct {
Config *config.Config
Logger zerolog.Logger
RPCs []*tendermint.RPC
}

func NewBalanceQuerier(config *config.Config, logger zerolog.Logger) *BalanceQuerier {
rpcs := make([]*tendermint.RPC, len(config.Chains))

for index, chain := range config.Chains {
rpcs[index] = tendermint.NewRPC(chain, logger)
}

return &BalanceQuerier{
Config: config,
Logger: logger.With().Str("component", "balance_querier").Logger(),
RPCs: rpcs,
}
}

Expand All @@ -37,12 +45,12 @@ func (q *BalanceQuerier) GetMetrics() ([]prometheus.Collector, []types.QueryInfo
var wg sync.WaitGroup
var mutex sync.Mutex

for _, chain := range q.Config.Chains {
rpc := tendermint.NewRPC(chain, q.Logger)
for index, chain := range q.Config.Chains {
rpc := q.RPCs[index]

for _, wallet := range chain.Wallets {
wg.Add(1)
go func(wallet config.Wallet, chain config.Chain) {
go func(wallet config.Wallet, chain config.Chain, rpc *tendermint.RPC) {
defer wg.Done()

balancesResponse, queryInfo, err := rpc.GetWalletBalances(wallet.Address)
Expand Down Expand Up @@ -79,7 +87,7 @@ func (q *BalanceQuerier) GetMetrics() ([]prometheus.Collector, []types.QueryInfo
"denom": denom,
}).Set(amount)
}
}(wallet, chain)
}(wallet, chain, rpc)
}
}

Expand Down
25 changes: 21 additions & 4 deletions pkg/tendermint/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"main/pkg/config"
"main/pkg/http"
"main/pkg/types"
"main/pkg/utils"
"sync"

"github.com/rs/zerolog"
)
Expand All @@ -13,28 +15,43 @@ type RPC struct {
Client *http.Client
URL string
Logger zerolog.Logger

LastQueryHeight map[string]int64
Mutex sync.Mutex
}

func NewRPC(chain config.Chain, logger zerolog.Logger) *RPC {
return &RPC{
Client: http.NewClient(logger, chain.Name),
URL: chain.LCDEndpoint,
Logger: logger.With().Str("component", "rpc").Logger(),
Client: http.NewClient(logger, chain.Name),
URL: chain.LCDEndpoint,
Logger: logger.With().Str("component", "rpc").Logger(),
LastQueryHeight: make(map[string]int64),
}
}

func (rpc *RPC) GetWalletBalances(address string) (*types.BalanceResponse, types.QueryInfo, error) {
lastHeight, _ := rpc.LastQueryHeight[address]

url := fmt.Sprintf(
"%s/cosmos/bank/v1beta1/balances/%s",
rpc.URL,
address,
)

var response *types.BalanceResponse
queryInfo, err := rpc.Client.Get(url, &response)
queryInfo, header, err := rpc.Client.Get(url, &response, types.HTTPPredicateCheckHeightAfter(lastHeight))
if err != nil {
return nil, queryInfo, err
}

newLastHeight, err := utils.GetBlockHeightFromHeader(header)
if err != nil {
return nil, queryInfo, err
}

rpc.Mutex.Lock()
rpc.LastQueryHeight[address] = newLastHeight
rpc.Mutex.Unlock()

return response, queryInfo, nil
}
34 changes: 34 additions & 0 deletions pkg/types/http_predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package types

import (
"fmt"
"main/pkg/utils"
"net/http"
)

type HTTPPredicate func(response *http.Response) error

func HTTPPredicateAlwaysPass() HTTPPredicate {
return func(response *http.Response) error {
return nil
}
}

func HTTPPredicateCheckHeightAfter(prevHeight int64) HTTPPredicate {
return func(response *http.Response) error {
currentHeight, err := utils.GetBlockHeightFromHeader(response.Header)
if err != nil {
return err
}

if prevHeight > currentHeight {
return fmt.Errorf(
"previous height (%d) is bigger than the current height (%d)",
prevHeight,
currentHeight,
)
}

return nil
}
}
16 changes: 16 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package utils

import (
"main/pkg/constants"
"net/http"
"strconv"
)

Expand All @@ -20,3 +22,17 @@ func StrToFloat64(s string) float64 {

return f
}

func GetBlockHeightFromHeader(header http.Header) (int64, error) {
valueStr := header.Get(constants.HeaderBlockHeight)
if valueStr == "" {
return 0, nil
}

value, err := strconv.ParseInt(valueStr, 10, 64)
if err != nil {
return 0, err
}

return value, nil
}

0 comments on commit fb8f8fd

Please sign in to comment.