Skip to content

Commit

Permalink
Instrumentation: Improve automatic instrumentation by the SDK to incl…
Browse files Browse the repository at this point in the history
…ude handler/request logs, metrics and traces (#1028)
  • Loading branch information
marefr authored Jul 12, 2024
1 parent 7c8ae12 commit d7d03cd
Show file tree
Hide file tree
Showing 15 changed files with 813 additions and 107 deletions.
170 changes: 170 additions & 0 deletions backend/adapter_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package backend

import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type handlerWrapperFunc func(ctx context.Context) (RequestStatus, error)

func setupContext(ctx context.Context, endpoint Endpoint) context.Context {
ctx = WithEndpoint(ctx, endpoint)
ctx = propagateTenantIDIfPresent(ctx)

return ctx
}

func wrapHandler(ctx context.Context, pluginCtx PluginContext, next handlerWrapperFunc) error {
ctx = setupHandlerContext(ctx, pluginCtx)
wrapper := errorWrapper(next)
wrapper = logWrapper(wrapper)
wrapper = metricWrapper(wrapper)
wrapper = tracingWrapper(wrapper)
_, err := wrapper(ctx)
return err
}

func setupHandlerContext(ctx context.Context, pluginCtx PluginContext) context.Context {
ctx = initErrorSource(ctx)
ctx = WithGrafanaConfig(ctx, pluginCtx.GrafanaConfig)
ctx = WithPluginContext(ctx, pluginCtx)
ctx = WithUser(ctx, pluginCtx.User)
ctx = withContextualLogAttributes(ctx, pluginCtx)
ctx = WithUserAgent(ctx, pluginCtx.UserAgent)
return ctx
}

func errorWrapper(next handlerWrapperFunc) handlerWrapperFunc {
return func(ctx context.Context) (RequestStatus, error) {
status, err := next(ctx)

if err != nil && IsDownstreamError(err) {
if innerErr := WithDownstreamErrorSource(ctx); innerErr != nil {
return RequestStatusError, fmt.Errorf("failed to set downstream status source: %w", errors.Join(innerErr, err))
}
}

return status, err
}
}

var pluginRequestCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "grafana_plugin",
Name: "request_total",
Help: "The total amount of plugin requests",
}, []string{"endpoint", "status", "status_source"})

var once = sync.Once{}

func metricWrapper(next handlerWrapperFunc) handlerWrapperFunc {
once.Do(func() {
prometheus.MustRegister(pluginRequestCounter)
})

return func(ctx context.Context) (RequestStatus, error) {
endpoint := EndpointFromContext(ctx)
status, err := next(ctx)

pluginRequestCounter.WithLabelValues(endpoint.String(), status.String(), string(errorSourceFromContext(ctx))).Inc()

return status, err
}
}

func tracingWrapper(next handlerWrapperFunc) handlerWrapperFunc {
return func(ctx context.Context) (RequestStatus, error) {
endpoint := EndpointFromContext(ctx)
pluginCtx := PluginConfigFromContext(ctx)
ctx, span := tracing.DefaultTracer().Start(ctx, fmt.Sprintf("sdk.%s", endpoint), trace.WithAttributes(
attribute.String("plugin_id", pluginCtx.PluginID),
attribute.Int64("org_id", pluginCtx.OrgID),
))
defer span.End()

if pluginCtx.DataSourceInstanceSettings != nil {
span.SetAttributes(
attribute.String("datasource_name", pluginCtx.DataSourceInstanceSettings.Name),
attribute.String("datasource_uid", pluginCtx.DataSourceInstanceSettings.UID),
)
}

if u := pluginCtx.User; u != nil {
span.SetAttributes(attribute.String("user", pluginCtx.User.Name))
}

status, err := next(ctx)

span.SetAttributes(
attribute.String("request_status", status.String()),
attribute.String("status_source", string(errorSourceFromContext(ctx))),
)

if err != nil {
return status, tracing.Error(span, err)
}

return status, err
}
}

