diff --git a/exporter/collector/internal/datapointstorage/datapointcache.go b/exporter/collector/internal/datapointstorage/datapointcache.go new file mode 100644 index 000000000..c630ea06c --- /dev/null +++ b/exporter/collector/internal/datapointstorage/datapointcache.go @@ -0,0 +1,102 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datapointstorage + +import ( + "fmt" + "strings" + "time" + + "go.opentelemetry.io/collector/model/pdata" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" +) + +const gcInterval = 20 * time.Minute + +type Cache map[string]usedPoint + +type usedPoint struct { + point *pdata.NumberDataPoint + used bool +} + +// New instantiates a cache and starts background processes +func NewCache(shutdown <-chan struct{}) Cache { + c := make(Cache) + go func() { + ticker := time.NewTicker(gcInterval) + for c.gc(shutdown, ticker.C) { + } + }() + return c +} + +// Get retrieves the point associated with the identifier, and whether +// or not it was found +func (c Cache) Get(identifier string) (*pdata.NumberDataPoint, bool) { + point, found := c[identifier] + if found { + point.used = true + c[identifier] = point + } + return point.point, found +} + +// Set assigns the point to the identifier in the cache +func (c Cache) Set(identifier string, point *pdata.NumberDataPoint) { + c[identifier] = usedPoint{point, true} +} + +// gc garbage collects the cache after the ticker ticks +func (c Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) bool { + select { + case <-shutdown: + return false + case <-tickerCh: + // garbage collect the cache + for id, point := range c { + if point.used { + // for points that have been used, mark them as unused + point.used = false + c[id] = point + } else { + // for points that have not been used, delete points + delete(c, id) + } + } + } + return true +} + +// Identifier returns the unique string identifier for a metric +func Identifier(resource *monitoredrespb.MonitoredResource, extraLabels map[string]string, metric pdata.Metric, labels pdata.AttributeMap) string { + var b strings.Builder + + // Resource identifiers + if resource != nil { + fmt.Fprintf(&b, "%v", resource.GetLabels()) + } + + // Instrumentation library labels and additional resource labels + fmt.Fprintf(&b, " - %v", extraLabels) + + // Metric identifiers + fmt.Fprintf(&b, " - %s -", metric.Name()) + labels.Sort().Range(func(k string, v pdata.AttributeValue) bool { + fmt.Fprintf(&b, " %s=%s", k, v.AsString()) + return true + }) + return b.String() +} diff --git a/exporter/collector/internal/datapointstorage/datapointcache_test.go b/exporter/collector/internal/datapointstorage/datapointcache_test.go new file mode 100644 index 000000000..2b7956654 --- /dev/null +++ b/exporter/collector/internal/datapointstorage/datapointcache_test.go @@ -0,0 +1,192 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datapointstorage + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" +) + +func TestSetAndGet(t *testing.T) { + c := make(Cache) + c.Set("foo", nil) + point, found := c.Get("foo") + assert.Nil(t, point) + assert.True(t, found) + + point, found = c.Get("bar") + assert.Nil(t, point) + assert.False(t, found) + + setPoint := pdata.NewNumberDataPoint() + c.Set("bar", &setPoint) + + point, found = c.Get("bar") + assert.Equal(t, point, &setPoint) + assert.True(t, found) +} + +func TestShutdown(t *testing.T) { + shutdown := make(chan struct{}) + c := make(Cache) + close(shutdown) + // gc should return after shutdown is closed + cont := c.gc(shutdown, make(chan time.Time)) + assert.False(t, cont) +} + +func TestGC(t *testing.T) { + shutdown := make(chan struct{}) + c := make(Cache) + fakeTicker := make(chan time.Time) + + c.Set("bar", nil) + + // bar exists since we just set it + usedPoint, found := c["bar"] + assert.True(t, usedPoint.used) + assert.True(t, found) + + // first gc tick marks bar stale + go func() { + fakeTicker <- time.Now() + }() + cont := c.gc(shutdown, fakeTicker) + assert.True(t, cont) + usedPoint, found = c["bar"] + assert.False(t, usedPoint.used) + assert.True(t, found) + + // second gc tick removes bar + go func() { + fakeTicker <- time.Now() + }() + cont = c.gc(shutdown, fakeTicker) + assert.True(t, cont) + _, found = c["bar"] + assert.False(t, found) +} + +func TestGetPreventsGC(t *testing.T) { + shutdown := make(chan struct{}) + c := make(Cache) + fakeTicker := make(chan time.Time) + + setPoint := pdata.NewNumberDataPoint() + c.Set("bar", &setPoint) + + // bar exists since we just set it + _, found := c["bar"] + assert.True(t, found) + + // first gc tick marks bar stale + go func() { + fakeTicker <- time.Now() + }() + cont := c.gc(shutdown, fakeTicker) + assert.True(t, cont) + // calling Get() marks it fresh again. + _, found = c.Get("bar") + assert.True(t, found) + + // second gc tick does not remove bar + go func() { + fakeTicker <- time.Now() + }() + cont = c.gc(shutdown, fakeTicker) + assert.True(t, cont) + _, found = c["bar"] + assert.True(t, found) +} + +func TestIdentifier(t *testing.T) { + metricWithName := pdata.NewMetric() + metricWithName.SetName("custom.googleapis.com/test.metric") + dpWithAttributes := pdata.NewNumberDataPoint() + dpWithAttributes.Attributes().Insert("string", pdata.NewAttributeValueString("strval")) + dpWithAttributes.Attributes().Insert("bool", pdata.NewAttributeValueBool(true)) + dpWithAttributes.Attributes().Insert("int", pdata.NewAttributeValueInt(123)) + monitoredResource := &monitoredrespb.MonitoredResource{ + Type: "generic_task", + Labels: map[string]string{ + "location": "us-central1-b", + "project": "project-foo", + }, + } + extraLabels := map[string]string{ + "foo": "bar", + "hello": "world", + } + for _, tc := range []struct { + desc string + want string + resource *monitoredrespb.MonitoredResource + extraLabels map[string]string + metric pdata.Metric + labels pdata.AttributeMap + }{ + { + desc: "empty", + want: " - map[] - -", + metric: pdata.NewMetric(), + labels: pdata.NewNumberDataPoint().Attributes(), + }, + { + desc: "with name", + want: " - map[] - custom.googleapis.com/test.metric -", + metric: metricWithName, + labels: pdata.NewNumberDataPoint().Attributes(), + }, + { + desc: "with attributes", + want: " - map[] - - bool=true int=123 string=strval", + metric: pdata.NewMetric(), + labels: dpWithAttributes.Attributes(), + }, + { + desc: "with resource", + want: "map[location:us-central1-b project:project-foo] - map[] - -", + resource: monitoredResource, + metric: pdata.NewMetric(), + labels: pdata.NewNumberDataPoint().Attributes(), + }, + { + desc: "with extra labels", + want: " - map[foo:bar hello:world] - -", + metric: pdata.NewMetric(), + labels: pdata.NewNumberDataPoint().Attributes(), + extraLabels: extraLabels, + }, + { + desc: "with all", + want: "map[location:us-central1-b project:project-foo] - map[foo:bar hello:world] - custom.googleapis.com/test.metric - bool=true int=123 string=strval", + metric: metricWithName, + labels: dpWithAttributes.Attributes(), + extraLabels: extraLabels, + resource: monitoredResource, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + got := Identifier(tc.resource, tc.extraLabels, tc.metric, tc.labels) + if tc.want != got { + t.Errorf("Identifier() = %q; want %q", got, tc.want) + } + }) + } +} diff --git a/exporter/collector/metrics.go b/exporter/collector/metrics.go index f5903a1c3..b663aa45a 100644 --- a/exporter/collector/metrics.go +++ b/exporter/collector/metrics.go @@ -42,6 +42,8 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/datapointstorage" ) // self-observability reporting meters/tracers/loggers. @@ -77,8 +79,9 @@ type MetricsExporter struct { // metricMapper is the part that transforms metrics. Separate from MetricsExporter since it has // all pure functions. type metricMapper struct { - obs selfObservability - cfg Config + obs selfObservability + cfg Config + sumCache datapointstorage.Cache } // Constants we use when translating summary metrics into GCP. @@ -139,11 +142,16 @@ func NewGoogleCloudMetricsExporter( return nil, err } obs := selfObservability{log: log} + shutdown := make(chan struct{}) mExp := &MetricsExporter{ cfg: cfg, client: client, obs: obs, - mapper: metricMapper{obs, cfg}, + mapper: metricMapper{ + obs, + cfg, + datapointstorage.NewCache(shutdown), + }, // We create a buffered channel for metric descriptors. // MetricDescritpors are asychronously sent and optimistic. // We only get Unit/Description/Display name from them, so it's ok @@ -151,7 +159,7 @@ func NewGoogleCloudMetricsExporter( metricDescriptorC: make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize), mdCache: make(map[string]*metricpb.MetricDescriptor), timeSeriesC: make(chan *monitoringpb.TimeSeries), - shutdownC: make(chan struct{}), + shutdownC: shutdown, } // Fire up the metric descriptor exporter. @@ -334,7 +342,7 @@ func (m *metricMapper) metricToTimeSeries( points := sum.DataPoints() for i := 0; i < points.Len(); i++ { ts := m.sumPointToTimeSeries(resource, extraLabels, metric, sum, points.At(i)) - timeSeries = append(timeSeries, ts) + timeSeries = append(timeSeries, ts...) } case pdata.MetricDataTypeGauge: gauge := metric.Gauge() @@ -659,16 +667,24 @@ func (m *metricMapper) sumPointToTimeSeries( metric pdata.Metric, sum pdata.Sum, point pdata.NumberDataPoint, -) *monitoringpb.TimeSeries { +) []*monitoringpb.TimeSeries { metricKind := metricpb.MetricDescriptor_CUMULATIVE - startTime := timestamppb.New(point.StartTimestamp().AsTime()) - if !sum.IsMonotonic() { + var startTime *timestamppb.Timestamp + if sum.IsMonotonic() { + metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + normalizedPoint := m.normalizeNumberDataPoint(point, metricIdentifier) + if normalizedPoint == nil { + return nil + } + point = *normalizedPoint + startTime = timestamppb.New(normalizedPoint.StartTimestamp().AsTime()) + } else { metricKind = metricpb.MetricDescriptor_GAUGE startTime = nil } value, valueType := numberDataPointToValue(point) - return &monitoringpb.TimeSeries{ + return []*monitoringpb.TimeSeries{{ Resource: resource, Unit: metric.Unit(), MetricKind: metricKind, @@ -687,7 +703,49 @@ func (m *metricMapper) sumPointToTimeSeries( extraLabels, ), }, + }} +} + +// normalizeNumberDataPoint returns a point which has been normalized against an initial +// start point, or nil if the point should be dropped. +func (m *metricMapper) normalizeNumberDataPoint(point pdata.NumberDataPoint, identifier string) *pdata.NumberDataPoint { + // if the point doesn't need to be normalized, use original point + normalizedPoint := &point + start, ok := m.sumCache.Get(identifier) + if ok { + if !start.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { + // We found a cached start timestamp that wouldn't produce a valid point. + // Drop it and log. + m.obs.log.Info( + "data point being processed older than last recorded reset, will not be emitted", + zap.String("lastRecordedReset", start.Timestamp().String()), + zap.String("dataPoint", point.Timestamp().String()), + ) + return nil + } + // Make a copy so we don't mutate underlying data + newPoint := pdata.NewNumberDataPoint() + point.CopyTo(newPoint) + // Use the start timestamp from the normalization point + newPoint.SetStartTimestamp(start.Timestamp()) + // Adjust the value based on the start point's value + switch newPoint.ValueType() { + case pdata.MetricValueTypeInt: + newPoint.SetIntVal(point.IntVal() - start.IntVal()) + case pdata.MetricValueTypeDouble: + newPoint.SetDoubleVal(point.DoubleVal() - start.DoubleVal()) + } + normalizedPoint = &newPoint + } + if (!ok && point.StartTimestamp().AsTime().IsZero()) || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { + // This is the first time we've seen this metric, or we received + // an explicit reset point as described in + // https://github.com/open-telemetry/opentelemetry-specification/blob/9555f9594c7ffe5dc333b53da5e0f880026cead1/specification/metrics/datamodel.md#resets-and-gaps + // Record it in history and drop the point. + m.sumCache.Set(identifier, &point) + return nil } + return normalizedPoint } func (m *metricMapper) gaugePointToTimeSeries( diff --git a/exporter/collector/metrics_test.go b/exporter/collector/metrics_test.go index 23221e6a7..dc86465da 100644 --- a/exporter/collector/metrics_test.go +++ b/exporter/collector/metrics_test.go @@ -30,31 +30,50 @@ import ( monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/datapointstorage" ) var ( start = time.Unix(0, 0) ) -func newTestMetricMapper() metricMapper { +func newTestMetricMapper() (metricMapper, func()) { obs := selfObservability{log: zap.NewNop()} - return metricMapper{obs, DefaultConfig()} + s := make(chan struct{}) + return metricMapper{ + obs, + DefaultConfig(), + datapointstorage.NewCache(s), + }, func() { close(s) } } func TestMetricToTimeSeries(t *testing.T) { mr := &monitoredrespb.MonitoredResource{} t.Run("Sum", func(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() metric := pdata.NewMetric() metric.SetDataType(pdata.MetricDataTypeSum) sum := metric.Sum() sum.SetIsMonotonic(true) sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + startTs := pdata.NewTimestampFromTime(start) + endTs := pdata.NewTimestampFromTime(start.Add(time.Hour)) // Add three points - sum.DataPoints().AppendEmpty().SetDoubleVal(10) - sum.DataPoints().AppendEmpty().SetDoubleVal(15) - sum.DataPoints().AppendEmpty().SetDoubleVal(16) + point := sum.DataPoints().AppendEmpty() + point.SetDoubleVal(10) + point.SetStartTimestamp(startTs) + point.SetTimestamp(endTs) + point = sum.DataPoints().AppendEmpty() + point.SetDoubleVal(15) + point.SetStartTimestamp(startTs) + point.SetTimestamp(endTs) + point = sum.DataPoints().AppendEmpty() + point.SetDoubleVal(16) + point.SetStartTimestamp(startTs) + point.SetTimestamp(endTs) ts := mapper.metricToTimeSeries( mr, @@ -65,8 +84,79 @@ func TestMetricToTimeSeries(t *testing.T) { require.Same(t, ts[0].Resource, mr, "Should assign the passed in monitored resource") }) + t.Run("Sum without timestamps", func(t *testing.T) { + mapper, shutdown := newTestMetricMapper() + defer shutdown() + metric := pdata.NewMetric() + metric.SetDataType(pdata.MetricDataTypeSum) + sum := metric.Sum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + startTs := pdata.NewTimestampFromTime(start) + endTs := pdata.NewTimestampFromTime(start.Add(time.Hour)) + // Add three points without start timestamps. + // The first one should be dropped to set the start timestamp for the rest + point := sum.DataPoints().AppendEmpty() + point.SetDoubleVal(10) + point.SetTimestamp(startTs) + point = sum.DataPoints().AppendEmpty() + point.SetDoubleVal(15) + point.SetTimestamp(endTs) + point = sum.DataPoints().AppendEmpty() + point.SetDoubleVal(16) + point.SetTimestamp(endTs) + + ts := mapper.metricToTimeSeries( + mr, + labels{}, + metric, + ) + require.Len(t, ts, 2, "Should create one timeseries for each sum point") + require.Same(t, ts[0].Resource, mr, "Should assign the passed in monitored resource") + require.Equal(t, ts[0].Points[0].Value.GetDoubleValue(), 5.0, "Should normalize the resulting sum") + require.Equal(t, ts[0].Points[0].Interval.StartTime, timestamppb.New(start), "Should use the first timestamp as the start time for the rest of the points") + }) + + t.Run("Sum with reset timestamp", func(t *testing.T) { + mapper, shutdown := newTestMetricMapper() + defer shutdown() + metric := pdata.NewMetric() + metric.SetDataType(pdata.MetricDataTypeSum) + sum := metric.Sum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + startTs := pdata.NewTimestampFromTime(start) + middleTs := pdata.NewTimestampFromTime(start.Add(30 * time.Minute)) + endTs := pdata.NewTimestampFromTime(start.Add(time.Hour)) + // Add three points + point := sum.DataPoints().AppendEmpty() + point.SetDoubleVal(10) + point.SetStartTimestamp(startTs) + point.SetTimestamp(middleTs) + point = sum.DataPoints().AppendEmpty() + point.SetDoubleVal(15) + // identical start and end indicates a reset point + // this point is expected to be dropped, and to normalize subsequent points + point.SetStartTimestamp(middleTs) + point.SetTimestamp(middleTs) + point = sum.DataPoints().AppendEmpty() + point.SetDoubleVal(16) + point.SetStartTimestamp(middleTs) + point.SetTimestamp(endTs) + + ts := mapper.metricToTimeSeries( + mr, + labels{}, + metric, + ) + require.Len(t, ts, 2, "Should create one timeseries for each sum point") + require.Same(t, ts[0].Resource, mr, "Should assign the passed in monitored resource") + require.Equal(t, ts[1].Points[0].Value.GetDoubleValue(), 1.0, "Should normalize the point after the reset") + }) + t.Run("Gauge", func(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() metric := pdata.NewMetric() metric.SetDataType(pdata.MetricDataTypeGauge) gauge := metric.Gauge() @@ -104,7 +194,8 @@ func TestMergeLabels(t *testing.T) { } func TestHistogramPointToTimeSeries(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() mapper.cfg.ProjectID = "myproject" mr := &monitoredrespb.MonitoredResource{} metric := pdata.NewMetric() @@ -167,7 +258,8 @@ func TestHistogramPointToTimeSeries(t *testing.T) { } func TestExponentialHistogramPointToTimeSeries(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() mapper.cfg.ProjectID = "myproject" mr := &monitoredrespb.MonitoredResource{} metric := pdata.NewMetric() @@ -234,7 +326,8 @@ func TestExponentialHistogramPointToTimeSeries(t *testing.T) { } func TestExemplarNoAttachements(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() exemplar := pdata.NewExemplar() exemplar.SetTimestamp(pdata.NewTimestampFromTime(start)) exemplar.SetDoubleVal(1) @@ -246,7 +339,8 @@ func TestExemplarNoAttachements(t *testing.T) { } func TestExemplarOnlyDroppedLabels(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() exemplar := pdata.NewExemplar() exemplar.SetTimestamp(pdata.NewTimestampFromTime(start)) exemplar.SetDoubleVal(1) @@ -264,7 +358,8 @@ func TestExemplarOnlyDroppedLabels(t *testing.T) { } func TestExemplarOnlyTraceId(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() mapper.cfg.ProjectID = "p" exemplar := pdata.NewExemplar() exemplar.SetTimestamp(pdata.NewTimestampFromTime(start)) @@ -288,7 +383,8 @@ func TestExemplarOnlyTraceId(t *testing.T) { } func TestSumPointToTimeSeries(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() mr := &monitoredrespb.MonitoredResource{} newCase := func() (pdata.Metric, pdata.Sum, pdata.NumberDataPoint) { @@ -312,7 +408,9 @@ func TestSumPointToTimeSeries(t *testing.T) { point.SetStartTimestamp(pdata.NewTimestampFromTime(start)) point.SetTimestamp(pdata.NewTimestampFromTime(end)) - ts := mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + tsl := mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + assert.Equal(t, 1, len(tsl)) + ts := tsl[0] assert.Equal(t, ts.MetricKind, metricpb.MetricDescriptor_CUMULATIVE) assert.Equal(t, ts.ValueType, metricpb.MetricDescriptor_INT64) assert.Equal(t, ts.Unit, unit) @@ -332,7 +430,9 @@ func TestSumPointToTimeSeries(t *testing.T) { // Test double as well point.SetDoubleVal(float64(value)) - ts = mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + tsl = mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + assert.Equal(t, 1, len(tsl)) + ts = tsl[0] assert.Equal(t, ts.MetricKind, metricpb.MetricDescriptor_CUMULATIVE) assert.Equal(t, ts.ValueType, metricpb.MetricDescriptor_DOUBLE) assert.Equal(t, ts.Points[0].Value.GetDoubleValue(), float64(value)) @@ -349,7 +449,9 @@ func TestSumPointToTimeSeries(t *testing.T) { point.SetTimestamp(pdata.NewTimestampFromTime(end)) // Should output a "pseudo-cumulative" with same interval as the delta - ts := mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + tsl := mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + assert.Equal(t, 1, len(tsl)) + ts := tsl[0] assert.Equal(t, ts.MetricKind, metricpb.MetricDescriptor_CUMULATIVE) assert.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{ StartTime: timestamppb.New(start), @@ -368,14 +470,18 @@ func TestSumPointToTimeSeries(t *testing.T) { point.SetTimestamp(pdata.NewTimestampFromTime(end)) // Should output a gauge regardless of temporality, only setting end time - ts := mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + tsl := mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + assert.Equal(t, 1, len(tsl)) + ts := tsl[0] assert.Equal(t, ts.MetricKind, metricpb.MetricDescriptor_GAUGE) assert.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{ EndTime: timestamppb.New(end), }) sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta) - ts = mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + tsl = mapper.sumPointToTimeSeries(mr, labels{}, metric, sum, point) + assert.Equal(t, 1, len(tsl)) + ts = tsl[0] assert.Equal(t, ts.MetricKind, metricpb.MetricDescriptor_GAUGE) assert.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{ EndTime: timestamppb.New(end), @@ -384,19 +490,27 @@ func TestSumPointToTimeSeries(t *testing.T) { t.Run("Add labels", func(t *testing.T) { metric, sum, point := newCase() + end := start.Add(time.Hour) + point.SetStartTimestamp(pdata.NewTimestampFromTime(start)) + point.SetTimestamp(pdata.NewTimestampFromTime(end)) extraLabels := map[string]string{"foo": "bar"} - ts := mapper.sumPointToTimeSeries(mr, labels(extraLabels), metric, sum, point) + tsl := mapper.sumPointToTimeSeries(mr, labels(extraLabels), metric, sum, point) + assert.Equal(t, 1, len(tsl)) + ts := tsl[0] assert.Equal(t, ts.Metric.Labels, extraLabels) // Full set of labels point.Attributes().InsertString("baz", "bar") - ts = mapper.sumPointToTimeSeries(mr, labels(extraLabels), metric, sum, point) + tsl = mapper.sumPointToTimeSeries(mr, labels(extraLabels), metric, sum, point) + assert.Equal(t, 1, len(tsl)) + ts = tsl[0] assert.Equal(t, ts.Metric.Labels, map[string]string{"foo": "bar", "baz": "bar"}) }) } func TestGaugePointToTimeSeries(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() mr := &monitoredrespb.MonitoredResource{} newCase := func() (pdata.Metric, pdata.Gauge, pdata.NumberDataPoint) { @@ -452,7 +566,8 @@ func TestGaugePointToTimeSeries(t *testing.T) { } func TestSummaryPointToTimeSeries(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() mr := &monitoredrespb.MonitoredResource{} metric := pdata.NewMetric() @@ -524,7 +639,8 @@ func TestSummaryPointToTimeSeries(t *testing.T) { } func TestMetricNameToType(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() assert.Equal( t, mapper.metricNameToType("foo"), @@ -981,7 +1097,8 @@ func TestMetricDescriptorMapping(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() metric := test.metricCreator() md := mapper.metricDescriptor(metric, test.extraLabels) diff := cmp.Diff( @@ -1038,7 +1155,8 @@ func TestKnownDomains(t *testing.T) { } for _, test := range tests { t.Run(fmt.Sprintf("%v to %v", test.name, test.metricType), func(t *testing.T) { - mapper := newTestMetricMapper() + mapper, shutdown := newTestMetricMapper() + defer shutdown() if len(test.knownDomains) > 0 { mapper.cfg.MetricConfig.KnownDomains = test.knownDomains } @@ -1080,7 +1198,8 @@ func TestInstrumentationLibraryToLabels(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - m := newTestMetricMapper() + m, shutdown := newTestMetricMapper() + defer shutdown() m.cfg.MetricConfig = test.metricConfig out := m.instrumentationLibraryToLabels(test.instrumentationLibrary)