Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
backend: add otel tracers
Browse files Browse the repository at this point in the history
- add tracer to apid
- add tracer to etcd store
- add tracer to etcdstore v2
- add tracer to event_store
- add tracers to schedulerd
- add tracers to eventd
- add tracers to cache
- add tracer to health_store
- add tracer to pipeline
- add traces to ringv2
- add span to ringv2 hasTrigger

Signed-off-by: Vladimir Ermakov <[email protected]>
vooon committed Jul 8, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent d6efe91 commit 201b1e7
Showing 30 changed files with 426 additions and 46 deletions.
3 changes: 3 additions & 0 deletions backend/apid/apid.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"

"github.com/sensu/sensu-go/backend/apid/actions"
"github.com/sensu/sensu-go/backend/apid/graphql"
@@ -135,6 +136,8 @@ func NewRouter() *mux.Router {
// Register a default handler when no routes match
router.NotFoundHandler = middlewares.SimpleLogger{}.Then(http.HandlerFunc(notFoundHandler))

router.Use(otelmux.Middleware("sensu-backend-apid"))

return router
}

4 changes: 4 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ import (
"github.com/sensu/sensu-go/backend/tessend"
"github.com/sensu/sensu-go/rpc"
"github.com/sensu/sensu-go/system"
otelutil "github.com/sensu/sensu-go/util/otel"
"github.com/sensu/sensu-go/util/retry"
)