func logWrapper(next handlerWrapperFunc) handlerWrapperFunc {
return func(ctx context.Context) (RequestStatus, error) {
start := time.Now()
status, err := next(ctx)

logParams := []any{
"status", status.String(),
"duration", time.Since(start).String(),
}

if err != nil {
logParams = append(logParams, "error", err)
}

logParams = append(logParams, "statusSource", string(errorSourceFromContext(ctx)))

ctxLogger := Logger.FromContext(ctx)
logFunc := ctxLogger.Debug
if status > RequestStatusOK {
logFunc = ctxLogger.Error
}

logFunc("Plugin Request Completed", logParams...)

return status, err
}
}

func withHeaderMiddleware(ctx context.Context, headers http.Header) context.Context {
if len(headers) > 0 {
ctx = httpclient.WithContextualMiddleware(ctx,
httpclient.MiddlewareFunc(func(opts httpclient.Options, next http.RoundTripper) http.RoundTripper {
if !opts.ForwardHTTPHeaders {
return next
}

return httpclient.RoundTripperFunc(func(qreq *http.Request) (*http.Response, error) {
// Only set a header if it is not already set.
for k, v := range headers {
if qreq.Header.Get(k) == "" {
for _, vv := range v {
qreq.Header.Add(k, vv)
}
}
}
return next.RoundTrip(qreq)
})
}))
}
return ctx
}
37 changes: 37 additions & 0 deletions backend/adapter_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package backend

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func TestErrorWrapper(t *testing.T) {
t.Run("No downstream error should not set downstream error source in context", func(t *testing.T) {
ctx := initErrorSource(context.Background())

actualErr := errors.New("BOOM")
wrapper := errorWrapper(func(_ context.Context) (RequestStatus, error) {
return RequestStatusError, actualErr
})
status, err := wrapper(ctx)
require.ErrorIs(t, err, actualErr)
require.Equal(t, RequestStatusError, status)
require.Equal(t, DefaultErrorSource, errorSourceFromContext(ctx))
})

t.Run("Downstream error should set downstream error source in context", func(t *testing.T) {
ctx := initErrorSource(context.Background())

actualErr := errors.New("BOOM")
wrapper := errorWrapper(func(_ context.Context) (RequestStatus, error) {
return RequestStatusError, DownstreamError(actualErr)
})
status, err := wrapper(ctx)
require.ErrorIs(t, err, actualErr)
require.Equal(t, RequestStatusError, status)
require.Equal(t, ErrorSourceDownstream, errorSourceFromContext(ctx))
})
}
51 changes: 27 additions & 24 deletions backend/admission_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,52 @@ func newAdmissionSDKAdapter(handler AdmissionHandler) *admissionSDKAdapter {
}

func (a *admissionSDKAdapter) ValidateAdmission(ctx context.Context, req *pluginv2.AdmissionRequest) (*pluginv2.ValidationResponse, error) {
ctx = WithEndpoint(ctx, EndpointValidateAdmission)
ctx = propagateTenantIDIfPresent(ctx)
ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig))
ctx = setupContext(ctx, EndpointValidateAdmission)
parsedReq := FromProto().AdmissionRequest(req)
ctx = WithPluginContext(ctx, parsedReq.PluginContext)
ctx = WithUser(ctx, parsedReq.PluginContext.User)
ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext)
ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent)
resp, err := a.handler.ValidateAdmission(ctx, parsedReq)

var resp *ValidationResponse
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
var innerErr error
resp, innerErr = a.handler.ValidateAdmission(ctx, parsedReq)
return RequestStatusFromError(innerErr), innerErr
})
if err != nil {
return nil, err
}

return ToProto().ValidationResponse(resp), nil
}

func (a *admissionSDKAdapter) MutateAdmission(ctx context.Context, req *pluginv2.AdmissionRequest) (*pluginv2.MutationResponse, error) {
ctx = WithEndpoint(ctx, EndpointMutateAdmission)
ctx = propagateTenantIDIfPresent(ctx)
ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig))
ctx = setupContext(ctx, EndpointMutateAdmission)
parsedReq := FromProto().AdmissionRequest(req)
ctx = WithPluginContext(ctx, parsedReq.PluginContext)
ctx = WithUser(ctx, parsedReq.PluginContext.User)
ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext)
ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent)
resp, err := a.handler.MutateAdmission(ctx, parsedReq)

