From d7d03cd288d10375438d9165c80e528558d9d737 Mon Sep 17 00:00:00 2001 From: Marcus Efraimsson Date: Fri, 12 Jul 2024 15:09:47 +0200 Subject: [PATCH] Instrumentation: Improve automatic instrumentation by the SDK to include handler/request logs, metrics and traces (#1028) --- backend/adapter_utils.go | 170 +++++++++++++++++ backend/adapter_utils_test.go | 37 ++++ backend/admission_adapter.go | 51 ++--- backend/data_adapter.go | 74 +++---- backend/data_adapter_test.go | 72 ++++++- backend/diagnostics_adapter.go | 21 +- backend/endpoint.go | 4 + backend/error_source.go | 113 ++++++++++- backend/request_status.go | 103 ++++++++++ backend/request_status_test.go | 180 ++++++++++++++++++ backend/resource_adapter.go | 17 +- backend/stream_adapter.go | 50 ++--- backend/tracing/tracing.go | 15 ++ experimental/errorsource/error_source.go | 4 + experimental/errorsource/error_source_test.go | 9 + 15 files changed, 813 insertions(+), 107 deletions(-) create mode 100644 backend/adapter_utils.go create mode 100644 backend/adapter_utils_test.go create mode 100644 backend/request_status.go create mode 100644 backend/request_status_test.go diff --git a/backend/adapter_utils.go b/backend/adapter_utils.go new file mode 100644 index 000000000..28a737086 --- /dev/null +++ b/backend/adapter_utils.go @@ -0,0 +1,170 @@ +package backend + +import ( + "context" + "errors" + "fmt" + "net/http" + "sync" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" + "github.com/grafana/grafana-plugin-sdk-go/backend/tracing" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +type handlerWrapperFunc func(ctx context.Context) (RequestStatus, error) + +func setupContext(ctx context.Context, endpoint Endpoint) context.Context { + ctx = WithEndpoint(ctx, endpoint) + ctx = propagateTenantIDIfPresent(ctx) + + return ctx +} + +func wrapHandler(ctx context.Context, pluginCtx PluginContext, next handlerWrapperFunc) error { + ctx = setupHandlerContext(ctx, pluginCtx) + wrapper := errorWrapper(next) + wrapper = logWrapper(wrapper) + wrapper = metricWrapper(wrapper) + wrapper = tracingWrapper(wrapper) + _, err := wrapper(ctx) + return err +} + +func setupHandlerContext(ctx context.Context, pluginCtx PluginContext) context.Context { + ctx = initErrorSource(ctx) + ctx = WithGrafanaConfig(ctx, pluginCtx.GrafanaConfig) + ctx = WithPluginContext(ctx, pluginCtx) + ctx = WithUser(ctx, pluginCtx.User) + ctx = withContextualLogAttributes(ctx, pluginCtx) + ctx = WithUserAgent(ctx, pluginCtx.UserAgent) + return ctx +} + +func errorWrapper(next handlerWrapperFunc) handlerWrapperFunc { + return func(ctx context.Context) (RequestStatus, error) { + status, err := next(ctx) + + if err != nil && IsDownstreamError(err) { + if innerErr := WithDownstreamErrorSource(ctx); innerErr != nil { + return RequestStatusError, fmt.Errorf("failed to set downstream status source: %w", errors.Join(innerErr, err)) + } + } + + return status, err + } +} + +var pluginRequestCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "grafana_plugin", + Name: "request_total", + Help: "The total amount of plugin requests", +}, []string{"endpoint", "status", "status_source"}) + +var once = sync.Once{} + +func metricWrapper(next handlerWrapperFunc) handlerWrapperFunc { + once.Do(func() { + prometheus.MustRegister(pluginRequestCounter) + }) + + return func(ctx context.Context) (RequestStatus, error) { + endpoint := EndpointFromContext(ctx) + status, err := next(ctx) + + pluginRequestCounter.WithLabelValues(endpoint.String(), status.String(), string(errorSourceFromContext(ctx))).Inc() + + return status, err + } +} + +func tracingWrapper(next handlerWrapperFunc) handlerWrapperFunc { + return func(ctx context.Context) (RequestStatus, error) { + endpoint := EndpointFromContext(ctx) + pluginCtx := PluginConfigFromContext(ctx) + ctx, span := tracing.DefaultTracer().Start(ctx, fmt.Sprintf("sdk.%s", endpoint), trace.WithAttributes( + attribute.String("plugin_id", pluginCtx.PluginID), + attribute.Int64("org_id", pluginCtx.OrgID), + )) + defer span.End() + + if pluginCtx.DataSourceInstanceSettings != nil { + span.SetAttributes( + attribute.String("datasource_name", pluginCtx.DataSourceInstanceSettings.Name), + attribute.String("datasource_uid", pluginCtx.DataSourceInstanceSettings.UID), + ) + } + + if u := pluginCtx.User; u != nil { + span.SetAttributes(attribute.String("user", pluginCtx.User.Name)) + } + + status, err := next(ctx) + + span.SetAttributes( + attribute.String("request_status", status.String()), + attribute.String("status_source", string(errorSourceFromContext(ctx))), + ) + + if err != nil { + return status, tracing.Error(span, err) + } + + return status, err + } +} + +func logWrapper(next handlerWrapperFunc) handlerWrapperFunc { + return func(ctx context.Context) (RequestStatus, error) { + start := time.Now() + status, err := next(ctx) + + logParams := []any{ + "status", status.String(), + "duration", time.Since(start).String(), + } + + if err != nil { + logParams = append(logParams, "error", err) + } + + logParams = append(logParams, "statusSource", string(errorSourceFromContext(ctx))) + + ctxLogger := Logger.FromContext(ctx) + logFunc := ctxLogger.Debug + if status > RequestStatusOK { + logFunc = ctxLogger.Error + } + + logFunc("Plugin Request Completed", logParams...) + + return status, err + } +} + +func withHeaderMiddleware(ctx context.Context, headers http.Header) context.Context { + if len(headers) > 0 { + ctx = httpclient.WithContextualMiddleware(ctx, + httpclient.MiddlewareFunc(func(opts httpclient.Options, next http.RoundTripper) http.RoundTripper { + if !opts.ForwardHTTPHeaders { + return next + } + + return httpclient.RoundTripperFunc(func(qreq *http.Request) (*http.Response, error) { + // Only set a header if it is not already set. + for k, v := range headers { + if qreq.Header.Get(k) == "" { + for _, vv := range v { + qreq.Header.Add(k, vv) + } + } + } + return next.RoundTrip(qreq) + }) + })) + } + return ctx +} diff --git a/backend/adapter_utils_test.go b/backend/adapter_utils_test.go new file mode 100644 index 000000000..15223ce48 --- /dev/null +++ b/backend/adapter_utils_test.go @@ -0,0 +1,37 @@ +package backend + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestErrorWrapper(t *testing.T) { + t.Run("No downstream error should not set downstream error source in context", func(t *testing.T) { + ctx := initErrorSource(context.Background()) + + actualErr := errors.New("BOOM") + wrapper := errorWrapper(func(_ context.Context) (RequestStatus, error) { + return RequestStatusError, actualErr + }) + status, err := wrapper(ctx) + require.ErrorIs(t, err, actualErr) + require.Equal(t, RequestStatusError, status) + require.Equal(t, DefaultErrorSource, errorSourceFromContext(ctx)) + }) + + t.Run("Downstream error should set downstream error source in context", func(t *testing.T) { + ctx := initErrorSource(context.Background()) + + actualErr := errors.New("BOOM") + wrapper := errorWrapper(func(_ context.Context) (RequestStatus, error) { + return RequestStatusError, DownstreamError(actualErr) + }) + status, err := wrapper(ctx) + require.ErrorIs(t, err, actualErr) + require.Equal(t, RequestStatusError, status) + require.Equal(t, ErrorSourceDownstream, errorSourceFromContext(ctx)) + }) +} diff --git a/backend/admission_adapter.go b/backend/admission_adapter.go index f63210a5c..6c04675df 100644 --- a/backend/admission_adapter.go +++ b/backend/admission_adapter.go @@ -18,49 +18,52 @@ func newAdmissionSDKAdapter(handler AdmissionHandler) *admissionSDKAdapter { } func (a *admissionSDKAdapter) ValidateAdmission(ctx context.Context, req *pluginv2.AdmissionRequest) (*pluginv2.ValidationResponse, error) { - ctx = WithEndpoint(ctx, EndpointValidateAdmission) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig)) + ctx = setupContext(ctx, EndpointValidateAdmission) parsedReq := FromProto().AdmissionRequest(req) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent) - resp, err := a.handler.ValidateAdmission(ctx, parsedReq) + + var resp *ValidationResponse + err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + var innerErr error + resp, innerErr = a.handler.ValidateAdmission(ctx, parsedReq) + return RequestStatusFromError(innerErr), innerErr + }) if err != nil { return nil, err } + return ToProto().ValidationResponse(resp), nil } func (a *admissionSDKAdapter) MutateAdmission(ctx context.Context, req *pluginv2.AdmissionRequest) (*pluginv2.MutationResponse, error) { - ctx = WithEndpoint(ctx, EndpointMutateAdmission) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig)) + ctx = setupContext(ctx, EndpointMutateAdmission) parsedReq := FromProto().AdmissionRequest(req) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent) - resp, err := a.handler.MutateAdmission(ctx, parsedReq) + + var resp *MutationResponse + err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + var innerErr error + resp, innerErr = a.handler.MutateAdmission(ctx, parsedReq) + return RequestStatusFromError(innerErr), innerErr + }) if err != nil { return nil, err } + return ToProto().MutationResponse(resp), nil } func (a *admissionSDKAdapter) ConvertObject(ctx context.Context, req *pluginv2.ConversionRequest) (*pluginv2.ConversionResponse, error) { - ctx = WithEndpoint(ctx, EndpointConvertObject) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig)) + ctx = setupContext(ctx, EndpointConvertObject) parsedReq := FromProto().ConversionRequest(req) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent) - resp, err := a.handler.ConvertObject(ctx, parsedReq) + + var resp *ConversionResponse + err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + var innerErr error + resp, innerErr = a.handler.ConvertObject(ctx, parsedReq) + return RequestStatusFromError(innerErr), innerErr + }) if err != nil { return nil, err } + return ToProto().ConversionResponse(resp), nil } diff --git a/backend/data_adapter.go b/backend/data_adapter.go index 496f230ff..7ed5d205c 100644 --- a/backend/data_adapter.go +++ b/backend/data_adapter.go @@ -2,9 +2,9 @@ package backend import ( "context" - "net/http" + "errors" + "fmt" - "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" ) @@ -19,41 +19,45 @@ func newDataSDKAdapter(handler QueryDataHandler) *dataSDKAdapter { } } -func withHeaderMiddleware(ctx context.Context, headers http.Header) context.Context { - if len(headers) > 0 { - ctx = httpclient.WithContextualMiddleware(ctx, - httpclient.MiddlewareFunc(func(opts httpclient.Options, next http.RoundTripper) http.RoundTripper { - if !opts.ForwardHTTPHeaders { - return next - } - - return httpclient.RoundTripperFunc(func(qreq *http.Request) (*http.Response, error) { - // Only set a header if it is not already set. - for k, v := range headers { - if qreq.Header.Get(k) == "" { - for _, vv := range v { - qreq.Header.Add(k, vv) - } - } - } - return next.RoundTrip(qreq) - }) - })) - } - return ctx -} - func (a *dataSDKAdapter) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest) (*pluginv2.QueryDataResponse, error) { - ctx = WithEndpoint(ctx, EndpointQueryData) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(req.PluginContext.GrafanaConfig)) + ctx = setupContext(ctx, EndpointQueryData) parsedReq := FromProto().QueryDataRequest(req) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent) - resp, err := a.queryDataHandler.QueryData(ctx, parsedReq) + + var resp *QueryDataResponse + err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) + var innerErr error + resp, innerErr = a.queryDataHandler.QueryData(ctx, parsedReq) + + if resp == nil || len(resp.Responses) == 0 { + return RequestStatusFromError(innerErr), innerErr + } + + // Set downstream status source in the context if there's at least one response with downstream status source, + // and if there's no plugin error + var hasPluginError bool + var hasDownstreamError bool + for _, r := range resp.Responses { + if r.Error == nil { + continue + } + if r.ErrorSource == ErrorSourceDownstream { + hasDownstreamError = true + } else { + hasPluginError = true + } + } + + // A plugin error has higher priority than a downstream error, + // so set to downstream only if there's no plugin error + if hasDownstreamError && !hasPluginError { + if err := WithDownstreamErrorSource(ctx); err != nil { + return RequestStatusError, fmt.Errorf("failed to set downstream status source: %w", errors.Join(innerErr, err)) + } + } + + return RequestStatusFromError(innerErr), innerErr + }) if err != nil { return nil, err } diff --git a/backend/data_adapter_test.go b/backend/data_adapter_test.go index d36b63933..57fd76152 100644 --- a/backend/data_adapter_test.go +++ b/backend/data_adapter_test.go @@ -3,6 +3,7 @@ package backend import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -65,11 +66,10 @@ func (f *fakeDataHandlerWithOAuth) QueryData(ctx context.Context, _ *QueryDataRe } func TestQueryData(t *testing.T) { - handler := newFakeDataHandlerWithOAuth() - adapter := newDataSDKAdapter(handler) - t.Run("When forward HTTP headers enabled should forward headers", func(t *testing.T) { ctx := context.Background() + handler := newFakeDataHandlerWithOAuth() + adapter := newDataSDKAdapter(handler) _, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{ Headers: map[string]string{ "Authorization": "Bearer 123", @@ -94,6 +94,8 @@ func TestQueryData(t *testing.T) { t.Run("When forward HTTP headers disable should not forward headers", func(t *testing.T) { ctx := context.Background() + handler := newFakeDataHandlerWithOAuth() + adapter := newDataSDKAdapter(handler) _, err := adapter.QueryData(ctx, &pluginv2.QueryDataRequest{ Headers: map[string]string{ "Authorization": "Bearer 123", @@ -130,6 +132,70 @@ func TestQueryData(t *testing.T) { }) require.NoError(t, err) }) + + t.Run("TestQueryDataResponse", func(t *testing.T) { + someErr := errors.New("oops") + + for _, tc := range []struct { + name string + queryDataResponse *QueryDataResponse + expErrorSource ErrorSource + }{ + { + name: `single downstream error should be "downstream" error source`, + queryDataResponse: &QueryDataResponse{ + Responses: map[string]DataResponse{ + "A": {Error: someErr, ErrorSource: ErrorSourceDownstream}, + }, + }, + expErrorSource: ErrorSourceDownstream, + }, + { + name: `single plugin error should be "plugin" error source`, + queryDataResponse: &QueryDataResponse{ + Responses: map[string]DataResponse{ + "A": {Error: someErr, ErrorSource: ErrorSourcePlugin}, + }, + }, + expErrorSource: ErrorSourcePlugin, + }, + { + name: `multiple downstream errors should be "downstream" error source`, + queryDataResponse: &QueryDataResponse{ + Responses: map[string]DataResponse{ + "A": {Error: someErr, ErrorSource: ErrorSourceDownstream}, + "B": {Error: someErr, ErrorSource: ErrorSourceDownstream}, + }, + }, + expErrorSource: ErrorSourceDownstream, + }, + { + name: `single plugin error mixed with downstream errors should be "plugin" error source`, + queryDataResponse: &QueryDataResponse{ + Responses: map[string]DataResponse{ + "A": {Error: someErr, ErrorSource: ErrorSourceDownstream}, + "B": {Error: someErr, ErrorSource: ErrorSourcePlugin}, + "C": {Error: someErr, ErrorSource: ErrorSourceDownstream}, + }, + }, + expErrorSource: ErrorSourcePlugin, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var actualCtx context.Context + a := newDataSDKAdapter(QueryDataHandlerFunc(func(ctx context.Context, _ *QueryDataRequest) (*QueryDataResponse, error) { + actualCtx = ctx + return tc.queryDataResponse, nil + })) + _, err := a.QueryData(context.Background(), &pluginv2.QueryDataRequest{ + PluginContext: &pluginv2.PluginContext{}, + }) + require.NoError(t, err) + ss := errorSourceFromContext(actualCtx) + require.Equal(t, tc.expErrorSource, ss) + }) + } + }) } var finalRoundTripper = httpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { diff --git a/backend/diagnostics_adapter.go b/backend/diagnostics_adapter.go index 957355bef..97c3310ea 100644 --- a/backend/diagnostics_adapter.go +++ b/backend/diagnostics_adapter.go @@ -46,20 +46,21 @@ func (a *diagnosticsSDKAdapter) CollectMetrics(_ context.Context, _ *pluginv2.Co func (a *diagnosticsSDKAdapter) CheckHealth(ctx context.Context, protoReq *pluginv2.CheckHealthRequest) (*pluginv2.CheckHealthResponse, error) { if a.checkHealthHandler != nil { - ctx = WithEndpoint(ctx, EndpointCheckHealth) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(protoReq.PluginContext.GrafanaConfig)) + ctx = setupContext(ctx, EndpointCheckHealth) parsedReq := FromProto().CheckHealthRequest(protoReq) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent) - res, err := a.checkHealthHandler.CheckHealth(ctx, parsedReq) + + var resp *CheckHealthResult + err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) + var innerErr error + resp, innerErr = a.checkHealthHandler.CheckHealth(ctx, parsedReq) + return RequestStatusFromError(innerErr), innerErr + }) if err != nil { return nil, err } - return ToProto().CheckHealthResponse(res), nil + + return ToProto().CheckHealthResponse(resp), nil } return &pluginv2.CheckHealthResponse{ diff --git a/backend/endpoint.go b/backend/endpoint.go index ba17d11f6..a337191dd 100644 --- a/backend/endpoint.go +++ b/backend/endpoint.go @@ -10,6 +10,10 @@ func (e Endpoint) IsEmpty() bool { return e == "" } +func (e Endpoint) String() string { + return string(e) +} + type endpointCtxKeyType struct{} var endpointCtxKey = endpointCtxKeyType{} diff --git a/backend/error_source.go b/backend/error_source.go index ae92e9c63..69d868caa 100644 --- a/backend/error_source.go +++ b/backend/error_source.go @@ -1,16 +1,27 @@ package backend -import "net/http" +import ( + "context" + "errors" + "fmt" + "net/http" +) // ErrorSource type defines the source of the error type ErrorSource string const ( - ErrorSourcePlugin ErrorSource = "plugin" + // ErrorSourcePlugin error originates from plugin. + ErrorSourcePlugin ErrorSource = "plugin" + + // ErrorSourceDownstream error originates from downstream service. ErrorSourceDownstream ErrorSource = "downstream" + + // DefaultErrorSource is the default [ErrorSource] that should be used when it is not explicitly set. + DefaultErrorSource ErrorSource = ErrorSourcePlugin ) -// ErrorSourceFromStatus returns an ErrorSource based on provided HTTP status code. +// ErrorSourceFromStatus returns an [ErrorSource] based on provided HTTP status code. func ErrorSourceFromHTTPStatus(statusCode int) ErrorSource { switch statusCode { case http.StatusMethodNotAllowed, @@ -28,3 +39,99 @@ func ErrorSourceFromHTTPStatus(statusCode int) ErrorSource { return ErrorSourceDownstream } + +type errorWithSourceImpl struct { + source ErrorSource + err error +} + +func IsDownstreamError(err error) bool { + e := errorWithSourceImpl{ + source: ErrorSourceDownstream, + } + if errors.Is(err, e) { + return true + } + + type errorWithSource interface { + ErrorSource() ErrorSource + } + + // nolint:errorlint + if errWithSource, ok := err.(errorWithSource); ok && errWithSource.ErrorSource() == ErrorSourceDownstream { + return true + } + + return false +} + +func DownstreamError(err error) error { + return errorWithSourceImpl{ + source: ErrorSourceDownstream, + err: err, + } +} + +func DownstreamErrorf(format string, a ...any) error { + return DownstreamError(fmt.Errorf(format, a...)) +} + +func (e errorWithSourceImpl) ErrorSource() ErrorSource { + return e.source +} + +func (e errorWithSourceImpl) Error() string { + return fmt.Errorf("%s error: %w", e.source, e.err).Error() +} + +// Implements the interface used by [errors.Is]. +func (e errorWithSourceImpl) Is(err error) bool { + if errWithSource, ok := err.(errorWithSourceImpl); ok { + return errWithSource.ErrorSource() == e.source + } + + return false +} + +func (e errorWithSourceImpl) Unwrap() error { + return e.err +} + +type errorSourceCtxKey struct{} + +// errorSourceFromContext returns the error source stored in the context. +// If no error source is stored in the context, [DefaultErrorSource] is returned. +func errorSourceFromContext(ctx context.Context) ErrorSource { + value, ok := ctx.Value(errorSourceCtxKey{}).(*ErrorSource) + if ok { + return *value + } + return DefaultErrorSource +} + +// initErrorSource initialize the status source for the context. +func initErrorSource(ctx context.Context) context.Context { + s := DefaultErrorSource + return context.WithValue(ctx, errorSourceCtxKey{}, &s) +} + +// WithErrorSource mutates the provided context by setting the error source to +// s. If the provided context does not have a error source, the context +// will not be mutated and an error returned. This means that [initErrorSource] +// has to be called before this function. +func WithErrorSource(ctx context.Context, s ErrorSource) error { + v, ok := ctx.Value(errorSourceCtxKey{}).(*ErrorSource) + if !ok { + return errors.New("the provided context does not have a status source") + } + *v = s + return nil +} + +// WithDownstreamErrorSource mutates the provided context by setting the error source to +// [ErrorSourceDownstream]. If the provided context does not have a error source, the context +// will not be mutated and an error returned. This means that [initErrorSource] has to be +// called before this function. +func WithDownstreamErrorSource(ctx context.Context) error { + return WithErrorSource(ctx, ErrorSourceDownstream) +} diff --git a/backend/request_status.go b/backend/request_status.go new file mode 100644 index 000000000..2f5d3021c --- /dev/null +++ b/backend/request_status.go @@ -0,0 +1,103 @@ +package backend + +import ( + "context" + "errors" + "strings" + + grpccodes "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" + + "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" +) + +type RequestStatus int + +const ( + RequestStatusOK RequestStatus = iota + RequestStatusCancelled + RequestStatusError +) + +func (status RequestStatus) String() string { + names := [...]string{"ok", "cancelled", "error"} + if status < RequestStatusOK || status > RequestStatusError { + return "" + } + + return names[status] +} + +func RequestStatusFromError(err error) RequestStatus { + status := RequestStatusOK + if err != nil { + status = RequestStatusError + if errors.Is(err, context.Canceled) || grpcstatus.Code(err) == grpccodes.Canceled { + status = RequestStatusCancelled + } + } + + return status +} + +func RequestStatusFromErrorString(errString string) RequestStatus { + status := RequestStatusOK + if errString != "" { + status = RequestStatusError + if strings.Contains(errString, context.Canceled.Error()) || strings.Contains(errString, "code = Canceled") { + status = RequestStatusCancelled + } + } + + return status +} + +func RequestStatusFromQueryDataResponse(res *QueryDataResponse, err error) RequestStatus { + if err != nil { + return RequestStatusFromError(err) + } + + status := RequestStatusOK + + if res != nil { + for _, dr := range res.Responses { + if dr.Error != nil { + s := RequestStatusFromError(dr.Error) + if s > status { + status = s + } + + if status == RequestStatusError { + break + } + } + } + } + + return status +} + +func RequestStatusFromProtoQueryDataResponse(res *pluginv2.QueryDataResponse, err error) RequestStatus { + if err != nil { + return RequestStatusFromError(err) + } + + status := RequestStatusOK + + if res != nil { + for _, dr := range res.Responses { + if dr.Error != "" { + s := RequestStatusFromErrorString(dr.Error) + if s > status { + status = s + } + + if status == RequestStatusError { + break + } + } + } + } + + return status +} diff --git a/backend/request_status_test.go b/backend/request_status_test.go new file mode 100644 index 000000000..4712e1245 --- /dev/null +++ b/backend/request_status_test.go @@ -0,0 +1,180 @@ +package backend + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +func TestRequestStatus(t *testing.T) { + tcs := []struct { + s RequestStatus + expectedLabel string + }{ + { + s: RequestStatusOK, + expectedLabel: "ok", + }, + { + s: RequestStatusError, + expectedLabel: "error", + }, + { + s: RequestStatusCancelled, + expectedLabel: "cancelled", + }, + } + + for _, tc := range tcs { + t.Run(tc.s.String(), func(t *testing.T) { + require.Equal(t, tc.expectedLabel, tc.s.String()) + require.Equal(t, tc.expectedLabel, fmt.Sprint(tc.s)) + }) + } +} + +func TestRequestStatusFromError(t *testing.T) { + tcs := []struct { + desc string + err error + expectedStatus RequestStatus + }{ + { + desc: "no error should be status ok", + err: nil, + expectedStatus: RequestStatusOK, + }, + { + desc: "error should be status error", + err: errors.New("boom"), + expectedStatus: RequestStatusError, + }, + { + desc: "context canceled should be status cancelled", + err: context.Canceled, + expectedStatus: RequestStatusCancelled, + }, + { + desc: "gRPC canceled should be status cancelled", + err: status.Error(codes.Canceled, "canceled"), + expectedStatus: RequestStatusCancelled, + }, + } + + for _, tc := range tcs { + t.Run(tc.desc, func(t *testing.T) { + status := RequestStatusFromError(tc.err) + require.Equal(t, tc.expectedStatus, status) + }) + } +} + +func TestRequestStatusFromQueryDataResponse(t *testing.T) { + responseWithoutError := NewQueryDataResponse() + responseWithoutError.Responses["A"] = DataResponse{ + Frames: data.Frames{data.NewFrame("test")}, + } + + responseWithError := NewQueryDataResponse() + responseWithError.Responses["A"] = DataResponse{ + Error: errors.New("boom"), + } + responseWithMultipleErrors := NewQueryDataResponse() + responseWithMultipleErrors.Responses["A"] = DataResponse{ + Error: context.Canceled, + } + responseWithMultipleErrors.Responses["B"] = DataResponse{ + Frames: data.Frames{data.NewFrame("test")}, + } + responseWithMultipleErrors.Responses["C"] = DataResponse{ + Error: errors.New("boom"), + } + + tcs := []struct { + desc string + resp *QueryDataResponse + err error + expectedStatus RequestStatus + }{ + { + desc: "no error should be status ok", + err: nil, + expectedStatus: RequestStatusOK, + }, + { + desc: "error should be status error", + err: errors.New("boom"), + expectedStatus: RequestStatusError, + }, + { + desc: "context canceled should be status cancelled", + err: context.Canceled, + expectedStatus: RequestStatusCancelled, + }, + { + desc: "response without error should be status ok", + resp: responseWithoutError, + expectedStatus: RequestStatusOK, + }, + { + desc: "response with error should be status error", + resp: responseWithError, + expectedStatus: RequestStatusError, + }, + { + desc: "response with multiple error should pick the highest status cancelled", + resp: responseWithMultipleErrors, + expectedStatus: RequestStatusError, + }, + } + + for _, tc := range tcs { + t.Run(tc.desc, func(t *testing.T) { + status := RequestStatusFromQueryDataResponse(tc.resp, tc.err) + require.Equal(t, tc.expectedStatus, status) + }) + } +} + +func TestRequestStatusFromErrorString(t *testing.T) { + tcs := []struct { + desc string + err string + expectedStatus RequestStatus + }{ + { + desc: "no error should be status ok", + err: "", + expectedStatus: RequestStatusOK, + }, + { + desc: "error should be status error", + err: errors.New("boom").Error(), + expectedStatus: RequestStatusError, + }, + { + desc: "context canceled should be status cancelled", + err: context.Canceled.Error(), + expectedStatus: RequestStatusCancelled, + }, + { + desc: "gRPC canceled should be status cancelled", + err: status.Error(codes.Canceled, "canceled").Error(), + expectedStatus: RequestStatusCancelled, + }, + } + + for _, tc := range tcs { + t.Run(tc.desc, func(t *testing.T) { + status := RequestStatusFromErrorString(tc.err) + require.Equal(t, tc.expectedStatus, status) + }) + } +} diff --git a/backend/resource_adapter.go b/backend/resource_adapter.go index 77d9c7936..d33158482 100644 --- a/backend/resource_adapter.go +++ b/backend/resource_adapter.go @@ -1,6 +1,7 @@ package backend import ( + "context" "net/http" "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" @@ -29,14 +30,12 @@ func (a *resourceSDKAdapter) CallResource(protoReq *pluginv2.CallResourceRequest }) ctx := protoSrv.Context() - ctx = WithEndpoint(ctx, EndpointCallResource) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(protoReq.PluginContext.GrafanaConfig)) + ctx = setupContext(ctx, EndpointCallResource) parsedReq := FromProto().CallResourceRequest(protoReq) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent) - return a.callResourceHandler.CallResource(ctx, parsedReq, fn) + + return wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + ctx = withHeaderMiddleware(ctx, parsedReq.GetHTTPHeaders()) + err := a.callResourceHandler.CallResource(ctx, parsedReq, fn) + return RequestStatusFromError(err), err + }) } diff --git a/backend/stream_adapter.go b/backend/stream_adapter.go index 2bc7f1c70..d67c68cda 100644 --- a/backend/stream_adapter.go +++ b/backend/stream_adapter.go @@ -24,17 +24,20 @@ func (a *streamSDKAdapter) SubscribeStream(ctx context.Context, protoReq *plugin if a.streamHandler == nil { return nil, status.Error(codes.Unimplemented, "not implemented") } - ctx = WithEndpoint(ctx, EndpointSubscribeStream) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(protoReq.PluginContext.GrafanaConfig)) + + ctx = setupContext(ctx, EndpointSubscribeStream) parsedReq := FromProto().SubscribeStreamRequest(protoReq) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - resp, err := a.streamHandler.SubscribeStream(ctx, parsedReq) + + var resp *SubscribeStreamResponse + err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + var innerErr error + resp, innerErr = a.streamHandler.SubscribeStream(ctx, parsedReq) + return RequestStatusFromError(innerErr), innerErr + }) if err != nil { return nil, err } + return ToProto().SubscribeStreamResponse(resp), nil } @@ -42,17 +45,20 @@ func (a *streamSDKAdapter) PublishStream(ctx context.Context, protoReq *pluginv2 if a.streamHandler == nil { return nil, status.Error(codes.Unimplemented, "not implemented") } - ctx = WithEndpoint(ctx, EndpointPublishStream) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(protoReq.PluginContext.GrafanaConfig)) + + ctx = setupContext(ctx, EndpointPublishStream) parsedReq := FromProto().PublishStreamRequest(protoReq) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - resp, err := a.streamHandler.PublishStream(ctx, parsedReq) + + var resp *PublishStreamResponse + err := wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + var innerErr error + resp, innerErr = a.streamHandler.PublishStream(ctx, parsedReq) + return RequestStatusFromError(innerErr), innerErr + }) if err != nil { return nil, err } + return ToProto().PublishStreamResponse(resp), nil } @@ -69,14 +75,12 @@ func (a *streamSDKAdapter) RunStream(protoReq *pluginv2.RunStreamRequest, protoS return status.Error(codes.Unimplemented, "not implemented") } ctx := protoSrv.Context() - ctx = WithEndpoint(ctx, EndpointRunStream) - ctx = propagateTenantIDIfPresent(ctx) - ctx = WithGrafanaConfig(ctx, NewGrafanaCfg(protoReq.PluginContext.GrafanaConfig)) + ctx = setupContext(ctx, EndpointRunStream) parsedReq := FromProto().RunStreamRequest(protoReq) - ctx = WithPluginContext(ctx, parsedReq.PluginContext) - ctx = WithUser(ctx, parsedReq.PluginContext.User) - ctx = withContextualLogAttributes(ctx, parsedReq.PluginContext) - ctx = WithUserAgent(ctx, parsedReq.PluginContext.UserAgent) - sender := NewStreamSender(&runStreamServer{protoSrv: protoSrv}) - return a.streamHandler.RunStream(ctx, parsedReq, sender) + + return wrapHandler(ctx, parsedReq.PluginContext, func(ctx context.Context) (RequestStatus, error) { + sender := NewStreamSender(&runStreamServer{protoSrv: protoSrv}) + err := a.streamHandler.RunStream(ctx, parsedReq, sender) + return RequestStatusFromError(err), err + }) } diff --git a/backend/tracing/tracing.go b/backend/tracing/tracing.go index 5fb0d4444..d9959a608 100644 --- a/backend/tracing/tracing.go +++ b/backend/tracing/tracing.go @@ -2,10 +2,12 @@ package tracing import ( "context" + "fmt" "sync" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -50,3 +52,16 @@ func TraceIDFromContext(ctx context.Context, requireSampled bool) string { return spanCtx.TraceID().String() } + +// Error sets the status to error and record the error as an exception in the provided span. +func Error(span trace.Span, err error) error { + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + return err +} + +// Errorf wraps fmt.Errorf and also sets the status to error and record the error as an exception in the provided span. +func Errorf(span trace.Span, format string, args ...any) error { + err := fmt.Errorf(format, args...) + return Error(span, err) +} diff --git a/experimental/errorsource/error_source.go b/experimental/errorsource/error_source.go index 052462fcf..50799c85c 100644 --- a/experimental/errorsource/error_source.go +++ b/experimental/errorsource/error_source.go @@ -33,6 +33,10 @@ func (r Error) Source() backend.ErrorSource { return r.source } +func (r Error) ErrorSource() backend.ErrorSource { + return r.source +} + // PluginError will apply the source as plugin func PluginError(err error, override bool) error { return SourceError(backend.ErrorSourcePlugin, err, override) diff --git a/experimental/errorsource/error_source_test.go b/experimental/errorsource/error_source_test.go index 67a7f88a5..f8af12c16 100644 --- a/experimental/errorsource/error_source_test.go +++ b/experimental/errorsource/error_source_test.go @@ -84,3 +84,12 @@ func TestResponseWithOptions(t *testing.T) { }) } } + +func TestError(t *testing.T) { + err := errors.New("boom") + require.False(t, backend.IsDownstreamError(err)) + pErr := PluginError(err, true) + require.False(t, backend.IsDownstreamError(pErr)) + dErr := DownstreamError(err, true) + require.True(t, backend.IsDownstreamError(dErr)) +}