Skip to content

Commit

Permalink
Emit some basic traces
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed Jul 31, 2023
1 parent ed3bbce commit 6ac6898
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 25 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/spf13/viper v1.16.0
github.com/stretchr/testify v1.8.4
go.artefactual.dev/tools v0.3.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
Expand Down Expand Up @@ -73,6 +74,7 @@ require (
github.com/dimfeld/httptreemux/v5 v5.5.0 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.2.4 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,7 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.2/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
Expand Down Expand Up @@ -2272,6 +2273,8 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.2
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.31.0/go.mod h1:PFmBsWbldL1kiWZk9+0LBZz2brhByaGsvp6pRICMlPE=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0/go.mod h1:pcQ3MM3SWvrA71U4GDqv9UFDJ3HQsW7y5ZO3tDTlUdI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs=
go.opentelemetry.io/otel v1.6.0/go.mod h1:bfJD2DZVw0LBxghOTlgnlI0CV3hLDu9XF/QKOUXMTQQ=
Expand Down
7 changes: 6 additions & 1 deletion internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (

"github.com/go-logr/logr"
"github.com/gorilla/websocket"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
goahttp "goa.design/goa/v3/http"
goahttpmwr "goa.design/goa/v3/http/middleware"
"goa.design/goa/v3/middleware"
Expand All @@ -37,7 +39,9 @@ import (
)

func HTTPServer(
logger logr.Logger, config *Config,
logger logr.Logger,
tracerProvider *sdktrace.TracerProvider,
config *Config,
pipesvc intpipe.Service,
batchsvc intbatch.Service,
colsvc intcol.Service,
Expand Down Expand Up @@ -81,6 +85,7 @@ func HTTPServer(

// Global middlewares.
var handler http.Handler = mux
handler = otelhttp.NewHandler(handler, "internal/api", otelhttp.WithTracerProvider(tracerProvider))

Check warning on line 88 in internal/api/api.go

View check run for this annotation

Codecov / codecov/patch

internal/api/api.go#L88

Added line #L88 was not covered by tests
handler = goahttpmwr.RequestID()(handler)
handler = versionHeaderMiddleware(config.AppVersion)(handler)
if config.Debug {
Expand Down
4 changes: 3 additions & 1 deletion internal/batch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-logr/logr"
"go.artefactual.dev/tools/ref"
"go.opentelemetry.io/otel/trace"
temporalapi_enums "go.temporal.io/api/enums/v1"
temporalapi_serviceerror "go.temporal.io/api/serviceerror"
temporalsdk_client "go.temporal.io/sdk/client"
Expand Down Expand Up @@ -134,7 +135,8 @@ func (s *batchImpl) Hints(ctx context.Context) (*goabatch.BatchHintsResult, erro

func (s *batchImpl) InitProcessingWorkflow(ctx context.Context, req *collection.ProcessingWorkflowRequest) error {
req.ValidationConfig = validation.Config{}
err := collection.InitProcessingWorkflow(ctx, s.cc, s.taskQueue, req)
tr := trace.NewNoopTracerProvider().Tracer("")
err := collection.InitProcessingWorkflow(ctx, tr, s.cc, s.taskQueue, req)
if err != nil {
s.logger.Error(err, "Error initializing processing workflow.")
}
Expand Down
4 changes: 3 additions & 1 deletion internal/collection/goa.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"time"

"go.opentelemetry.io/otel/trace"
temporalapi_common "go.temporal.io/api/common/v1"
temporalapi_enums "go.temporal.io/api/enums/v1"
temporalapi_serviceerror "go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -265,7 +266,8 @@ func (w *goaWrapper) Retry(ctx context.Context, payload *goacollection.RetryPayl

req.WorkflowID = *goacol.WorkflowID
req.CollectionID = goacol.ID
if err := InitProcessingWorkflow(ctx, w.cc, w.taskQueue, req); err != nil {
tr := trace.NewNoopTracerProvider().Tracer("")
if err := InitProcessingWorkflow(ctx, tr, w.cc, w.taskQueue, req); err != nil {

Check warning on line 270 in internal/collection/goa.go

View check run for this annotation

Codecov / codecov/patch

internal/collection/goa.go#L269-L270

Added lines #L269 - L270 were not covered by tests
return fmt.Errorf("error starting the new workflow instance: %w", err)
}

Expand Down
11 changes: 10 additions & 1 deletion internal/collection/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/google/uuid"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

Expand Down Expand Up @@ -60,7 +62,10 @@ type ProcessingWorkflowRequest struct {
RejectDuplicates bool
}

func InitProcessingWorkflow(ctx context.Context, c temporalsdk_client.Client, taskQueue string, req *ProcessingWorkflowRequest) error {
func InitProcessingWorkflow(ctx context.Context, tr trace.Tracer, c temporalsdk_client.Client, taskQueue string, req *ProcessingWorkflowRequest) error {
_, span := tr.Start(ctx, "InitProcessingWorkflow")
defer span.End()

if req.WorkflowID == "" {
req.WorkflowID = fmt.Sprintf("processing-workflow-%s", uuid.New().String())
}
Expand All @@ -74,6 +79,10 @@ func InitProcessingWorkflow(ctx context.Context, c temporalsdk_client.Client, ta
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
_, err := c.ExecuteWorkflow(ctx, opts, ProcessingWorkflowName, req)
if err != nil {
span.SetStatus(codes.Error, "ExecuteWorkflow failed")
span.RecordError(err)
}

return err
}
113 changes: 92 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@ import (
"github.com/spf13/pflag"
"github.com/spf13/viper"
"go.artefactual.dev/tools/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
temporalsdk_activity "go.temporal.io/sdk/activity"
temporalsdk_client "go.temporal.io/sdk/client"
temporalsdk_worker "go.temporal.io/sdk/worker"
temporalsdk_workflow "go.temporal.io/sdk/workflow"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/artefactual-labs/enduro/internal/api"
"github.com/artefactual-labs/enduro/internal/batch"
Expand Down Expand Up @@ -92,12 +101,61 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set up tracing.
var tracerProvider *sdktrace.TracerProvider
var tracer trace.Tracer
{
conn, err := grpc.DialContext(ctx,
config.Telemetry.Traces.Addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock())
if err != nil {
logger.Error(err, "Error connecting to telemetry agent.")
os.Exit(1)
}

exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
logger.Error(err, "Error creating gRPC telemetry exporter.")
os.Exit(1)
}

resource, _ := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(appName),
semconv.ServiceVersion(version),
),
)

tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(resource),
sdktrace.WithBatcher(exporter),
)
defer func() {
if err := tracerProvider.Shutdown(ctx); err != nil {
logger.Error(err, "Failed to shut down tracing provider.")
}
}()

tracer = tracerProvider.Tracer(appName)
}

database, err := db.Connect(config.Database.DSN)
if err != nil {
logger.Error(err, "Database configuration failed.")
os.Exit(1)
}
_ = database.Ping()
_, span := tracer.Start(ctx, "db-ping")
span.SetAttributes(attribute.String("db.driver", "mysql"))
if err := database.Ping(); err != nil {
span.SetStatus(codes.Error, "ping failed")
span.RecordError(err)
}
span.AddEvent("Connected!")
span.End()

temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{
Namespace: config.Temporal.Namespace,
Expand Down Expand Up @@ -152,7 +210,7 @@ func main() {

g.Add(
func() error {
srv = api.HTTPServer(logger, &config.API, pipesvc, batchsvc, colsvc)
srv = api.HTTPServer(logger, tracerProvider, &config.API, pipesvc, batchsvc, colsvc)
return srv.ListenAndServe()
},
func(err error) {
Expand All @@ -166,39 +224,45 @@ func main() {
// Watchers, where each watcher is a group actor.
{
for _, w := range wsvc.Watchers() {
w := w
done := make(chan struct{})
cur := w
g.Add(
func() error {
for {
select {
case <-done:
return nil
default:
event, err := cur.Watch(ctx)
event, err := w.Watch(ctx)
if err != nil {
if !errors.Is(err, watcher.ErrWatchTimeout) {
logger.Error(err, "Error monitoring watcher interface.", "watcher", cur)
logger.Error(err, "Error monitoring watcher interface.", "watcher", w)
}
continue
}
ctx, span := tracer.Start(ctx, "Watcher")
span.SetAttributes(
attribute.String("watcher", event.WatcherName),
attribute.String("bucket", event.Bucket),
attribute.String("key", event.Key),
attribute.Bool("dir", event.IsDir),
)
logger.V(1).Info("Starting new workflow", "watcher", event.WatcherName, "bucket", event.Bucket, "key", event.Key, "dir", event.IsDir)
go func() {
req := collection.ProcessingWorkflowRequest{
WatcherName: event.WatcherName,
PipelineNames: event.PipelineName,
RetentionPeriod: event.RetentionPeriod,
CompletedDir: event.CompletedDir,
StripTopLevelDir: event.StripTopLevelDir,
RejectDuplicates: event.RejectDuplicates,
Key: event.Key,
IsDir: event.IsDir,
ValidationConfig: config.Validation,
}
if err := collection.InitProcessingWorkflow(ctx, temporalClient, config.Temporal.TaskQueue, &req); err != nil {
logger.Error(err, "Error initializing processing workflow.")
}
}()
req := collection.ProcessingWorkflowRequest{
WatcherName: event.WatcherName,
PipelineNames: event.PipelineName,
RetentionPeriod: event.RetentionPeriod,
CompletedDir: event.CompletedDir,
StripTopLevelDir: event.StripTopLevelDir,
RejectDuplicates: event.RejectDuplicates,
Key: event.Key,
IsDir: event.IsDir,
ValidationConfig: config.Validation,
}
if err := collection.InitProcessingWorkflow(ctx, tracer, temporalClient, config.Temporal.TaskQueue, &req); err != nil {
logger.Error(err, "Error initializing processing workflow.")
}
span.End()
}
}
},
Expand Down Expand Up @@ -346,6 +410,7 @@ type configuration struct {
Watcher watcher.Config
Pipeline []pipeline.Config
Validation validation.Config
Telemetry TelemetryConfig

// This is a workaround for client-specific functionality.
// Simple mechanism to support an arbitrary number of hooks and parameters.
Expand Down Expand Up @@ -393,3 +458,9 @@ func readConfig(v *viper.Viper, config *configuration, configFile string) (found

return found, nil
}

type TelemetryConfig struct {
Traces struct {
Addr string
}
}

0 comments on commit 6ac6898

Please sign in to comment.