From 6ac68985185d0e3b9a15f1ed830ab972a28a1b5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Fri, 28 Jul 2023 15:26:16 +0000 Subject: [PATCH] Emit some basic traces --- go.mod | 2 + go.sum | 3 + internal/api/api.go | 7 +- internal/batch/service.go | 4 +- internal/collection/goa.go | 4 +- internal/collection/workflow.go | 11 +++- main.go | 113 ++++++++++++++++++++++++++------ 7 files changed, 119 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 50e076a2..67ad5b45 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/spf13/viper v1.16.0 github.com/stretchr/testify v1.8.4 go.artefactual.dev/tools v0.3.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0 go.opentelemetry.io/otel/sdk v1.16.0 @@ -73,6 +74,7 @@ require ( github.com/dimfeld/httptreemux/v5 v5.5.0 // indirect github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect + github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.2.4 // indirect github.com/gogo/googleapis v1.4.1 // indirect diff --git a/go.sum b/go.sum index a8073528..cf5deacd 100644 --- a/go.sum +++ b/go.sum @@ -1123,6 +1123,7 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= @@ -2272,6 +2273,8 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.2 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.31.0/go.mod h1:PFmBsWbldL1kiWZk9+0LBZz2brhByaGsvp6pRICMlPE= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0/go.mod h1:pcQ3MM3SWvrA71U4GDqv9UFDJ3HQsW7y5ZO3tDTlUdI= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs= go.opentelemetry.io/otel v1.6.0/go.mod h1:bfJD2DZVw0LBxghOTlgnlI0CV3hLDu9XF/QKOUXMTQQ= diff --git a/internal/api/api.go b/internal/api/api.go index f5555dc9..6953ce06 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -19,6 +19,8 @@ import ( "github.com/go-logr/logr" "github.com/gorilla/websocket" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + sdktrace "go.opentelemetry.io/otel/sdk/trace" goahttp "goa.design/goa/v3/http" goahttpmwr "goa.design/goa/v3/http/middleware" "goa.design/goa/v3/middleware" @@ -37,7 +39,9 @@ import ( ) func HTTPServer( - logger logr.Logger, config *Config, + logger logr.Logger, + tracerProvider *sdktrace.TracerProvider, + config *Config, pipesvc intpipe.Service, batchsvc intbatch.Service, colsvc intcol.Service, @@ -81,6 +85,7 @@ func HTTPServer( // Global middlewares. var handler http.Handler = mux + handler = otelhttp.NewHandler(handler, "internal/api", otelhttp.WithTracerProvider(tracerProvider)) handler = goahttpmwr.RequestID()(handler) handler = versionHeaderMiddleware(config.AppVersion)(handler) if config.Debug { diff --git a/internal/batch/service.go b/internal/batch/service.go index 510f24a1..b8abff95 100644 --- a/internal/batch/service.go +++ b/internal/batch/service.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" "go.artefactual.dev/tools/ref" + "go.opentelemetry.io/otel/trace" temporalapi_enums "go.temporal.io/api/enums/v1" temporalapi_serviceerror "go.temporal.io/api/serviceerror" temporalsdk_client "go.temporal.io/sdk/client" @@ -134,7 +135,8 @@ func (s *batchImpl) Hints(ctx context.Context) (*goabatch.BatchHintsResult, erro func (s *batchImpl) InitProcessingWorkflow(ctx context.Context, req *collection.ProcessingWorkflowRequest) error { req.ValidationConfig = validation.Config{} - err := collection.InitProcessingWorkflow(ctx, s.cc, s.taskQueue, req) + tr := trace.NewNoopTracerProvider().Tracer("") + err := collection.InitProcessingWorkflow(ctx, tr, s.cc, s.taskQueue, req) if err != nil { s.logger.Error(err, "Error initializing processing workflow.") } diff --git a/internal/collection/goa.go b/internal/collection/goa.go index 07eb435d..87362d98 100644 --- a/internal/collection/goa.go +++ b/internal/collection/goa.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "go.opentelemetry.io/otel/trace" temporalapi_common "go.temporal.io/api/common/v1" temporalapi_enums "go.temporal.io/api/enums/v1" temporalapi_serviceerror "go.temporal.io/api/serviceerror" @@ -265,7 +266,8 @@ func (w *goaWrapper) Retry(ctx context.Context, payload *goacollection.RetryPayl req.WorkflowID = *goacol.WorkflowID req.CollectionID = goacol.ID - if err := InitProcessingWorkflow(ctx, w.cc, w.taskQueue, req); err != nil { + tr := trace.NewNoopTracerProvider().Tracer("") + if err := InitProcessingWorkflow(ctx, tr, w.cc, w.taskQueue, req); err != nil { return fmt.Errorf("error starting the new workflow instance: %w", err) } diff --git a/internal/collection/workflow.go b/internal/collection/workflow.go index 81593259..0b903fbc 100644 --- a/internal/collection/workflow.go +++ b/internal/collection/workflow.go @@ -6,6 +6,8 @@ import ( "time" "github.com/google/uuid" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" temporalsdk_api_enums "go.temporal.io/api/enums/v1" temporalsdk_client "go.temporal.io/sdk/client" @@ -60,7 +62,10 @@ type ProcessingWorkflowRequest struct { RejectDuplicates bool } -func InitProcessingWorkflow(ctx context.Context, c temporalsdk_client.Client, taskQueue string, req *ProcessingWorkflowRequest) error { +func InitProcessingWorkflow(ctx context.Context, tr trace.Tracer, c temporalsdk_client.Client, taskQueue string, req *ProcessingWorkflowRequest) error { + _, span := tr.Start(ctx, "InitProcessingWorkflow") + defer span.End() + if req.WorkflowID == "" { req.WorkflowID = fmt.Sprintf("processing-workflow-%s", uuid.New().String()) } @@ -74,6 +79,10 @@ func InitProcessingWorkflow(ctx context.Context, c temporalsdk_client.Client, ta WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, } _, err := c.ExecuteWorkflow(ctx, opts, ProcessingWorkflowName, req) + if err != nil { + span.SetStatus(codes.Error, "ExecuteWorkflow failed") + span.RecordError(err) + } return err } diff --git a/main.go b/main.go index 00432060..51deaeb9 100644 --- a/main.go +++ b/main.go @@ -18,10 +18,19 @@ import ( "github.com/spf13/pflag" "github.com/spf13/viper" "go.artefactual.dev/tools/log" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "go.opentelemetry.io/otel/trace" temporalsdk_activity "go.temporal.io/sdk/activity" temporalsdk_client "go.temporal.io/sdk/client" temporalsdk_worker "go.temporal.io/sdk/worker" temporalsdk_workflow "go.temporal.io/sdk/workflow" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/artefactual-labs/enduro/internal/api" "github.com/artefactual-labs/enduro/internal/batch" @@ -92,12 +101,61 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Set up tracing. + var tracerProvider *sdktrace.TracerProvider + var tracer trace.Tracer + { + conn, err := grpc.DialContext(ctx, + config.Telemetry.Traces.Addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock()) + if err != nil { + logger.Error(err, "Error connecting to telemetry agent.") + os.Exit(1) + } + + exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + logger.Error(err, "Error creating gRPC telemetry exporter.") + os.Exit(1) + } + + resource, _ := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(appName), + semconv.ServiceVersion(version), + ), + ) + + tracerProvider = sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithResource(resource), + sdktrace.WithBatcher(exporter), + ) + defer func() { + if err := tracerProvider.Shutdown(ctx); err != nil { + logger.Error(err, "Failed to shut down tracing provider.") + } + }() + + tracer = tracerProvider.Tracer(appName) + } + database, err := db.Connect(config.Database.DSN) if err != nil { logger.Error(err, "Database configuration failed.") os.Exit(1) } - _ = database.Ping() + _, span := tracer.Start(ctx, "db-ping") + span.SetAttributes(attribute.String("db.driver", "mysql")) + if err := database.Ping(); err != nil { + span.SetStatus(codes.Error, "ping failed") + span.RecordError(err) + } + span.AddEvent("Connected!") + span.End() temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{ Namespace: config.Temporal.Namespace, @@ -152,7 +210,7 @@ func main() { g.Add( func() error { - srv = api.HTTPServer(logger, &config.API, pipesvc, batchsvc, colsvc) + srv = api.HTTPServer(logger, tracerProvider, &config.API, pipesvc, batchsvc, colsvc) return srv.ListenAndServe() }, func(err error) { @@ -166,8 +224,8 @@ func main() { // Watchers, where each watcher is a group actor. { for _, w := range wsvc.Watchers() { + w := w done := make(chan struct{}) - cur := w g.Add( func() error { for { @@ -175,30 +233,36 @@ func main() { case <-done: return nil default: - event, err := cur.Watch(ctx) + event, err := w.Watch(ctx) if err != nil { if !errors.Is(err, watcher.ErrWatchTimeout) { - logger.Error(err, "Error monitoring watcher interface.", "watcher", cur) + logger.Error(err, "Error monitoring watcher interface.", "watcher", w) } continue } + ctx, span := tracer.Start(ctx, "Watcher") + span.SetAttributes( + attribute.String("watcher", event.WatcherName), + attribute.String("bucket", event.Bucket), + attribute.String("key", event.Key), + attribute.Bool("dir", event.IsDir), + ) logger.V(1).Info("Starting new workflow", "watcher", event.WatcherName, "bucket", event.Bucket, "key", event.Key, "dir", event.IsDir) - go func() { - req := collection.ProcessingWorkflowRequest{ - WatcherName: event.WatcherName, - PipelineNames: event.PipelineName, - RetentionPeriod: event.RetentionPeriod, - CompletedDir: event.CompletedDir, - StripTopLevelDir: event.StripTopLevelDir, - RejectDuplicates: event.RejectDuplicates, - Key: event.Key, - IsDir: event.IsDir, - ValidationConfig: config.Validation, - } - if err := collection.InitProcessingWorkflow(ctx, temporalClient, config.Temporal.TaskQueue, &req); err != nil { - logger.Error(err, "Error initializing processing workflow.") - } - }() + req := collection.ProcessingWorkflowRequest{ + WatcherName: event.WatcherName, + PipelineNames: event.PipelineName, + RetentionPeriod: event.RetentionPeriod, + CompletedDir: event.CompletedDir, + StripTopLevelDir: event.StripTopLevelDir, + RejectDuplicates: event.RejectDuplicates, + Key: event.Key, + IsDir: event.IsDir, + ValidationConfig: config.Validation, + } + if err := collection.InitProcessingWorkflow(ctx, tracer, temporalClient, config.Temporal.TaskQueue, &req); err != nil { + logger.Error(err, "Error initializing processing workflow.") + } + span.End() } } }, @@ -346,6 +410,7 @@ type configuration struct { Watcher watcher.Config Pipeline []pipeline.Config Validation validation.Config + Telemetry TelemetryConfig // This is a workaround for client-specific functionality. // Simple mechanism to support an arbitrary number of hooks and parameters. @@ -393,3 +458,9 @@ func readConfig(v *viper.Viper, config *configuration, configFile string) (found return found, nil } + +type TelemetryConfig struct { + Traces struct { + Addr string + } +}