var resp *MutationResponse
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
var innerErr error
resp, innerErr = a.handler.MutateAdmission(ctx, parsedReq)
return RequestStatusFromError(innerErr), innerErr
})
if err != nil {
return nil, err
}

return ToProto().MutationResponse(resp), nil
}

func (a *admissionSDKAdapter) ConvertObject(ctx context.Context, req *pluginv2.ConversionRequest) (*pluginv2.ConversionResponse, error) {
ctx = WithEndpoint(ctx, EndpointConvertObject)
ctx = propagateTenantIDIfPresent(ctx)
ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig))
ctx = setupContext(ctx, EndpointConvertObject)
parsedReq := FromProto().ConversionRequest(req)
ctx = WithPluginContext(ctx, parsedReq.PluginContext)
ctx = WithUser(ctx, parsedReq.PluginContext.User)
ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext)
ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent)
resp, err := a.handler.ConvertObject(ctx, parsedReq)

var resp *ConversionResponse
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
var innerErr error
resp, innerErr = a.handler.ConvertObject(ctx, parsedReq)
return RequestStatusFromError(innerErr), innerErr
})
if err != nil {
return nil, err
}

return ToProto().ConversionResponse(resp), nil
}
74 changes: 39 additions & 35 deletions backend/data_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package backend

import (
"context"
"net/http"
"errors"
"fmt"

"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
)

Expand All @@ -19,41 +19,45 @@ func newDataSDKAdapter(handler QueryDataHandler) *dataSDKAdapter {
}
}

func withHeaderMiddleware(ctx context.Context, headers http.Header) context.Context {
if len(headers) > 0 {
ctx = httpclient.WithContextualMiddleware(ctx,
httpclient.MiddlewareFunc(func(opts httpclient.Options, next http.RoundTripper) http.RoundTripper {
if !opts.ForwardHTTPHeaders {
return next
}

return httpclient.RoundTripperFunc(func(qreq *http.Request) (*http.Response, error) {
// Only set a header if it is not already set.
for k, v := range headers {
if qreq.Header.Get(k) == "" {
for _, vv := range v {
qreq.Header.Add(k, vv)
}
}
}
return next.RoundTrip(qreq)
})
}))
}
return ctx
}

func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest) (*pluginv2.QueryDataResponse, error) {
ctx = WithEndpoint(ctx, EndpointQueryData)
ctx = propagateTenantIDIfPresent(ctx)
ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig))
ctx = setupContext(ctx, EndpointQueryData)
parsedReq := FromProto().QueryDataRequest(req)
ctx = WithPluginContext(ctx, parsedReq.PluginContext)
ctx = WithUser(ctx, parsedReq.PluginContext.User)
ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders())
ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext)
ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent)
resp, err := a.queryDataHandler.QueryData(ctx, parsedReq)

var resp *QueryDataResponse
err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) {
ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders())
var innerErr error
resp, innerErr = a.queryDataHandler.QueryData(ctx, parsedReq)

if resp == nil || len(resp.Responses) == 0 {
return RequestStatusFromError(innerErr), innerErr
}

// Set downstream status source in the context if there's at least one response with downstream status source,
// and if there's no plugin error
var hasPluginError bool
var hasDownstreamError bool
for _, r := range resp.Responses {
if r.Error == nil {
continue
}
if r.ErrorSource == ErrorSourceDownstream {
hasDownstreamError = true
} else {
hasPluginError = true
}
}

// A plugin error has higher priority than a downstream error,
// so set to downstream only if there's no plugin error
if hasDownstreamError && !hasPluginError {
if err := WithDownstreamErrorSource(ctx); err != nil {
return RequestStatusError, fmt.Errorf("failed to set downstream status source: %w", errors.Join(innerErr, err))
}
}

return RequestStatusFromError(innerErr), innerErr
})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d7d03cd

Please sign in to comment.