From 79371c17c107dc300bfc67e19f4b62c9a6a22709 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 25 Jan 2024 12:35:10 -0800 Subject: [PATCH] Add the SampledFilter exemplar Reservoir impl (#4851) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This Reservoir implementaiton is used at the MeterProvider level to pre-filter measurements offered to a wrapped Reservoir. Co-authored-by: Robert PajÄ…k --- sdk/metric/internal/exemplar/filter.go | 40 +++++++++++ sdk/metric/internal/exemplar/filter_test.go | 77 +++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 sdk/metric/internal/exemplar/filter.go create mode 100644 sdk/metric/internal/exemplar/filter_test.go diff --git a/sdk/metric/internal/exemplar/filter.go b/sdk/metric/internal/exemplar/filter.go new file mode 100644 index 00000000000..4f5946fb966 --- /dev/null +++ b/sdk/metric/internal/exemplar/filter.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// SampledFilter returns a [Reservoir] wrapping r that will only offer measurements +// to r if the passed context associated with the measurement contains a sampled +// [go.opentelemetry.io/otel/trace.SpanContext]. +func SampledFilter[N int64 | float64](r Reservoir[N]) Reservoir[N] { + return filtered[N]{Reservoir: r} +} + +type filtered[N int64 | float64] struct { + Reservoir[N] +} + +func (f filtered[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) { + if trace.SpanContextFromContext(ctx).IsSampled() { + f.Reservoir.Offer(ctx, t, n, a) + } +} diff --git a/sdk/metric/internal/exemplar/filter_test.go b/sdk/metric/internal/exemplar/filter_test.go new file mode 100644 index 00000000000..ae1e276cb83 --- /dev/null +++ b/sdk/metric/internal/exemplar/filter_test.go @@ -0,0 +1,77 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/trace" +) + +func TestSampledFilter(t *testing.T) { + t.Run("Int64", testSampledFiltered[int64]) + t.Run("Float64", testSampledFiltered[float64]) +} + +func testSampledFiltered[N int64 | float64](t *testing.T) { + under := &res[N]{} + + r := SampledFilter[N](under) + + ctx := context.Background() + r.Offer(ctx, staticTime, 0, nil) + assert.False(t, under.OfferCalled, "underlying Reservoir Offer called") + r.Offer(sample(ctx), staticTime, 0, nil) + assert.True(t, under.OfferCalled, "underlying Reservoir Offer not called") + + r.Collect(nil) + assert.True(t, under.CollectCalled, "underlying Reservoir Collect not called") + + r.Flush(nil) + assert.True(t, under.FlushCalled, "underlying Reservoir Flush not called") +} + +func sample(parent context.Context) context.Context { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x01}, + SpanID: trace.SpanID{0x01}, + TraceFlags: trace.FlagsSampled, + }) + return trace.ContextWithSpanContext(parent, sc) +} + +type res[N int64 | float64] struct { + OfferCalled bool + CollectCalled bool + FlushCalled bool +} + +func (r *res[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) { + r.OfferCalled = true +} + +func (r *res[N]) Collect(*[]metricdata.Exemplar[N]) { + r.CollectCalled = true +} + +func (r *res[N]) Flush(*[]metricdata.Exemplar[N]) { + r.FlushCalled = true +}