Skip to content

Commit

Permalink
feat: add opentelemetry tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
freak12techno committed May 15, 2024
1 parent fb8f8fd commit bdd43c1
Show file tree
Hide file tree
Showing 15 changed files with 330 additions and 67 deletions.
2 changes: 1 addition & 1 deletion cmd/cosmos-wallets-exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var (
)

func ExecuteMain(configPath string) {
app := pkg.NewApp(configPath)
app := pkg.NewApp(configPath, version)
app.Start()
}

Expand Down
34 changes: 27 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,27 +1,47 @@
module main

go 1.18
go 1.21.4

toolchain go1.22.1

require (
github.com/BurntSushi/toml v1.1.0
github.com/google/uuid v1.3.0
github.com/google/uuid v1.6.0
github.com/guregu/null/v5 v5.0.0
github.com/mcuadros/go-defaults v1.2.0
github.com/prometheus/client_golang v1.12.2
github.com/rs/zerolog v1.26.1
github.com/spf13/cobra v1.4.0
github.com/superoo7/go-gecko v1.0.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0
go.opentelemetry.io/otel/sdk v1.26.0
go.opentelemetry.io/otel/trace v1.26.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
google.golang.org/protobuf v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
88 changes: 67 additions & 21 deletions go.sum

Large diffs are not rendered by default.

47 changes: 33 additions & 14 deletions pkg/app.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
package pkg

import (
"context"
coingeckoPkg "main/pkg/coingecko"
"main/pkg/config"
"main/pkg/logger"
queriersPkg "main/pkg/queriers"
"main/pkg/tracing"
"main/pkg/types"
"net/http"
"sync"
"time"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/trace"
)

type App struct {
Config *config.Config
Logger zerolog.Logger
Queriers []types.Querier

Tracer trace.Tracer
}

func NewApp(configPath string) *App {
func NewApp(configPath string, version string) *App {
appConfig, err := config.GetConfig(configPath)
if err != nil {
logger.GetDefaultLogger().Fatal().Err(err).Msg("Could not load config")
Expand All @@ -32,26 +40,31 @@ func NewApp(configPath string) *App {
logger.GetDefaultLogger().Fatal().Err(err).Msg("Provided config is invalid!")
}

tracer, err := tracing.InitTracer(appConfig.TracingConfig, version)
if err != nil {
logger.GetDefaultLogger().Fatal().Err(err).Msg("Error setting up tracing")
}

log := logger.GetLogger(appConfig.LogConfig)
coingecko := coingeckoPkg.NewCoingecko(appConfig, log)
coingecko := coingeckoPkg.NewCoingecko(appConfig, log, tracer)

queriers := []types.Querier{
queriersPkg.NewPriceQuerier(appConfig, coingecko),
queriersPkg.NewBalanceQuerier(appConfig, log),
queriersPkg.NewUptimeQuerier(),
queriersPkg.NewPriceQuerier(appConfig, coingecko, tracer),
queriersPkg.NewBalanceQuerier(appConfig, log, tracer),
queriersPkg.NewUptimeQuerier(tracer),
}

return &App{
Config: appConfig,
Logger: log,
Queriers: queriers,
Tracer: tracer,
}
}

func (a *App) Start() {
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
a.Handler(w, r)
})
otelHandler := otelhttp.NewHandler(http.HandlerFunc(a.Handler), "prometheus")
http.Handle("/metrics", otelHandler)

a.Logger.Info().Str("addr", a.Config.ListenAddress).Msg("Listening")
err := http.ListenAndServe(a.Config.ListenAddress, nil)
Expand All @@ -62,11 +75,18 @@ func (a *App) Start() {

func (a *App) Handler(w http.ResponseWriter, r *http.Request) {
requestStart := time.Now()
requestID := uuid.New().String()

sublogger := a.Logger.With().
Str("request-id", uuid.New().String()).
Str("request-id", requestID).
Logger()

span := trace.SpanFromContext(r.Context())
span.SetAttributes(attribute.String("request-id", requestID))
rootSpanCtx := r.Context()

defer span.End()

registry := prometheus.NewRegistry()

var wg sync.WaitGroup
Expand All @@ -76,16 +96,15 @@ func (a *App) Handler(w http.ResponseWriter, r *http.Request) {

for _, querier := range a.Queriers {
wg.Add(1)
go func(querier types.Querier) {
mutex.Lock()
go func(querier types.Querier, ctx context.Context) {
metrics, querierQueryInfos := querier.GetMetrics(ctx)

metrics, querierQueryInfos := querier.GetMetrics()
mutex.Lock()
registry.MustRegister(metrics...)
queryInfos = append(queryInfos, querierQueryInfos...)

mutex.Unlock()
wg.Done()
}(querier)
}(querier, rootSpanCtx)
}

wg.Wait()
Expand Down
16 changes: 12 additions & 4 deletions pkg/coingecko/coingecko.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package coingecko

import (
"context"
"fmt"
"main/pkg/config"
"main/pkg/http"
"main/pkg/types"
"strings"

"go.opentelemetry.io/otel/trace"

"github.com/rs/zerolog"
)

Expand All @@ -16,22 +19,27 @@ type Coingecko struct {
Client *http.Client
Config *config.Config
Logger zerolog.Logger
Tracer trace.Tracer
}

func NewCoingecko(appConfig *config.Config, logger zerolog.Logger) *Coingecko {
func NewCoingecko(appConfig *config.Config, logger zerolog.Logger, tracer trace.Tracer) *Coingecko {
return &Coingecko{
Config: appConfig,
Client: http.NewClient(logger, "coingecko"),
Client: http.NewClient(logger, "coingecko", tracer),
Logger: logger.With().Str("component", "coingecko").Logger(),
Tracer: tracer,
}
}

func (c *Coingecko) FetchPrices(currencies []string) (map[string]float64, types.QueryInfo) {
func (c *Coingecko) FetchPrices(currencies []string, ctx context.Context) (map[string]float64, types.QueryInfo) {
childCtx, span := c.Tracer.Start(ctx, "Querying prices")
defer span.End()

ids := strings.Join(currencies, ",")
url := fmt.Sprintf("https://api.coingecko.com/api/v3/simple/price?ids=%s&vs_currencies=usd", ids)

var response Response
queryInfo, _, err := c.Client.Get(url, &response, types.HTTPPredicateAlwaysPass())
queryInfo, _, err := c.Client.Get(url, &response, types.HTTPPredicateAlwaysPass(), childCtx)
if err != nil {
c.Logger.Error().Err(err).Msg("Could not get rate")
return nil, queryInfo
Expand Down
17 changes: 14 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"

"github.com/guregu/null/v5"

"github.com/BurntSushi/toml"
"github.com/mcuadros/go-defaults"
)
Expand Down Expand Up @@ -78,16 +80,25 @@ func (c *Chain) FindDenomByName(denom string) (*DenomInfo, bool) {
}

type Config struct {
LogConfig LogConfig `toml:"log"`
ListenAddress string `default:":9550" toml:"listen-address"`
Chains []Chain `toml:"chains"`
TracingConfig TracingConfig `toml:"tracing"`
LogConfig LogConfig `toml:"log"`
ListenAddress string `default:":9550" toml:"listen-address"`
Chains []Chain `toml:"chains"`
}

type LogConfig struct {
LogLevel string `default:"info" toml:"level"`
JSONOutput bool `default:"false" toml:"json"`
}

type TracingConfig struct {
Enabled null.Bool `default:"false" toml:"enabled"`
OpenTelemetryHTTPHost string `toml:"open-telemetry-http-host"`
OpenTelemetryHTTPInsecure null.Bool `default:"true" toml:"open-telemetry-http-insecure"`
OpenTelemetryHTTPUser string `toml:"open-telemetry-http-user"`
OpenTelemetryHTTPPassword string `toml:"open-telemetry-http-password"`
}

func (c *Config) Validate() error {
if len(c.Chains) == 0 {
return errors.New("no chains provided")
Expand Down
19 changes: 16 additions & 3 deletions pkg/http/http.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,45 @@
package http

import (
"context"
"encoding/json"
"main/pkg/types"
"net/http"
"time"

"go.opentelemetry.io/otel/trace"

"github.com/rs/zerolog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

type Client struct {
logger zerolog.Logger
chain string
tracer trace.Tracer
}

func NewClient(logger zerolog.Logger, chain string) *Client {
func NewClient(logger zerolog.Logger, chain string, tracer trace.Tracer) *Client {
return &Client{
logger: logger.With().Str("component", "http").Logger(),
chain: chain,
tracer: tracer,
}
}

func (c *Client) Get(
url string,
target interface{},
predicate types.HTTPPredicate,
ctx context.Context,
) (types.QueryInfo, http.Header, error) {
client := &http.Client{Timeout: 10 * 1000000000}
childCtx, span := c.tracer.Start(ctx, "HTTP request")
defer span.End()

client := &http.Client{
Timeout: 10 * 1000000000,
Transport: otelhttp.NewTransport(http.DefaultTransport),
}
start := time.Now()

queryInfo := types.QueryInfo{
Expand All @@ -35,7 +48,7 @@ func (c *Client) Get(
URL: url,
}

req, err := http.NewRequest(http.MethodGet, url, nil)
req, err := http.NewRequestWithContext(childCtx, http.MethodGet, url, nil)
if err != nil {
return queryInfo, nil, err
}
Expand Down
26 changes: 22 additions & 4 deletions pkg/queriers/balance.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package queriers

import (
"context"
"main/pkg/config"
"main/pkg/tendermint"
"main/pkg/types"
"main/pkg/utils"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
)
Expand All @@ -15,23 +19,32 @@ type BalanceQuerier struct {
Config *config.Config
Logger zerolog.Logger
RPCs []*tendermint.RPC
Tracer trace.Tracer
}

func NewBalanceQuerier(config *config.Config, logger zerolog.Logger) *BalanceQuerier {
func NewBalanceQuerier(
config *config.Config,
logger zerolog.Logger,
tracer trace.Tracer,
) *BalanceQuerier {
rpcs := make([]*tendermint.RPC, len(config.Chains))

for index, chain := range config.Chains {
rpcs[index] = tendermint.NewRPC(chain, logger)
rpcs[index] = tendermint.NewRPC(chain, logger, tracer)
}

return &BalanceQuerier{
Config: config,
Logger: logger.With().Str("component", "balance_querier").Logger(),
RPCs: rpcs,
Tracer: tracer,
}
}

func (q *BalanceQuerier) GetMetrics() ([]prometheus.Collector, []types.QueryInfo) {
func (q *BalanceQuerier) GetMetrics(ctx context.Context) ([]prometheus.Collector, []types.QueryInfo) {
childCtx, span := q.Tracer.Start(ctx, "Querying balance metrics")
defer span.End()

balancesGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cosmos_wallets_exporter_balance",
Expand All @@ -51,9 +64,14 @@ func (q *BalanceQuerier) GetMetrics() ([]prometheus.Collector, []types.QueryInfo
for _, wallet := range chain.Wallets {
wg.Add(1)
go func(wallet config.Wallet, chain config.Chain, rpc *tendermint.RPC) {
chainCtx, chainSpan := q.Tracer.Start(childCtx, "Querying chain and wallet")
chainSpan.SetAttributes(attribute.String("chain", chain.Name))
chainSpan.SetAttributes(attribute.String("wallet", wallet.Address))
defer chainSpan.End()

defer wg.Done()

balancesResponse, queryInfo, err := rpc.GetWalletBalances(wallet.Address)
balancesResponse, queryInfo, err := rpc.GetWalletBalances(wallet.Address, chainCtx)

mutex.Lock()
defer mutex.Unlock()
Expand Down
Loading

0 comments on commit bdd43c1

Please sign in to comment.