Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
gabe committed Apr 12, 2024
1 parent 9526dd8 commit 3e4a15b
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 28 deletions.
2 changes: 1 addition & 1 deletion impl/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func run() error {
case sig := <-shutdown:
logrus.WithContext(ctx).WithField("signal", sig.String()).Info("shutdown signal received")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err = s.Shutdown(ctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions impl/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func GetDefaultConfig() Config {
BootstrapPeers: GetDefaultBootstrapPeers(),
},
PkarrConfig: PkarrServiceConfig{
RepublishCRON: "0 */2 * * *",
RepublishCRON: "0 */3 * * *",
CacheTTLSeconds: 600,
CacheSizeLimitMB: 500,
CacheSizeLimitMB: 1000,
},
Log: LogConfig{
Level: logrus.DebugLevel.String(),
Expand Down
2 changes: 1 addition & 1 deletion impl/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ bootstrap_peers = ["router.magnets.im:6881", "router.bittorrent.com:6881", "dht.
"router.utorrent.com:6881", "router.nuh.dev:6881"]

[pkarr]
republish_cron = "0 */2 * * *" # every 2 hours
republish_cron = "0 */3 * * *" # every 3 hours
cache_ttl_seconds = 600 # 10 minutes
cache_size_limit_mb = 1000 # 1000 MB
2 changes: 1 addition & 1 deletion impl/integrationtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func doRequest(ctx context.Context, req *http.Request) error {
"url": req.URL,
})

ctx, done := context.WithTimeout(ctx, time.Second*10)
ctx, done := context.WithTimeout(ctx, 10*time.Second)
defer done()

req = req.WithContext(ctx)
Expand Down
10 changes: 9 additions & 1 deletion impl/internal/dht/getput.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/anacrolix/dht/v2/bep44"
"github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/dht/v2/traversal"

"github.com/TBD54566975/did-dht-method/pkg/telemetry"
)

// Copied from https://github.com/anacrolix/dht/blob/master/exts/getput/getput.go and modified
Expand All @@ -38,7 +40,7 @@ func startGetTraversal(
Alpha: 15,
Target: target,
DoQuery: func(ctx context.Context, addr krpc.NodeAddr) traversal.QueryResult {
queryCtx, cancel := context.WithTimeout(ctx, time.Second*8)
queryCtx, cancel := context.WithTimeout(ctx, 8*time.Second)
defer cancel()

res := s.Get(queryCtx, dht.NewAddr(addr.UDP()), target, seq, dht.QueryRateLimiting{})
Expand Down Expand Up @@ -101,6 +103,9 @@ func Get(
) (
ret FullGetResult, stats *traversal.Stats, err error,
) {
ctx, span := telemetry.GetTracer().Start(ctx, "DHT.Get")
defer span.End()

vChan, op, err := startGetTraversal(ctx, target, s, seq, salt)
if err != nil {
return
Expand Down Expand Up @@ -139,6 +144,9 @@ func Put(
) (
stats *traversal.Stats, err error,
) {
ctx, span := telemetry.GetTracer().Start(ctx, "DHT.Put")
defer span.End()

vChan, op, err := startGetTraversal(ctx, target, s,
// When we do a get traversal for a put, we don't care what seq the peers have?
nil,
Expand Down
4 changes: 2 additions & 2 deletions impl/internal/util/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func (h *TraceHook) Fire(entry *logrus.Entry) error {
traceID := span.SpanContext().TraceID().String()
spanID := span.SpanContext().SpanID().String()

entry.Data["traceID"] = traceID
entry.Data["spanID"] = spanID
entry.Data["trace_id"] = traceID
entry.Data["span_id"] = spanID

return nil
}
30 changes: 20 additions & 10 deletions impl/pkg/server/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"
)

// logger is the logrus logger handler amended for telemetry
Expand Down Expand Up @@ -45,17 +46,26 @@ func logger(logger logrus.FieldLogger, notLogged ...string) gin.HandlerFunc {
return
}

traceID, spanID := "", ""
span := trace.SpanFromContext(c)
if span.SpanContext().IsValid() {
traceID = span.SpanContext().TraceID().String()
spanID = span.SpanContext().SpanID().String()
}

entry := logger.WithFields(logrus.Fields{
"hostname": hostname,
"statusCode": statusCode,
"latency": latency,
"clientIP": clientIP,
"method": c.Request.Method,
"path": path,
"referer": referer,
"dataLength": dataLength,
"userAgent": clientUserAgent,
"time": time.Now().Format(time.RFC3339),
"hostname": hostname,
"status_code": statusCode,
"latency": latency,
"client_ip": clientIP,
"method": c.Request.Method,
"path": path,
"referer": referer,
"data_length": dataLength,
"user_agent": clientUserAgent,
"time": time.Now().Format(time.RFC3339),
"trace_id": traceID,
"span_id": spanID,
})

if len(c.Errors) > 0 {
Expand Down
24 changes: 17 additions & 7 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/allegro/bigcache/v3"
"github.com/anacrolix/torrent/bencode"
"github.com/goccy/go-json"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tv42/zbase32"

Expand Down Expand Up @@ -147,17 +148,23 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
if got, err := s.cache.Get(id); err == nil {
var resp pkarr.Response
if err = json.Unmarshal(got, &resp); err == nil {
logrus.WithContext(ctx).WithField("record_id", id).Debug("resolved pkarr record from cache")
logrus.WithContext(ctx).WithField("record_id", id).Info("resolved pkarr record from cache")
return &resp, nil
}
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from cache, falling back to dht")
}

// next do a dht lookup
got, err := s.dht.GetFull(ctx, id)
// next do a dht lookup with a timeout of 10 seconds
getCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

got, err := s.dht.GetFull(getCtx, id)
if err != nil {
// try to resolve from storage before returning and error
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage")
if errors.Is(err, context.DeadlineExceeded) {
logrus.WithContext(ctx).WithField("record", id).Warn("dht lookup timed out, attempting to resolve from storage")
} else {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage")
}

rawID, err := util.Z32Decode(id)
if err != nil {
Expand All @@ -176,8 +183,9 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
return nil, err
}

logrus.WithContext(ctx).WithField("record", id).Debug("resolved pkarr record from storage")
logrus.WithContext(ctx).WithField("record", id).Info("resolved pkarr record from storage")
resp := record.Response()
// add the record back to the cache for future lookups
if err = s.addRecordToCache(id, record.Response()); err != nil {
logrus.WithError(err).WithField("record", id).Error("failed to set pkarr record in cache")
}
Expand All @@ -203,6 +211,8 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
// add the record to cache, do it here to avoid duplicate calculations
if err = s.addRecordToCache(id, resp); err != nil {
logrus.WithContext(ctx).WithError(err).Errorf("failed to set pkarr record[%s] in cache", id)
} else {
logrus.WithContext(ctx).WithField("record", id).Info("added pkarr record back to cache")
}

return &resp, nil
Expand Down Expand Up @@ -267,7 +277,7 @@ func (s *PkarrService) republish() {
recordID := zbase32.EncodeToString(record.Key[:])
logrus.WithContext(ctx).Debugf("republishing record: %s", recordID)

putCtx, cancel := context.WithTimeout(ctx, time.Second*10)
putCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if _, putErr := s.dht.Put(putCtx, record.BEP44()); putErr != nil {
Expand Down
14 changes: 11 additions & 3 deletions impl/pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -23,12 +24,13 @@ const (
)

var (
traceProvider *sdktrace.TracerProvider
tracer trace.Tracer

traceProvider *sdktrace.TracerProvider
meterProvider *sdkmetric.MeterProvider
propagator propagation.TextMapPropagator
)

// SetupTelemetry initializes the OpenTelemetry SDK with the appropriate exporters and propagators.
func SetupTelemetry(ctx context.Context) error {
r, err := resource.Merge(
resource.Default(),
Expand All @@ -55,14 +57,19 @@ func SetupTelemetry(ctx context.Context) error {
otel.SetMeterProvider(meterProvider)

// setup memory metrics
err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second * 30))
err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second * 15))
if err != nil {
return err
}

// setup propagator
propagator = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
otel.SetTextMapPropagator(propagator)

return nil
}

// Shutdown stops the telemetry providers and exporters safely.
func Shutdown(ctx context.Context) {
if traceProvider != nil {
if err := traceProvider.Shutdown(ctx); err != nil {
Expand All @@ -77,6 +84,7 @@ func Shutdown(ctx context.Context) {
}
}

// GetTracer returns the tracer for the application. If the tracer is not yet initialized, it will be created.
func GetTracer() trace.Tracer {
if tracer == nil {
tracer = otel.GetTracerProvider().Tracer(scopeName, trace.WithInstrumentationVersion(config.Version))
Expand Down

0 comments on commit 3e4a15b

Please sign in to comment.