From 2ba13d45b45af616273dd0b668ef75691a8116b6 Mon Sep 17 00:00:00 2001 From: Gabe <7622243+decentralgabe@users.noreply.github.com> Date: Fri, 12 Apr 2024 15:09:54 -0700 Subject: [PATCH] add more logging and tracing (#183) * updates * one more --- impl/cmd/main.go | 2 +- impl/config/config.go | 4 ++-- impl/config/config.toml | 2 +- impl/integrationtest/main.go | 2 +- impl/internal/dht/getput.go | 10 +++++++++- impl/internal/util/logging.go | 4 ++-- impl/pkg/server/logging.go | 30 ++++++++++++++++++++---------- impl/pkg/service/pkarr.go | 24 +++++++++++++++++------- impl/pkg/telemetry/telemetry.go | 14 +++++++++++--- 9 files changed, 64 insertions(+), 28 deletions(-) diff --git a/impl/cmd/main.go b/impl/cmd/main.go index 36a88e99..3367dc36 100644 --- a/impl/cmd/main.go +++ b/impl/cmd/main.go @@ -93,7 +93,7 @@ func run() error { case sig := <-shutdown: logrus.WithContext(ctx).WithField("signal", sig.String()).Info("shutdown signal received") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err = s.Shutdown(ctx); err != nil { diff --git a/impl/config/config.go b/impl/config/config.go index 0b7b2e69..2075d04e 100644 --- a/impl/config/config.go +++ b/impl/config/config.go @@ -84,9 +84,9 @@ func GetDefaultConfig() Config { BootstrapPeers: GetDefaultBootstrapPeers(), }, PkarrConfig: PkarrServiceConfig{ - RepublishCRON: "0 */2 * * *", + RepublishCRON: "0 */3 * * *", CacheTTLSeconds: 600, - CacheSizeLimitMB: 500, + CacheSizeLimitMB: 1000, }, Log: LogConfig{ Level: logrus.DebugLevel.String(), diff --git a/impl/config/config.toml b/impl/config/config.toml index ce88bd29..65cf88ac 100644 --- a/impl/config/config.toml +++ b/impl/config/config.toml @@ -11,6 +11,6 @@ bootstrap_peers = ["router.magnets.im:6881", "router.bittorrent.com:6881", "dht. "router.utorrent.com:6881", "router.nuh.dev:6881"] [pkarr] -republish_cron = "0 */2 * * *" # every 2 hours +republish_cron = "0 */3 * * *" # every 3 hours cache_ttl_seconds = 600 # 10 minutes cache_size_limit_mb = 1000 # 1000 MB \ No newline at end of file diff --git a/impl/integrationtest/main.go b/impl/integrationtest/main.go index 5765c773..94949f5d 100644 --- a/impl/integrationtest/main.go +++ b/impl/integrationtest/main.go @@ -120,7 +120,7 @@ func doRequest(ctx context.Context, req *http.Request) error { "url": req.URL, }) - ctx, done := context.WithTimeout(ctx, time.Second*10) + ctx, done := context.WithTimeout(ctx, 10*time.Second) defer done() req = req.WithContext(ctx) diff --git a/impl/internal/dht/getput.go b/impl/internal/dht/getput.go index 244d790e..00496ae1 100644 --- a/impl/internal/dht/getput.go +++ b/impl/internal/dht/getput.go @@ -16,6 +16,8 @@ import ( "github.com/anacrolix/dht/v2/bep44" "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/dht/v2/traversal" + + "github.com/TBD54566975/did-dht-method/pkg/telemetry" ) // Copied from https://github.com/anacrolix/dht/blob/master/exts/getput/getput.go and modified @@ -38,7 +40,7 @@ func startGetTraversal( Alpha: 15, Target: target, DoQuery: func(ctx context.Context, addr krpc.NodeAddr) traversal.QueryResult { - queryCtx, cancel := context.WithTimeout(ctx, time.Second*8) + queryCtx, cancel := context.WithTimeout(ctx, 8*time.Second) defer cancel() res := s.Get(queryCtx, dht.NewAddr(addr.UDP()), target, seq, dht.QueryRateLimiting{}) @@ -101,6 +103,9 @@ func Get( ) ( ret FullGetResult, stats *traversal.Stats, err error, ) { + ctx, span := telemetry.GetTracer().Start(ctx, "DHT.Get") + defer span.End() + vChan, op, err := startGetTraversal(ctx, target, s, seq, salt) if err != nil { return @@ -139,6 +144,9 @@ func Put( ) ( stats *traversal.Stats, err error, ) { + ctx, span := telemetry.GetTracer().Start(ctx, "DHT.Put") + defer span.End() + vChan, op, err := startGetTraversal(ctx, target, s, // When we do a get traversal for a put, we don't care what seq the peers have? nil, diff --git a/impl/internal/util/logging.go b/impl/internal/util/logging.go index 917b98a7..9bdb499e 100644 --- a/impl/internal/util/logging.go +++ b/impl/internal/util/logging.go @@ -26,8 +26,8 @@ func (h *TraceHook) Fire(entry *logrus.Entry) error { traceID := span.SpanContext().TraceID().String() spanID := span.SpanContext().SpanID().String() - entry.Data["traceID"] = traceID - entry.Data["spanID"] = spanID + entry.Data["trace_id"] = traceID + entry.Data["span_id"] = spanID return nil } diff --git a/impl/pkg/server/logging.go b/impl/pkg/server/logging.go index d70f94f7..86ab5c0c 100644 --- a/impl/pkg/server/logging.go +++ b/impl/pkg/server/logging.go @@ -8,6 +8,7 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/trace" ) // logger is the logrus logger handler amended for telemetry @@ -45,17 +46,26 @@ func logger(logger logrus.FieldLogger, notLogged ...string) gin.HandlerFunc { return } + traceID, spanID := "", "" + span := trace.SpanFromContext(c) + if span.SpanContext().IsValid() { + traceID = span.SpanContext().TraceID().String() + spanID = span.SpanContext().SpanID().String() + } + entry := logger.WithFields(logrus.Fields{ - "hostname": hostname, - "statusCode": statusCode, - "latency": latency, - "clientIP": clientIP, - "method": c.Request.Method, - "path": path, - "referer": referer, - "dataLength": dataLength, - "userAgent": clientUserAgent, - "time": time.Now().Format(time.RFC3339), + "hostname": hostname, + "status_code": statusCode, + "latency": latency, + "client_ip": clientIP, + "method": c.Request.Method, + "path": path, + "referer": referer, + "data_length": dataLength, + "user_agent": clientUserAgent, + "time": time.Now().Format(time.RFC3339), + "trace_id": traceID, + "span_id": spanID, }) if len(c.Errors) > 0 { diff --git a/impl/pkg/service/pkarr.go b/impl/pkg/service/pkarr.go index 6e095dbc..2ef86e50 100644 --- a/impl/pkg/service/pkarr.go +++ b/impl/pkg/service/pkarr.go @@ -10,6 +10,7 @@ import ( "github.com/allegro/bigcache/v3" "github.com/anacrolix/torrent/bencode" "github.com/goccy/go-json" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/tv42/zbase32" @@ -147,17 +148,23 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response if got, err := s.cache.Get(id); err == nil { var resp pkarr.Response if err = json.Unmarshal(got, &resp); err == nil { - logrus.WithContext(ctx).WithField("record_id", id).Debug("resolved pkarr record from cache") + logrus.WithContext(ctx).WithField("record_id", id).Info("resolved pkarr record from cache") return &resp, nil } logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from cache, falling back to dht") } - // next do a dht lookup - got, err := s.dht.GetFull(ctx, id) + // next do a dht lookup with a timeout of 10 seconds + getCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + got, err := s.dht.GetFull(getCtx, id) if err != nil { - // try to resolve from storage before returning and error - logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage") + if errors.Is(err, context.DeadlineExceeded) { + logrus.WithContext(ctx).WithField("record", id).Warn("dht lookup timed out, attempting to resolve from storage") + } else { + logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage") + } rawID, err := util.Z32Decode(id) if err != nil { @@ -176,8 +183,9 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response return nil, err } - logrus.WithContext(ctx).WithField("record", id).Debug("resolved pkarr record from storage") + logrus.WithContext(ctx).WithField("record", id).Info("resolved pkarr record from storage") resp := record.Response() + // add the record back to the cache for future lookups if err = s.addRecordToCache(id, record.Response()); err != nil { logrus.WithError(err).WithField("record", id).Error("failed to set pkarr record in cache") } @@ -203,6 +211,8 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response // add the record to cache, do it here to avoid duplicate calculations if err = s.addRecordToCache(id, resp); err != nil { logrus.WithContext(ctx).WithError(err).Errorf("failed to set pkarr record[%s] in cache", id) + } else { + logrus.WithContext(ctx).WithField("record", id).Info("added pkarr record back to cache") } return &resp, nil @@ -267,7 +277,7 @@ func (s *PkarrService) republish() { recordID := zbase32.EncodeToString(record.Key[:]) logrus.WithContext(ctx).Debugf("republishing record: %s", recordID) - putCtx, cancel := context.WithTimeout(ctx, time.Second*10) + putCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if _, putErr := s.dht.Put(putCtx, record.BEP44()); putErr != nil { diff --git a/impl/pkg/telemetry/telemetry.go b/impl/pkg/telemetry/telemetry.go index ad824cbc..ec1d4247 100644 --- a/impl/pkg/telemetry/telemetry.go +++ b/impl/pkg/telemetry/telemetry.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -23,12 +24,13 @@ const ( ) var ( - traceProvider *sdktrace.TracerProvider tracer trace.Tracer - + traceProvider *sdktrace.TracerProvider meterProvider *sdkmetric.MeterProvider + propagator propagation.TextMapPropagator ) +// SetupTelemetry initializes the OpenTelemetry SDK with the appropriate exporters and propagators. func SetupTelemetry(ctx context.Context) error { r, err := resource.Merge( resource.Default(), @@ -55,14 +57,19 @@ func SetupTelemetry(ctx context.Context) error { otel.SetMeterProvider(meterProvider) // setup memory metrics - err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second * 30)) + err = runtime.Start(runtime.WithMeterProvider(meterProvider), runtime.WithMinimumReadMemStatsInterval(time.Second*15)) if err != nil { return err } + // setup propagator + propagator = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) + otel.SetTextMapPropagator(propagator) + return nil } +// Shutdown stops the telemetry providers and exporters safely. func Shutdown(ctx context.Context) { if traceProvider != nil { if err := traceProvider.Shutdown(ctx); err != nil { @@ -77,6 +84,7 @@ func Shutdown(ctx context.Context) { } } +// GetTracer returns the tracer for the application. If the tracer is not yet initialized, it will be created. func GetTracer() trace.Tracer { if tracer == nil { tracer = otel.GetTracerProvider().Tracer(scopeName, trace.WithInstrumentationVersion(config.Version))