From 5e1a96ba872cf979c12cccd42e754e1d67902b44 Mon Sep 17 00:00:00 2001 From: Muhammad Raza Date: Fri, 20 Oct 2023 22:57:23 +0100 Subject: [PATCH] Passing message to record malformed event Signed-off-by: Muhammad Raza --- observability/opencensus/v2/client/client_test.go | 2 +- observability/opencensus/v2/client/observability_service.go | 2 +- .../opentelemetry/v2/client/otel_observability_service.go | 2 +- test/observability/go.mod | 1 + .../opentelemetry/otel_observability_service_test.go | 3 ++- v2/client/invoker.go | 4 ++-- v2/client/observability.go | 5 +++-- 7 files changed, 11 insertions(+), 8 deletions(-) diff --git a/observability/opencensus/v2/client/client_test.go b/observability/opencensus/v2/client/client_test.go index 980349510..5f3b20644 100644 --- a/observability/opencensus/v2/client/client_test.go +++ b/observability/opencensus/v2/client/client_test.go @@ -355,7 +355,7 @@ func (n fakeObservabilityServiceWithError) InboundContextDecorators() []func(con return nil } -func (n fakeObservabilityServiceWithError) RecordReceivedMalformedEvent(ctx context.Context, err error) { +func (n fakeObservabilityServiceWithError) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) { } func (n fakeObservabilityServiceWithError) RecordCallingInvoker(ctx context.Context, event *event.Event) (context.Context, func(errOrResult error)) { diff --git a/observability/opencensus/v2/client/observability_service.go b/observability/opencensus/v2/client/observability_service.go index c9d123f67..8ae3a8f4c 100644 --- a/observability/opencensus/v2/client/observability_service.go +++ b/observability/opencensus/v2/client/observability_service.go @@ -24,7 +24,7 @@ func (o opencensusObservabilityService) InboundContextDecorators() []func(contex return []func(context.Context, binding.Message) context.Context{tracePropagatorContextDecorator} } -func (o opencensusObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) { +func (o opencensusObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) { ctx, r := NewReporter(ctx, reportReceive) r.Error() } diff --git a/observability/opentelemetry/v2/client/otel_observability_service.go b/observability/opentelemetry/v2/client/otel_observability_service.go index 25ead8331..c7c5ca84b 100644 --- a/observability/opentelemetry/v2/client/otel_observability_service.go +++ b/observability/opentelemetry/v2/client/otel_observability_service.go @@ -57,7 +57,7 @@ func (o OTelObservabilityService) InboundContextDecorators() []func(context.Cont } // RecordReceivedMalformedEvent records the error from a malformed event in the span. -func (o OTelObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) { +func (o OTelObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) { spanName := observability.ClientSpanName + ".malformed receive" _, span := o.tracer.Start( ctx, spanName, diff --git a/test/observability/go.mod b/test/observability/go.mod index 51e4e2d45..c1363ae2b 100644 --- a/test/observability/go.mod +++ b/test/observability/go.mod @@ -14,6 +14,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.2 // indirect + github.com/google/go-cmp v0.5.6 // indirect github.com/google/uuid v1.1.1 // indirect github.com/json-iterator/go v1.1.10 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect diff --git a/test/observability/opentelemetry/otel_observability_service_test.go b/test/observability/opentelemetry/otel_observability_service_test.go index da55dd81f..862f8a4b9 100644 --- a/test/observability/opentelemetry/otel_observability_service_test.go +++ b/test/observability/opentelemetry/otel_observability_service_test.go @@ -23,6 +23,7 @@ import ( "github.com/cloudevents/sdk-go/v2/extensions" "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/cloudevents/sdk-go/v2/test" ) var ( @@ -385,7 +386,7 @@ func TestRecordReceivedMalformedEvent(t *testing.T) { os := otelObs.NewOTelObservabilityService() // act - os.RecordReceivedMalformedEvent(ctx, tc.expectedResult) + os.RecordReceivedMalformedEvent(ctx, test.FullMessage(), tc.expectedResult) spans := sr.Ended() diff --git a/v2/client/invoker.go b/v2/client/invoker.go index a3080b007..c5e557e8c 100644 --- a/v2/client/invoker.go +++ b/v2/client/invoker.go @@ -65,13 +65,13 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p e, eventErr := binding.ToEvent(ctx, m) switch { case eventErr != nil && r.fn.hasEventIn: - r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr) + r.observabilityService.RecordReceivedMalformedEvent(ctx, m, eventErr) return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr)) case r.fn != nil: // Check if event is valid before invoking the receiver function if e != nil { if validationErr := e.Validate(); validationErr != nil { - r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr) + r.observabilityService.RecordReceivedMalformedEvent(ctx, m, validationErr) return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr)) } } diff --git a/v2/client/observability.go b/v2/client/observability.go index 75005d3bb..c050ab641 100644 --- a/v2/client/observability.go +++ b/v2/client/observability.go @@ -18,7 +18,7 @@ type ObservabilityService interface { InboundContextDecorators() []func(context.Context, binding.Message) context.Context // RecordReceivedMalformedEvent is invoked when an event was received but it's malformed or invalid. - RecordReceivedMalformedEvent(ctx context.Context, err error) + RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) // RecordCallingInvoker is invoked before the user function is invoked. // The returned callback will be invoked after the user finishes to process the event with the eventual processing error // The error provided to the callback could be both a processing error, or a result @@ -39,7 +39,8 @@ func (n noopObservabilityService) InboundContextDecorators() []func(context.Cont return nil } -func (n noopObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, err error) {} +func (n noopObservabilityService) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) { +} func (n noopObservabilityService) RecordCallingInvoker(ctx context.Context, event *event.Event) (context.Context, func(errOrResult error)) { return ctx, func(errOrResult error) {}