@@ -543,6 +544,9 @@ func (b *Backend) RunContext() context.Context {
// RunWithInitializer is like Run but accepts an initialization function to use
// for initialization, instead of using the default Initialize().
func (b *Backend) RunWithInitializer(initialize func(context.Context, *Config) (*Backend, error)) error {
tracerCf, _ := otelutil.InitTracer(b.cfg.TracingEnabled, b.cfg.TracingAgentAddress, b.cfg.TracingServiceNameKey)
defer tracerCf()

// we allow inErrChan to leak to avoid panics from other
// goroutines writing errors to either after shutdown has been initiated.
backoff := retry.ExponentialBackoff{
17 changes: 17 additions & 0 deletions backend/cmd/start.go
Original file line number Diff line number Diff line change
@@ -111,6 +111,11 @@ const (
// URLs to advertise to the rest of the cluster
defaultEtcdAdvertiseClientURL = "http://localhost:2379"

// OTEL
flagTracingEnabled = "tracing-enabled"
flagTracingAgentAddress = "tracing-address"
flagTracingServiceNameKey = "tracing-name-key"

// Start command usage template
startUsageTemplate = `Usage:{{if .Runnable}}
{{.UseLine}}{{end}}{{if .HasAvailableSubCommands}}
@@ -221,6 +226,10 @@ func StartCommand(initialize InitializeFunc) *cobra.Command {
NoEmbedEtcd: viper.GetBool(flagNoEmbedEtcd),
Labels: viper.GetStringMapString(flagLabels),
Annotations: viper.GetStringMapString(flagAnnotations),

TracingEnabled: viper.GetBool(flagTracingEnabled),
TracingAgentAddress: viper.GetString(flagTracingAgentAddress),
TracingServiceNameKey: viper.GetString(flagTracingServiceNameKey),
}

if flag := cmd.Flags().Lookup(flagLabels); flag != nil && flag.Changed {
@@ -392,6 +401,10 @@ func handleConfig(cmd *cobra.Command, arguments []string, server bool) error {
viper.SetDefault(flagNoEmbedEtcd, false)
}

viper.SetDefault(flagTracingEnabled, false)
viper.SetDefault(flagTracingAgentAddress, "localhost:4317")
viper.SetDefault(flagTracingServiceNameKey, defaultEtcdName)

// Merge in flag set so that it appears in command usage
flags := flagSet(server)
cmd.Flags().AddFlagSet(flags)
@@ -532,6 +545,10 @@ func flagSet(server bool) *pflag.FlagSet {
_ = flagSet.SetAnnotation(flagEtcdPeerTrustedCAFile, "categories", []string{"store"})
flagSet.String(flagEtcdNodeName, viper.GetString(flagEtcdNodeName), "name for this etcd node")
_ = flagSet.SetAnnotation(flagEtcdNodeName, "categories", []string{"store"})

flagSet.Bool(flagTracingEnabled, viper.GetBool(flagTracingEnabled), "enable OTEL")
flagSet.String(flagTracingAgentAddress, viper.GetString(flagTracingAgentAddress), "otlp collector host:port")
flagSet.String(flagTracingServiceNameKey, viper.GetString(flagTracingServiceNameKey), "tracing service name key")
}

flagSet.SetOutput(ioutil.Discard)
4 changes: 4 additions & 0 deletions backend/config.go
Original file line number Diff line number Diff line change
@@ -107,4 +107,8 @@ type Config struct {

LogLevel string
EtcdLogLevel string

TracingEnabled bool
TracingAgentAddress string
TracingServiceNameKey string
}
14 changes: 11 additions & 3 deletions backend/eventd/entity.go
Original file line number Diff line number Diff line change
@@ -8,11 +8,15 @@ import (
"github.com/sensu/sensu-go/backend/store"
storev2 "github.com/sensu/sensu-go/backend/store/v2"
"github.com/sensu/sensu-go/backend/store/v2/wrap"
"go.opentelemetry.io/otel/attribute"
)

// createProxyEntity creates a proxy entity for the given event if the entity
// does not exist already and returns any error encountered
func createProxyEntity(event *corev2.Event, s storev2.Interface) error {
func createProxyEntity(ctx context.Context, event *corev2.Event, s storev2.Interface) error {
ctx, span := tracer.Start(ctx, "backend.eventd/createProxyEntity")
defer span.End()

entityName := event.Entity.Name
namespace := event.Entity.Namespace

@@ -23,15 +27,19 @@ func createProxyEntity(event *corev2.Event, s storev2.Interface) error {
return nil
}

span.SetAttributes(
attribute.String("entity.name", entityName),
)

// Determine if the entity exists
//NOTE(ccressent): there is no timeout for this operation?
entityMeta := corev2.NewObjectMeta(entityName, namespace)

state := corev3.NewEntityState(namespace, entityName)
config := corev3.NewEntityConfig(namespace, entityName)

configReq := storev2.NewResourceRequestFromResource(context.Background(), config)
stateReq := storev2.NewResourceRequestFromResource(context.Background(), state)
configReq := storev2.NewResourceRequestFromResource(ctx, config)
stateReq := storev2.NewResourceRequestFromResource(ctx, state)

var (
wState, wConfig storev2.Wrapper
2 changes: 1 addition & 1 deletion backend/eventd/entity_test.go
Original file line number Diff line number Diff line change
@@ -230,7 +230,7 @@ func TestCreateProxyEntity(t *testing.T) {
}
defer store.AssertExpectations(t)

if err := createProxyEntity(tt.event, store); (err != nil) != tt.wantErr {
if err := createProxyEntity(context.Background(), tt.event, store); (err != nil) != tt.wantErr {
t.Errorf("createProxyEntity() error = %v, wantErr %v", err, tt.wantErr)
return
}
26 changes: 23 additions & 3 deletions backend/eventd/eventd.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/otel/attribute"

corev2 "github.com/sensu/sensu-go/api/core/v2"
corev3 "github.com/sensu/sensu-go/api/core/v3"
@@ -322,6 +323,9 @@ func eventKey(event *corev2.Event) string {
}

func (e *Eventd) handleMessage(msg interface{}) error {
ctx, span := tracer.Start(context.Background(), "backend.eventd/handleMessage")
defer span.End()

then := time.Now()
defer func() {
duration := time.Since(then)
@@ -350,12 +354,18 @@ func (e *Eventd) handleMessage(msg interface{}) error {
return e.bus.Publish(messaging.TopicEvent, event)
}

ctx := context.WithValue(context.Background(), corev2.NamespaceKey, event.Entity.Namespace)
span.SetAttributes(
attribute.String("check.name", event.Check.Name),
attribute.String("entity.name", event.Entity.Name),
)

ctx = context.WithValue(ctx, corev2.NamespaceKey, event.Entity.Namespace)

// Create a proxy entity if required and update the event's entity with it,
// but only if the event's entity is not an agent.
if err := createProxyEntity(event, e.store); err != nil {
if err := createProxyEntity(ctx, event, e.store); err != nil {
EventsProcessed.WithLabelValues(EventsProcessedLabelError, EventsProcessedTypeLabelCheck).Inc()
span.RecordError(err)
return err
}

@@ -369,6 +379,7 @@ func (e *Eventd) handleMessage(msg interface{}) error {
event, prevEvent, err := e.eventStore.UpdateEvent(ctx, event)
if err != nil {
EventsProcessed.WithLabelValues(EventsProcessedLabelError, EventsProcessedTypeLabelCheck).Inc()
span.RecordError(err)
return err
}

@@ -427,6 +438,9 @@ func (e *Eventd) alive(key string, prev liveness.State, leader bool) (bury bool)
}

func (e *Eventd) dead(key string, prev liveness.State, leader bool) (bury bool) {
ctx, span := tracer.Start(context.Background(), "backend.eventd/dead")
defer span.End()

if e.ctx.Err() != nil {
return false
}
@@ -454,7 +468,7 @@ func (e *Eventd) dead(key string, prev liveness.State, leader bool) (bury bool)
return true
}

ctx := store.NamespaceContext(context.Background(), namespace)
ctx = store.NamespaceContext(ctx, namespace)
// TODO(eric): make this configurable? Or dynamic based on some property?
// 120s seems like a reasonable, if not somewhat large, timeout for check TTL processing.
ctx, cancel := context.WithTimeout(ctx, e.storeTimeout)
@@ -533,6 +547,9 @@ func parseKey(key string) (namespace, check, entity string, err error) {
// handleFailure creates a check event with a warn status and publishes it to
// TopicEvent.
func (e *Eventd) handleFailure(ctx context.Context, event *corev2.Event) error {
ctx, span := tracer.Start(ctx, "backend.eventd/handleFailure")
defer span.End()

// don't update the event with ttl output for keepalives,
// there is a different mechanism for that
if event.Check.Name == keepalived.KeepaliveCheckName {
@@ -563,6 +580,9 @@ func (e *Eventd) handleFailure(ctx context.Context, event *corev2.Event) error {
}

func (e *Eventd) createFailedCheckEvent(ctx context.Context, event *corev2.Event) (*corev2.Event, error) {
ctx, span := tracer.Start(ctx, "backend.eventd/createFailedCheckEvent")
defer span.End()

if !event.HasCheck() {
return nil, errors.New("event does not contain a check")
}
5 changes: 5 additions & 0 deletions backend/eventd/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package eventd

import "go.opentelemetry.io/otel"

// tracer is the package-level tracer used to start eventd spans. It is
// obtained through otel.Tracer under the instrumentation name "backend/eventd".
var tracer = otel.Tracer("backend/eventd")
20 changes: 16 additions & 4 deletions backend/pipeline/filter.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,9 @@ import (

// Returns true if the event should be filtered/denied.
func evaluateEventFilter(ctx context.Context, event *corev2.Event, filter *corev2.EventFilter, assets asset.RuntimeAssetSet) bool {
ctx, span := tracer.Start(ctx, "backend.pipeline/evaluateEventFilter")
defer span.End()

// Redact the entity to avoid leaking sensitive information
event.Entity = event.Entity.GetRedactedEntity()

@@ -109,7 +112,10 @@ func evaluateEventFilter(ctx context.Context, event *corev2.Event, filter *corev
// FilterEvent filters a Sensu event, determining if it will continue through
// the Sensu pipeline. Returns the filter's name if the event was filtered and
// any error encountered
func (p *Pipeline) FilterEvent(handler *corev2.Handler, event *corev2.Event) (string, error) {
func (p *Pipeline) FilterEvent(gctx context.Context, handler *corev2.Handler, event *corev2.Event) (string, error) {
gctx, span := tracer.Start(gctx, "backend.pipeline/FilterEvent")
defer span.End()

// Prepare the logging
fields := utillogging.EventFields(event, false)
fields["handler"] = handler.Name
@@ -140,11 +146,12 @@ func (p *Pipeline) FilterEvent(handler *corev2.Handler, event *corev2.Event) (st
}
default:
// Retrieve the filter from the store with its name
ctx := corev2.SetContextFromResource(context.Background(), event.Entity)
ctx := corev2.SetContextFromResource(gctx, event.Entity)
tctx, cancel := context.WithTimeout(ctx, p.storeTimeout)
filter, err := p.store.GetEventFilterByName(tctx, filterName)
cancel()
if err != nil {
span.RecordError(err)
logger.WithFields(fields).WithError(err).
Warning("could not retrieve filter")
return "", err
@@ -154,10 +161,11 @@ func (p *Pipeline) FilterEvent(handler *corev2.Handler, event *corev2.Event) (st
// Execute the filter, evaluating each of its
// expressions against the event. The event is rejected
// if the product of all expressions is true.
ctx := corev2.SetContextFromResource(context.Background(), filter)
ctx := corev2.SetContextFromResource(gctx, filter)
matchedAssets := asset.GetAssets(ctx, p.store, filter.RuntimeAssets)
assets, err := asset.GetAll(context.TODO(), p.assetGetter, matchedAssets)
assets, err := asset.GetAll(gctx, p.assetGetter, matchedAssets)
if err != nil {
span.RecordError(err)
logger.WithFields(fields).WithError(err).Error("failed to retrieve assets for filter")
if _, ok := err.(*store.ErrInternal); ok {
// Fatal error
@@ -175,6 +183,7 @@ func (p *Pipeline) FilterEvent(handler *corev2.Handler, event *corev2.Event) (st
// If the filter didn't exist, it might be an extension filter
ext, err := p.store.GetExtension(ctx, filterName)
if err != nil {
span.RecordError(err)
logger.WithFields(fields).WithError(err).
Warning("could not retrieve filter")
if _, ok := err.(*store.ErrInternal); ok {
@@ -186,17 +195,20 @@ func (p *Pipeline) FilterEvent(handler *corev2.Handler, event *corev2.Event) (st

executor, err := p.extensionExecutor(ext)
if err != nil {
span.RecordError(err)
logger.WithFields(fields).WithError(err).
Error("could not execute filter")
continue
}
defer func() {
if err := executor.Close(); err != nil {
span.RecordError(err)
logger.WithError(err).Debug("error closing grpc client")
}
}()
filtered, err := executor.FilterEvent(event)
if err != nil {
span.RecordError(err)
logger.WithFields(fields).WithError(err).
Error("could not execute filter")
continue
5 changes: 3 additions & 2 deletions backend/pipeline/filter_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline

import (
"context"
"testing"
"time"

@@ -209,7 +210,7 @@ func TestPipelineFilter(t *testing.T) {
Metrics: tc.metrics,
}

f, _ := p.FilterEvent(handler, event)
f, _ := p.FilterEvent(context.TODO(), handler, event)
assert.Equal(t, tc.expectedFilter, f)
})
}
@@ -303,7 +304,7 @@ func TestPipelineWhenFilter(t *testing.T) {
Filters: []string{tc.filterName},
}

f, _ := p.FilterEvent(handler, event)
f, _ := p.FilterEvent(context.TODO(), handler, event)
assert.Equal(t, tc.expectedFilter, f)
})
}
8 changes: 4 additions & 4 deletions backend/pipeline/handle_test.go
Original file line number Diff line number Diff line change
@@ -211,7 +211,7 @@ func TestPipelinePipeHandler(t *testing.T) {
event := corev2.FixtureEvent("test", "test")
eventData, _ := json.Marshal(event)

handlerExec, err := p.pipeHandler(handler, event, eventData)
handlerExec, err := p.pipeHandler(context.TODO(), handler, event, eventData)

assert.NoError(t, err)
assert.Equal(t, string(eventData[:]), handlerExec.Output)
@@ -268,7 +268,7 @@ func TestPipelineTcpHandler(t *testing.T) {
}()

<-ready
_, err := p.socketHandler(handler, event, eventData)
_, err := p.socketHandler(context.TODO(), handler, event, eventData)

assert.NoError(t, err)
<-done
@@ -316,7 +316,7 @@ func TestPipelineUdpHandler(t *testing.T) {

<-ready

_, err := p.socketHandler(handler, event, eventData)
_, err := p.socketHandler(context.TODO(), handler, event, eventData)

assert.NoError(t, err)
<-done
@@ -336,7 +336,7 @@ func TestPipelineGRPCHandler(t *testing.T) {
p := &Pipeline{
extensionExecutor: execFn,
}
result, err := p.grpcHandler(extension, event, nil)
result, err := p.grpcHandler(context.TODO(), extension, event, nil)

assert.NoError(t, err)
assert.Equal(t, "ok", result.Output)
Loading

0 comments on commit 201b1e7

Please sign in to comment.