From bc53abaa4d2e2471b0a4023a53bb64ecac2d61a6 Mon Sep 17 00:00:00 2001 From: Ivan <2103732+codebien@users.noreply.github.com> Date: Wed, 24 May 2023 18:29:53 +0200 Subject: [PATCH 1/5] Drop the old output v1 --- output/cloud/v1/bench_test.go | 358 --------------- output/cloud/v1/cloud_easyjson.go | 643 -------------------------- output/cloud/v1/data.go | 348 -------------- output/cloud/v1/data_test.go | 335 -------------- output/cloud/v1/metrics_client.go | 116 ----- output/cloud/v1/output.go | 555 ----------------------- output/cloud/v1/output_test.go | 727 ------------------------------ 7 files changed, 3082 deletions(-) delete mode 100644 output/cloud/v1/bench_test.go delete mode 100644 output/cloud/v1/cloud_easyjson.go delete mode 100644 output/cloud/v1/data.go delete mode 100644 output/cloud/v1/data_test.go delete mode 100644 output/cloud/v1/metrics_client.go delete mode 100644 output/cloud/v1/output.go delete mode 100644 output/cloud/v1/output_test.go diff --git a/output/cloud/v1/bench_test.go b/output/cloud/v1/bench_test.go deleted file mode 100644 index 483ff71627f..00000000000 --- a/output/cloud/v1/bench_test.go +++ /dev/null @@ -1,358 +0,0 @@ -package cloud - -import ( - "bytes" - "compress/gzip" - json "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "testing" - "time" - - easyjson "github.com/mailru/easyjson" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v3" - - "go.k6.io/k6/lib" - "go.k6.io/k6/lib/netext/httpext" - "go.k6.io/k6/lib/testutils" - "go.k6.io/k6/lib/testutils/httpmultibin" - "go.k6.io/k6/lib/types" - "go.k6.io/k6/metrics" - "go.k6.io/k6/output" -) - -func BenchmarkAggregateHTTP(b *testing.B) { - out, err := newTestOutput(output.Params{ - Logger: testutils.NewLogger(b), - JSONConfig: json.RawMessage(`{"noCompress": true, "aggregationCalcInterval": "200ms","aggregationPeriod": "200ms"}`), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - require.NoError(b, err) - now := time.Now() - out.referenceID = "something" - containersCount := 500000 - - for _, tagCount := range []int{1, 5, 35, 315, 3645} { - tagCount := tagCount - b.Run(fmt.Sprintf("tags:%d", tagCount), func(b *testing.B) { - registry := metrics.NewRegistry() - b.ResetTimer() - for s := 0; s < b.N; s++ { - b.StopTimer() - container := make([]metrics.SampleContainer, containersCount) - for i := 1; i <= containersCount; i++ { - status := "200" - if i%tagCount%7 == 6 { - status = "404" - } else if i%tagCount%7 == 5 { - status = "500" - } - - tags := generateTags(registry, i, tagCount, "status", status) - container[i-1] = generateHTTPExtTrail(now, time.Duration(i), tags) - } - out.AddMetricSamples(container) - b.StartTimer() - out.aggregateHTTPTrails(time.Millisecond * 200) - out.bufferSamples = nil - } - }) - } -} - -func generateTags(registry *metrics.Registry, i, tagCount int, additionalPairs ...string) *metrics.TagSet { - tags := registry.RootTagSet(). - With("test", "mest"). - With("a", "b"). - With("custom", fmt.Sprintf("group%d", i%tagCount%9)). - With("group", fmt.Sprintf("group%d", i%tagCount%5)). - With("url", fmt.Sprintf("something%d", i%tagCount%11)). 
- With("name", fmt.Sprintf("else%d", i%tagCount%11)) - - for j := 0; j < len(additionalPairs); j += 2 { - tags.With(additionalPairs[j], additionalPairs[j+1]) - } - - return tags -} - -func BenchmarkMetricMarshal(b *testing.B) { - for _, count := range []int{10000, 100000, 500000} { - count := count - registry := metrics.NewRegistry() - b.Run(fmt.Sprintf("%d", count), func(b *testing.B) { - for i := 0; i < b.N; i++ { - b.StopTimer() - s := generateSamples(registry, count) - b.StartTimer() - r, err := easyjson.Marshal(samples(s)) - require.NoError(b, err) - b.SetBytes(int64(len(r))) - } - }) - } -} - -func BenchmarkMetricMarshalWriter(b *testing.B) { - for _, count := range []int{10000, 100000, 500000} { - count := count - registry := metrics.NewRegistry() - b.Run(fmt.Sprintf("%d", count), func(b *testing.B) { - for i := 0; i < b.N; i++ { - b.StopTimer() - s := generateSamples(registry, count) - b.StartTimer() - n, err := easyjson.MarshalToWriter(samples(s), io.Discard) - require.NoError(b, err) - b.SetBytes(int64(n)) - } - }) - } -} - -func BenchmarkMetricMarshalGzip(b *testing.B) { - for _, count := range []int{10000, 100000, 500000} { - for name, level := range map[string]int{ - "bestcompression": gzip.BestCompression, - "default": gzip.DefaultCompression, - "bestspeed": gzip.BestSpeed, - } { - count := count - level := level - registry := metrics.NewRegistry() - b.Run(fmt.Sprintf("%d_%s", count, name), func(b *testing.B) { - s := generateSamples(registry, count) - r, err := easyjson.Marshal(samples(s)) - require.NoError(b, err) - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - var buf bytes.Buffer - buf.Grow(len(r) / 5) - g, err := gzip.NewWriterLevel(&buf, level) - require.NoError(b, err) - b.StartTimer() - n, err := g.Write(r) - require.NoError(b, err) - b.SetBytes(int64(n)) - b.ReportMetric(float64(len(r))/float64(buf.Len()), "ratio") - } - }) - } - } -} - -func BenchmarkMetricMarshalGzipAll(b *testing.B) { - for _, count := range []int{10000, 100000, 500000} { - for name, level := range map[string]int{ - "bestspeed": gzip.BestSpeed, - } { - count := count - level := level - registry := metrics.NewRegistry() - b.Run(fmt.Sprintf("%d_%s", count, name), func(b *testing.B) { - for i := 0; i < b.N; i++ { - b.StopTimer() - - s := generateSamples(registry, count) - var buf bytes.Buffer - g, err := gzip.NewWriterLevel(&buf, level) - require.NoError(b, err) - b.StartTimer() - - r, err := easyjson.Marshal(samples(s)) - require.NoError(b, err) - buf.Grow(len(r) / 5) - n, err := g.Write(r) - require.NoError(b, err) - b.SetBytes(int64(n)) - } - }) - } - } -} - -func BenchmarkMetricMarshalGzipAllWriter(b *testing.B) { - for _, count := range []int{10000, 100000, 500000} { - for name, level := range map[string]int{ - "bestspeed": gzip.BestSpeed, - } { - count := count - level := level - registry := metrics.NewRegistry() - b.Run(fmt.Sprintf("%d_%s", count, name), func(b *testing.B) { - var buf bytes.Buffer - for i := 0; i < b.N; i++ { - b.StopTimer() - buf.Reset() - - s := generateSamples(registry, count) - g, err := gzip.NewWriterLevel(&buf, level) - require.NoError(b, err) - pr, pw := io.Pipe() - b.StartTimer() - - go func() { - _, _ = easyjson.MarshalToWriter(samples(s), pw) - _ = pw.Close() - }() - n, err := io.Copy(g, pr) - require.NoError(b, err) - b.SetBytes(n) - } - }) - } - } -} - -func generateSamples(registry *metrics.Registry, count int) []*Sample { - samples := make([]*Sample, count) - now := time.Now() - for i := range samples { - tags := generateTags(registry, i, 200) - 
encodedTags, err := easyjson.Marshal(tags) - if err != nil { - panic(err) - } - switch i % 3 { - case 0: - samples[i] = &Sample{ - Type: DataTypeSingle, - Metric: "something", - Data: &SampleDataSingle{ - Time: toMicroSecond(now), - Type: metrics.Counter, - Tags: encodedTags, - Value: float64(i), - }, - } - case 1: - aggrData := &SampleDataAggregatedHTTPReqs{ - Time: toMicroSecond(now), - Type: "aggregated_trend", - Tags: encodedTags, - } - trail := generateHTTPExtTrail(now, time.Duration(i), tags) - aggrData.Add(trail) - aggrData.Add(trail) - aggrData.Add(trail) - aggrData.Add(trail) - aggrData.Add(trail) - aggrData.CalcAverages() - - samples[i] = &Sample{ - Type: DataTypeAggregatedHTTPReqs, - Metric: "something", - Data: aggrData, - } - default: - samples[i] = NewSampleFromTrail(generateHTTPExtTrail(now, time.Duration(i), tags)) - } - } - - return samples -} - -func generateHTTPExtTrail(now time.Time, i time.Duration, tags *metrics.TagSet) *httpext.Trail { - //nolint:durationcheck - return &httpext.Trail{ - Blocked: i % 200 * 100 * time.Millisecond, - Connecting: i % 200 * 200 * time.Millisecond, - TLSHandshaking: i % 200 * 300 * time.Millisecond, - Sending: i % 200 * 400 * time.Millisecond, - Waiting: 500 * time.Millisecond, - Receiving: 600 * time.Millisecond, - EndTime: now.Add(i % 100 * 100), - ConnDuration: 500 * time.Millisecond, - Duration: i % 150 * 1500 * time.Millisecond, - Tags: tags, - } -} - -func BenchmarkHTTPPush(b *testing.B) { - tb := httpmultibin.NewHTTPMultiBin(b) - tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, err := fmt.Fprint(w, `{ -"reference_id": "fake", -}`) - require.NoError(b, err) - })) - tb.Mux.HandleFunc("/v1/metrics/fake", - func(w http.ResponseWriter, r *http.Request) { - _, err := io.Copy(io.Discard, r.Body) - assert.NoError(b, err) - }, - ) - - out, err := newTestOutput(output.Params{ - Logger: testutils.NewLogger(b), - JSONConfig: json.RawMessage(fmt.Sprintf(`{ -"host": "%s", -"noCompress": false, -"aggregationCalcInterval": "200ms", -"aggregationPeriod": "200ms" -}`, tb.ServerHTTP.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - require.NoError(b, err) - out.referenceID = "fake" - assert.False(b, out.config.NoCompress.Bool) - - for _, count := range []int{1000, 5000, 50000, 100000, 250000} { - count := count - registry := metrics.NewRegistry() - b.Run(fmt.Sprintf("count:%d", count), func(b *testing.B) { - samples := generateSamples(registry, count) - b.ResetTimer() - for s := 0; s < b.N; s++ { - b.StopTimer() - toSend := append([]*Sample{}, samples...) 
- b.StartTimer() - require.NoError(b, out.client.PushMetric("fake", toSend)) - } - }) - } -} - -func BenchmarkNewSampleFromTrail(b *testing.B) { - registry := metrics.NewRegistry() - tags := generateTags(registry, 1, 200) - now := time.Now() - trail := &httpext.Trail{ - Blocked: 200 * 100 * time.Millisecond, - Connecting: 200 * 200 * time.Millisecond, - TLSHandshaking: 200 * 300 * time.Millisecond, - Sending: 200 * 400 * time.Millisecond, - Waiting: 500 * time.Millisecond, - Receiving: 600 * time.Millisecond, - EndTime: now, - ConnDuration: 500 * time.Millisecond, - Duration: 150 * 1500 * time.Millisecond, - Tags: tags, - } - - b.Run("no failed", func(b *testing.B) { - for s := 0; s < b.N; s++ { - _ = NewSampleFromTrail(trail) - } - }) - - trail.Failed = null.BoolFrom(true) - b.Run("failed", func(b *testing.B) { - for s := 0; s < b.N; s++ { - _ = NewSampleFromTrail(trail) - } - }) -} diff --git a/output/cloud/v1/cloud_easyjson.go b/output/cloud/v1/cloud_easyjson.go deleted file mode 100644 index f7729f3705f..00000000000 --- a/output/cloud/v1/cloud_easyjson.go +++ /dev/null @@ -1,643 +0,0 @@ -// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT. - -package cloud - -import ( - json "encoding/json" - easyjson "github.com/mailru/easyjson" - jlexer "github.com/mailru/easyjson/jlexer" - jwriter "github.com/mailru/easyjson/jwriter" -) - -// suppress unused package warning -var ( - _ *json.RawMessage - _ *jlexer.Lexer - _ *jwriter.Writer - _ easyjson.Marshaler -) - -func easyjson9def2ecdDecodeGoK6IoK6OutputCloud(in *jlexer.Lexer, out *samples) { - isTopLevel := in.IsStart() - if in.IsNull() { - in.Skip() - *out = nil - } else { - in.Delim('[') - if *out == nil { - if !in.IsDelim(']') { - *out = make(samples, 0, 8) - } else { - *out = samples{} - } - } else { - *out = (*out)[:0] - } - for !in.IsDelim(']') { - var v1 *Sample - if in.IsNull() { - in.Skip() - v1 = nil - } else { - if v1 == nil { - v1 = new(Sample) - } - (*v1).UnmarshalEasyJSON(in) - } - *out = append(*out, v1) - in.WantComma() - } - in.Delim(']') - } - if isTopLevel { - in.Consumed() - } -} -func easyjson9def2ecdEncodeGoK6IoK6OutputCloud(out *jwriter.Writer, in samples) { - if in == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v2, v3 := range in { - if v2 > 0 { - out.RawByte(',') - } - if v3 == nil { - out.RawString("null") - } else { - (*v3).MarshalEasyJSON(out) - } - } - out.RawByte(']') - } -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v samples) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGoK6IoK6OutputCloud(w, v) -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *samples) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGoK6IoK6OutputCloud(l, v) -} -func easyjson9def2ecdDecodeGoK6IoK6OutputCloud1(in *jlexer.Lexer, out *SampleDataSingle) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "time": - out.Time = int64(in.Int64Str()) - case "type": - if data := in.UnsafeBytes(); in.Ok() { - in.AddError((out.Type).UnmarshalText(data)) - } - case "tags": - if data := in.Raw(); in.Ok() { - in.AddError((out.Tags).UnmarshalJSON(data)) - } - case "value": - out.Value = float64(in.Float64()) - default: - in.SkipRecursive() - } - in.WantComma() - } 
- in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson9def2ecdEncodeGoK6IoK6OutputCloud1(out *jwriter.Writer, in SampleDataSingle) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"time\":" - out.RawString(prefix[1:]) - out.Int64Str(int64(in.Time)) - } - { - const prefix string = ",\"type\":" - out.RawString(prefix) - out.Raw((in.Type).MarshalJSON()) - } - if len(in.Tags) != 0 { - const prefix string = ",\"tags\":" - out.RawString(prefix) - out.Raw((in.Tags).MarshalJSON()) - } - { - const prefix string = ",\"value\":" - out.RawString(prefix) - out.Float64(float64(in.Value)) - } - out.RawByte('}') -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v SampleDataSingle) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGoK6IoK6OutputCloud1(w, v) -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *SampleDataSingle) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGoK6IoK6OutputCloud1(l, v) -} -func easyjson9def2ecdDecodeGoK6IoK6OutputCloud2(in *jlexer.Lexer, out *SampleDataMap) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "time": - out.Time = int64(in.Int64Str()) - case "type": - if data := in.UnsafeBytes(); in.Ok() { - in.AddError((out.Type).UnmarshalText(data)) - } - case "tags": - if data := in.Raw(); in.Ok() { - in.AddError((out.Tags).UnmarshalJSON(data)) - } - case "values": - if in.IsNull() { - in.Skip() - } else { - in.Delim('{') - if !in.IsDelim('}') { - out.Values = make(map[string]float64) - } else { - out.Values = nil - } - for !in.IsDelim('}') { - key := string(in.String()) - in.WantColon() - var v4 float64 - v4 = float64(in.Float64()) - (out.Values)[key] = v4 - in.WantComma() - } - in.Delim('}') - } - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson9def2ecdEncodeGoK6IoK6OutputCloud2(out *jwriter.Writer, in SampleDataMap) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"time\":" - out.RawString(prefix[1:]) - out.Int64Str(int64(in.Time)) - } - { - const prefix string = ",\"type\":" - out.RawString(prefix) - out.Raw((in.Type).MarshalJSON()) - } - if len(in.Tags) != 0 { - const prefix string = ",\"tags\":" - out.RawString(prefix) - out.Raw((in.Tags).MarshalJSON()) - } - if len(in.Values) != 0 { - const prefix string = ",\"values\":" - out.RawString(prefix) - { - out.RawByte('{') - v5First := true - for v5Name, v5Value := range in.Values { - if v5First { - v5First = false - } else { - out.RawByte(',') - } - out.String(string(v5Name)) - out.RawByte(':') - out.Float64(float64(v5Value)) - } - out.RawByte('}') - } - } - out.RawByte('}') -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v SampleDataMap) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGoK6IoK6OutputCloud2(w, v) -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *SampleDataMap) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGoK6IoK6OutputCloud2(l, v) -} -func easyjson9def2ecdDecodeGoK6IoK6OutputCloud3(in *jlexer.Lexer, out *SampleDataAggregatedHTTPReqs) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for 
!in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "time": - out.Time = int64(in.Int64Str()) - case "type": - out.Type = string(in.String()) - case "count": - out.Count = uint64(in.Uint64()) - case "tags": - if data := in.Raw(); in.Ok() { - in.AddError((out.Tags).UnmarshalJSON(data)) - } - case "values": - easyjson9def2ecdDecode(in, &out.Values) - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson9def2ecdEncodeGoK6IoK6OutputCloud3(out *jwriter.Writer, in SampleDataAggregatedHTTPReqs) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"time\":" - out.RawString(prefix[1:]) - out.Int64Str(int64(in.Time)) - } - { - const prefix string = ",\"type\":" - out.RawString(prefix) - out.String(string(in.Type)) - } - { - const prefix string = ",\"count\":" - out.RawString(prefix) - out.Uint64(uint64(in.Count)) - } - if len(in.Tags) != 0 { - const prefix string = ",\"tags\":" - out.RawString(prefix) - out.Raw((in.Tags).MarshalJSON()) - } - { - const prefix string = ",\"values\":" - out.RawString(prefix) - easyjson9def2ecdEncode(out, in.Values) - } - out.RawByte('}') -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v SampleDataAggregatedHTTPReqs) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGoK6IoK6OutputCloud3(w, v) -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *SampleDataAggregatedHTTPReqs) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGoK6IoK6OutputCloud3(l, v) -} -func easyjson9def2ecdDecode(in *jlexer.Lexer, out *struct { - Duration AggregatedMetric `json:"http_req_duration"` - Blocked AggregatedMetric `json:"http_req_blocked"` - Connecting AggregatedMetric `json:"http_req_connecting"` - TLSHandshaking AggregatedMetric `json:"http_req_tls_handshaking"` - Sending AggregatedMetric `json:"http_req_sending"` - Waiting AggregatedMetric `json:"http_req_waiting"` - Receiving AggregatedMetric `json:"http_req_receiving"` - Failed AggregatedRate `json:"http_req_failed,omitempty"` -}) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "http_req_duration": - easyjson9def2ecdDecodeGoK6IoK6OutputCloud4(in, &out.Duration) - case "http_req_blocked": - easyjson9def2ecdDecodeGoK6IoK6OutputCloud4(in, &out.Blocked) - case "http_req_connecting": - easyjson9def2ecdDecodeGoK6IoK6OutputCloud4(in, &out.Connecting) - case "http_req_tls_handshaking": - easyjson9def2ecdDecodeGoK6IoK6OutputCloud4(in, &out.TLSHandshaking) - case "http_req_sending": - easyjson9def2ecdDecodeGoK6IoK6OutputCloud4(in, &out.Sending) - case "http_req_waiting": - easyjson9def2ecdDecodeGoK6IoK6OutputCloud4(in, &out.Waiting) - case "http_req_receiving": - easyjson9def2ecdDecodeGoK6IoK6OutputCloud4(in, &out.Receiving) - case "http_req_failed": - easyjson9def2ecdDecodeGoK6IoK6OutputCloud5(in, &out.Failed) - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson9def2ecdEncode(out *jwriter.Writer, in struct { - Duration AggregatedMetric `json:"http_req_duration"` - Blocked AggregatedMetric `json:"http_req_blocked"` - Connecting AggregatedMetric 
`json:"http_req_connecting"` - TLSHandshaking AggregatedMetric `json:"http_req_tls_handshaking"` - Sending AggregatedMetric `json:"http_req_sending"` - Waiting AggregatedMetric `json:"http_req_waiting"` - Receiving AggregatedMetric `json:"http_req_receiving"` - Failed AggregatedRate `json:"http_req_failed,omitempty"` -}) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"http_req_duration\":" - out.RawString(prefix[1:]) - easyjson9def2ecdEncodeGoK6IoK6OutputCloud4(out, in.Duration) - } - { - const prefix string = ",\"http_req_blocked\":" - out.RawString(prefix) - easyjson9def2ecdEncodeGoK6IoK6OutputCloud4(out, in.Blocked) - } - { - const prefix string = ",\"http_req_connecting\":" - out.RawString(prefix) - easyjson9def2ecdEncodeGoK6IoK6OutputCloud4(out, in.Connecting) - } - { - const prefix string = ",\"http_req_tls_handshaking\":" - out.RawString(prefix) - easyjson9def2ecdEncodeGoK6IoK6OutputCloud4(out, in.TLSHandshaking) - } - { - const prefix string = ",\"http_req_sending\":" - out.RawString(prefix) - easyjson9def2ecdEncodeGoK6IoK6OutputCloud4(out, in.Sending) - } - { - const prefix string = ",\"http_req_waiting\":" - out.RawString(prefix) - easyjson9def2ecdEncodeGoK6IoK6OutputCloud4(out, in.Waiting) - } - { - const prefix string = ",\"http_req_receiving\":" - out.RawString(prefix) - easyjson9def2ecdEncodeGoK6IoK6OutputCloud4(out, in.Receiving) - } - if (in.Failed).IsDefined() { - const prefix string = ",\"http_req_failed\":" - out.RawString(prefix) - easyjson9def2ecdEncodeGoK6IoK6OutputCloud5(out, in.Failed) - } - out.RawByte('}') -} -func easyjson9def2ecdDecodeGoK6IoK6OutputCloud5(in *jlexer.Lexer, out *AggregatedRate) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "count": - out.Count = float64(in.Float64()) - case "nz_count": - out.NzCount = float64(in.Float64()) - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson9def2ecdEncodeGoK6IoK6OutputCloud5(out *jwriter.Writer, in AggregatedRate) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"count\":" - out.RawString(prefix[1:]) - out.Float64(float64(in.Count)) - } - { - const prefix string = ",\"nz_count\":" - out.RawString(prefix) - out.Float64(float64(in.NzCount)) - } - out.RawByte('}') -} -func easyjson9def2ecdDecodeGoK6IoK6OutputCloud4(in *jlexer.Lexer, out *AggregatedMetric) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "min": - out.Min = float64(in.Float64()) - case "max": - out.Max = float64(in.Float64()) - case "avg": - out.Avg = float64(in.Float64()) - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson9def2ecdEncodeGoK6IoK6OutputCloud4(out *jwriter.Writer, in AggregatedMetric) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"min\":" - out.RawString(prefix[1:]) - out.Float64(float64(in.Min)) - } - { - const prefix string = ",\"max\":" - out.RawString(prefix) - out.Float64(float64(in.Max)) - } - { - const 
prefix string = ",\"avg\":" - out.RawString(prefix) - out.Float64(float64(in.Avg)) - } - out.RawByte('}') -} -func easyjson9def2ecdDecodeGoK6IoK6OutputCloud6(in *jlexer.Lexer, out *Sample) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeFieldName(false) - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "type": - out.Type = string(in.String()) - case "metric": - out.Metric = string(in.String()) - case "data": - if m, ok := out.Data.(easyjson.Unmarshaler); ok { - m.UnmarshalEasyJSON(in) - } else if m, ok := out.Data.(json.Unmarshaler); ok { - _ = m.UnmarshalJSON(in.Raw()) - } else { - out.Data = in.Interface() - } - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson9def2ecdEncodeGoK6IoK6OutputCloud6(out *jwriter.Writer, in Sample) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"type\":" - out.RawString(prefix[1:]) - out.String(string(in.Type)) - } - { - const prefix string = ",\"metric\":" - out.RawString(prefix) - out.String(string(in.Metric)) - } - { - const prefix string = ",\"data\":" - out.RawString(prefix) - if m, ok := in.Data.(easyjson.Marshaler); ok { - m.MarshalEasyJSON(out) - } else if m, ok := in.Data.(json.Marshaler); ok { - out.Raw(m.MarshalJSON()) - } else { - out.Raw(json.Marshal(in.Data)) - } - } - out.RawByte('}') -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v Sample) MarshalEasyJSON(w *jwriter.Writer) { - easyjson9def2ecdEncodeGoK6IoK6OutputCloud6(w, v) -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *Sample) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson9def2ecdDecodeGoK6IoK6OutputCloud6(l, v) -} diff --git a/output/cloud/v1/data.go b/output/cloud/v1/data.go deleted file mode 100644 index 275251841a2..00000000000 --- a/output/cloud/v1/data.go +++ /dev/null @@ -1,348 +0,0 @@ -package cloud - -import ( - "encoding/json" - "fmt" - "math" - "sort" - "time" - - easyjson "github.com/mailru/easyjson" - "go.k6.io/k6/lib/netext/httpext" - "go.k6.io/k6/metrics" -) - -// DataType constants -const ( - DataTypeSingle = "Point" - DataTypeMap = "Points" - DataTypeAggregatedHTTPReqs = "AggregatedPoints" -) - -//go:generate easyjson -pkg -no_std_marshalers -gen_build_flags -mod=mod . - -func toMicroSecond(t time.Time) int64 { - return t.UnixNano() / 1000 -} - -// Sample is the generic struct that contains all types of data that we send to the cloud. 
-// -//easyjson:json -type Sample struct { - Type string `json:"type"` - Metric string `json:"metric"` - Data interface{} `json:"data"` -} - -// UnmarshalJSON decodes the Data into the corresponding struct -func (ct *Sample) UnmarshalJSON(p []byte) error { - var tmpSample struct { - Type string `json:"type"` - Metric string `json:"metric"` - Data json.RawMessage `json:"data"` - } - if err := json.Unmarshal(p, &tmpSample); err != nil { - return err - } - s := Sample{ - Type: tmpSample.Type, - Metric: tmpSample.Metric, - } - - switch tmpSample.Type { - case DataTypeSingle: - s.Data = new(SampleDataSingle) - case DataTypeMap: - s.Data = new(SampleDataMap) - case DataTypeAggregatedHTTPReqs: - s.Data = new(SampleDataAggregatedHTTPReqs) - default: - return fmt.Errorf("unknown sample type '%s'", tmpSample.Type) - } - - if err := json.Unmarshal(tmpSample.Data, &s.Data); err != nil { - return err - } - - *ct = s - return nil -} - -// SampleDataSingle is used in all simple un-aggregated single-value samples. -// -//easyjson:json -type SampleDataSingle struct { - Time int64 `json:"time,string"` - Type metrics.MetricType `json:"type"` - Tags json.RawMessage `json:"tags,omitempty"` - Value float64 `json:"value"` -} - -// SampleDataMap is used by samples that contain multiple values, currently -// that's only iteration metrics (`iter_li_all`) and unaggregated HTTP -// requests (`http_req_li_all`). -// -//easyjson:json -type SampleDataMap struct { - Time int64 `json:"time,string"` - Type metrics.MetricType `json:"type"` - Tags json.RawMessage `json:"tags,omitempty"` - Values map[string]float64 `json:"values,omitempty"` -} - -// NewSampleFromTrail just creates a ready-to-send Sample instance -// directly from a httpext.Trail. -func NewSampleFromTrail(trail *httpext.Trail) *Sample { - length := 8 - if trail.Failed.Valid { - length++ - } - - values := make(map[string]float64, length) - values[metrics.HTTPReqsName] = 1 - values[metrics.HTTPReqDurationName] = metrics.D(trail.Duration) - values[metrics.HTTPReqBlockedName] = metrics.D(trail.Blocked) - values[metrics.HTTPReqConnectingName] = metrics.D(trail.Connecting) - values[metrics.HTTPReqTLSHandshakingName] = metrics.D(trail.TLSHandshaking) - values[metrics.HTTPReqSendingName] = metrics.D(trail.Sending) - values[metrics.HTTPReqWaitingName] = metrics.D(trail.Waiting) - values[metrics.HTTPReqReceivingName] = metrics.D(trail.Receiving) - if trail.Failed.Valid { // this is done so the adding of 1 map element doesn't reexpand the map as this is a hotpath - values[metrics.HTTPReqFailedName] = metrics.B(trail.Failed.Bool) - } - sdata := &SampleDataMap{ - Time: toMicroSecond(trail.GetTime()), - Values: values, - } - if !trail.Tags.IsEmpty() { - if tagsJSON, err := easyjson.Marshal(trail.Tags); err == nil { - sdata.Tags = tagsJSON - } - } - - return &Sample{ - Type: DataTypeMap, - Metric: "http_req_li_all", - Data: sdata, - } -} - -// SampleDataAggregatedHTTPReqs is used in aggregated samples for HTTP requests. 
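
For illustration (values invented here, shapes taken from the tests in this patch): Sample is a small tagged union, where "type" selects the concrete Go struct behind "data". On the wire that looks like:

	{"type":"Point","metric":"vus","data":{"time":"1600000000000000","type":"gauge","value":1}}
	{"type":"Points","metric":"iter_li_all","data":{"time":"1600000000000000","type":"counter","values":{"iterations":1}}}

The UnmarshalJSON method above switches on that "type" field before decoding "data" into the matching struct.
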
-//
-//easyjson:json
-type SampleDataAggregatedHTTPReqs struct {
-	Time   int64           `json:"time,string"`
-	Type   string          `json:"type"`
-	Count  uint64          `json:"count"`
-	Tags   json.RawMessage `json:"tags,omitempty"`
-	Values struct {
-		Duration       AggregatedMetric `json:"http_req_duration"`
-		Blocked        AggregatedMetric `json:"http_req_blocked"`
-		Connecting     AggregatedMetric `json:"http_req_connecting"`
-		TLSHandshaking AggregatedMetric `json:"http_req_tls_handshaking"`
-		Sending        AggregatedMetric `json:"http_req_sending"`
-		Waiting        AggregatedMetric `json:"http_req_waiting"`
-		Receiving      AggregatedMetric `json:"http_req_receiving"`
-		Failed         AggregatedRate   `json:"http_req_failed,omitempty"`
-	} `json:"values"`
-}
-
-// Add updates all aggregated values with the supplied trail data
-func (sdagg *SampleDataAggregatedHTTPReqs) Add(trail *httpext.Trail) {
-	sdagg.Count++
-	sdagg.Values.Duration.Add(trail.Duration)
-	sdagg.Values.Blocked.Add(trail.Blocked)
-	sdagg.Values.Connecting.Add(trail.Connecting)
-	sdagg.Values.TLSHandshaking.Add(trail.TLSHandshaking)
-	sdagg.Values.Sending.Add(trail.Sending)
-	sdagg.Values.Waiting.Add(trail.Waiting)
-	sdagg.Values.Receiving.Add(trail.Receiving)
-	if trail.Failed.Valid {
-		sdagg.Values.Failed.Add(trail.Failed.Bool)
-	}
-}
-
-// CalcAverages calculates and sets all `Avg` properties in the `Values` struct
-func (sdagg *SampleDataAggregatedHTTPReqs) CalcAverages() {
-	count := float64(sdagg.Count)
-	sdagg.Values.Duration.Calc(count)
-	sdagg.Values.Blocked.Calc(count)
-	sdagg.Values.Connecting.Calc(count)
-	sdagg.Values.TLSHandshaking.Calc(count)
-	sdagg.Values.Sending.Calc(count)
-	sdagg.Values.Waiting.Calc(count)
-	sdagg.Values.Receiving.Calc(count)
-}
-
-// AggregatedRate is an aggregation of a Rate metric
-type AggregatedRate struct {
-	Count   float64 `json:"count"`
-	NzCount float64 `json:"nz_count"`
-}
-
-// Add a boolean to the aggregated rate
-func (ar *AggregatedRate) Add(b bool) {
-	ar.Count++
-	if b {
-		ar.NzCount++
-	}
-}
-
-// IsDefined implements easyjson.Optional
-func (ar AggregatedRate) IsDefined() bool {
-	return ar.Count != 0
-}
-
-// AggregatedMetric is used to store aggregated information for a
-// particular metric in a SampleDataAggregatedHTTPReqs.
-type AggregatedMetric struct {
-	// Used by Add() to keep working state
-	minD time.Duration
-	maxD time.Duration
-	sumD time.Duration
-	// Updated by Calc() and used in the JSON output
-	Min float64 `json:"min"`
-	Max float64 `json:"max"`
-	Avg float64 `json:"avg"`
-}
-
-// Add the new duration to the internal sum and update Min and Max if necessary
-func (am *AggregatedMetric) Add(t time.Duration) {
-	if am.sumD == 0 || am.minD > t {
-		am.minD = t
-	}
-	if am.maxD < t {
-		am.maxD = t
-	}
-	am.sumD += t
-}
-
-// Calc populates the float fields for min and max and calculates the average value
-func (am *AggregatedMetric) Calc(count float64) {
-	am.Min = metrics.D(am.minD)
-	am.Max = metrics.D(am.maxD)
-	am.Avg = metrics.D(am.sumD) / count
-}
-
-type aggregationBucket map[*metrics.TagSet][]*httpext.Trail
-
-type durations []time.Duration
-
-func (d durations) Len() int           { return len(d) }
-func (d durations) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
-func (d durations) Less(i, j int) bool { return d[i] < d[j] }
-
-// Used when there are fewer samples in the bucket (so we can interpolate)
-// and for benchmark comparisons and verification of the quickselect
-// algorithm (it should return exactly the same results if interpolation isn't used).
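
A plain-float sketch of the quartile-fence arithmetic that SortGetNormalBounds below implements (illustrative only, not part of the original file; assumes an already sorted input and the standard "math" package):

func exampleFences(sorted []float64, radius, lowerCoef, upperCoef float64) (lo, hi float64) {
	// Linearly interpolated value at percentile p of the sorted input.
	q := func(p float64) float64 {
		i := p * float64(len(sorted)-1)
		f := math.Floor(i)
		return sorted[int(f)] + (sorted[int(math.Ceil(i))]-sorted[int(f)])*(i-f)
	}
	q1, q3 := q(0.5-radius), q(0.5+radius) // with radius 0.25 these are the real Q1 and Q3
	iqr := q3 - q1
	// Anything outside [lo, hi] counts as an outlier.
	return q1 - lowerCoef*iqr, q3 + upperCoef*iqr
}

For eleven values 0..10 with radius 0.25 and both coefficients 1, this yields -2.5 and 12.5, matching TestSortInterpolation in data_test.go further down.
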
-func (d durations) SortGetNormalBounds( - radius, iqrLowerCoef, iqrUpperCoef float64, interpolate bool, -) (min, max time.Duration) { - if len(d) == 0 { - return 0, 0 - } - sort.Sort(d) - last := float64(len(d) - 1) - - getValue := func(percentile float64) time.Duration { - i := percentile * last - // If interpolation is not enabled, we round the resulting slice position - // and return the value there. - if !interpolate { - return d[int(math.Round(i))] - } - - // Otherwise, we calculate (with linear interpolation) the value that - // should fall at that percentile, given the values above and below it. - floor := d[int(math.Floor(i))] - ceil := d[int(math.Ceil(i))] - posDiff := i - math.Floor(i) - return floor + time.Duration(float64(ceil-floor)*posDiff) - } - - // See https://en.wikipedia.org/wiki/Quartile#Outliers for details - radius = math.Min(0.5, radius) // guard against a radius greater than 50%, see AggregationOutlierIqrRadius - q1 := getValue(0.5 - radius) // get Q1, the (interpolated) value at a `radius` distance before the median - q3 := getValue(0.5 + radius) // get Q3, the (interpolated) value at a `radius` distance after the median - iqr := float64(q3 - q1) // calculate the interquartile range (IQR) - - min = q1 - time.Duration(iqrLowerCoef*iqr) // lower fence, anything below this is an outlier - max = q3 + time.Duration(iqrUpperCoef*iqr) // upper fence, anything above this is an outlier - return min, max -} - -// Reworked and translated in Go from: -// https://github.com/haifengl/smile/blob/master/math/src/main/java/smile/sort/QuickSelect.java -// Originally Copyright (c) 2010 Haifeng Li -// Licensed under the Apache License, Version 2.0 -// -// This could potentially be implemented as a standalone function -// that only depends on the sort.Interface methods, but that would -// probably introduce some performance overhead because of the -// dynamic dispatch. 
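
For orientation, the textbook recursive form of the same algorithm (a sketch only, not the method below, which is the iterative in-place variant and avoids these allocations):

func quickSelectSketch(d []time.Duration, k int) time.Duration {
	// Three-way partition around an arbitrary pivot, then recurse into
	// the part that contains the k-th smallest element (k is zero-based).
	pivot := d[len(d)/2]
	var lt, eq, gt []time.Duration
	for _, v := range d {
		switch {
		case v < pivot:
			lt = append(lt, v)
		case v > pivot:
			gt = append(gt, v)
		default:
			eq = append(eq, v)
		}
	}
	switch {
	case k < len(lt):
		return quickSelectSketch(lt, k)
	case k < len(lt)+len(eq):
		return pivot
	default:
		return quickSelectSketch(gt, k-len(lt)-len(eq))
	}
}
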
-func (d durations) quickSelect(k int) time.Duration { - n := len(d) - l := 0 - ir := n - 1 - - var i, j, mid int - for { - if ir <= l+1 { - if ir == l+1 && d[ir] < d[l] { - d.Swap(l, ir) - } - return d[k] - } - mid = (l + ir) >> 1 - d.Swap(mid, l+1) - if d[l] > d[ir] { - d.Swap(l, ir) - } - if d[l+1] > d[ir] { - d.Swap(l+1, ir) - } - if d[l] > d[l+1] { - d.Swap(l, l+1) - } - i = l + 1 - j = ir - for { - for i++; d[i] < d[l+1]; i++ { //nolint:revive - } - for j--; d[j] > d[l+1]; j-- { //nolint:revive - } - if j < i { - break - } - d.Swap(i, j) - } - d.Swap(l+1, j) - if j >= k { - ir = j - 1 - } - if j <= k { - l = i - } - } -} - -// Uses Quickselect to avoid sorting the whole slice -// https://en.wikipedia.org/wiki/Quickselect -func (d durations) SelectGetNormalBounds(radius, iqrLowerCoef, iqrUpperCoef float64) (min, max time.Duration) { - if len(d) == 0 { - return - } - radius = math.Min(0.5, radius) - last := float64(len(d) - 1) - - q1 := d.quickSelect(int(math.Round(last * (0.5 - radius)))) - q3 := d.quickSelect(int(math.Round(last * (0.5 + radius)))) - - iqr := float64(q3 - q1) - min = q1 - time.Duration(iqrLowerCoef*iqr) - max = q3 + time.Duration(iqrUpperCoef*iqr) - return -} - -//easyjson:json -type samples []*Sample diff --git a/output/cloud/v1/data_test.go b/output/cloud/v1/data_test.go deleted file mode 100644 index 6bdd43ab110..00000000000 --- a/output/cloud/v1/data_test.go +++ /dev/null @@ -1,335 +0,0 @@ -package cloud - -import ( - "encoding/json" - "fmt" - "math/rand" - "strconv" - "testing" - "time" - - "github.com/mailru/easyjson" - "github.com/stretchr/testify/assert" - "gopkg.in/guregu/null.v3" - - "go.k6.io/k6/lib/netext/httpext" - "go.k6.io/k6/metrics" -) - -func TestSampleMarshaling(t *testing.T) { - t.Parallel() - - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) - now := time.Now() - exptoMicroSecond := now.UnixNano() / 1000 - - testCases := []struct { - s *Sample - json string - }{ - { - &Sample{ - Type: DataTypeSingle, - Metric: metrics.VUsName, - Data: &SampleDataSingle{ - Type: builtinMetrics.VUs.Type, - Time: toMicroSecond(now), - Tags: []byte(`{"aaa":"bbb","ccc":"123"}`), - Value: 999, - }, - }, - fmt.Sprintf(`{"type":"Point","metric":"vus","data":{"time":"%d","type":"gauge","tags":{"aaa":"bbb","ccc":"123"},"value":999}}`, exptoMicroSecond), - }, - { - &Sample{ - Type: DataTypeMap, - Metric: "iter_li_all", - Data: &SampleDataMap{ - Time: toMicroSecond(now), - Tags: []byte(`{"test":"mest"}`), - Values: map[string]float64{ - metrics.DataSentName: 1234.5, - metrics.DataReceivedName: 6789.1, - metrics.IterationDurationName: metrics.D(10 * time.Second), - }, - }, - }, - fmt.Sprintf(`{"type":"Points","metric":"iter_li_all","data":{"time":"%d","type":"counter","tags":{"test":"mest"},"values":{"data_received":6789.1,"data_sent":1234.5,"iteration_duration":10000}}}`, exptoMicroSecond), - }, - { - NewSampleFromTrail(&httpext.Trail{ - EndTime: now, - Duration: 123000, - Blocked: 1000, - Connecting: 2000, - TLSHandshaking: 3000, - Sending: 4000, - Waiting: 5000, - Receiving: 6000, - Tags: registry.RootTagSet(), - }), - fmt.Sprintf(`{"type":"Points","metric":"http_req_li_all","data":{"time":"%d","type":"counter","values":{"http_req_blocked":0.001,"http_req_connecting":0.002,"http_req_duration":0.123,"http_req_receiving":0.006,"http_req_sending":0.004,"http_req_tls_handshaking":0.003,"http_req_waiting":0.005,"http_reqs":1}}}`, exptoMicroSecond), - }, - { - NewSampleFromTrail(&httpext.Trail{ - EndTime: now, - Duration: 123000, - 
Blocked: 1000, - Connecting: 2000, - TLSHandshaking: 3000, - Sending: 4000, - Waiting: 5000, - Receiving: 6000, - Failed: null.NewBool(false, true), - Tags: registry.RootTagSet(), - }), - fmt.Sprintf(`{"type":"Points","metric":"http_req_li_all","data":{"time":"%d","type":"counter","values":{"http_req_blocked":0.001,"http_req_connecting":0.002,"http_req_duration":0.123,"http_req_failed":0,"http_req_receiving":0.006,"http_req_sending":0.004,"http_req_tls_handshaking":0.003,"http_req_waiting":0.005,"http_reqs":1}}}`, exptoMicroSecond), - }, - { - func() *Sample { - aggrData := &SampleDataAggregatedHTTPReqs{ - Time: exptoMicroSecond, - Type: "aggregated_trend", - Tags: []byte(`{"test":"mest"}`), - } - aggrData.Add( - &httpext.Trail{ - EndTime: now, - Duration: 123000, - Blocked: 1000, - Connecting: 2000, - TLSHandshaking: 3000, - Sending: 4000, - Waiting: 5000, - Receiving: 6000, - }, - ) - - aggrData.Add( - &httpext.Trail{ - EndTime: now, - Duration: 13000, - Blocked: 3000, - Connecting: 1000, - TLSHandshaking: 4000, - Sending: 5000, - Waiting: 8000, - Receiving: 8000, - }, - ) - aggrData.CalcAverages() - - return &Sample{ - Type: DataTypeAggregatedHTTPReqs, - Metric: "http_req_li_all", - Data: aggrData, - } - }(), - fmt.Sprintf(`{"type":"AggregatedPoints","metric":"http_req_li_all","data":{"time":"%d","type":"aggregated_trend","count":2,"tags":{"test":"mest"},"values":{"http_req_duration":{"min":0.013,"max":0.123,"avg":0.068},"http_req_blocked":{"min":0.001,"max":0.003,"avg":0.002},"http_req_connecting":{"min":0.001,"max":0.002,"avg":0.0015},"http_req_tls_handshaking":{"min":0.003,"max":0.004,"avg":0.0035},"http_req_sending":{"min":0.004,"max":0.005,"avg":0.0045},"http_req_waiting":{"min":0.005,"max":0.008,"avg":0.0065},"http_req_receiving":{"min":0.006,"max":0.008,"avg":0.007}}}}`, exptoMicroSecond), - }, - { - func() *Sample { - aggrData := &SampleDataAggregatedHTTPReqs{ - Time: exptoMicroSecond, - Type: "aggregated_trend", - Tags: []byte(`{"test": "mest"}`), - } - aggrData.Add( - &httpext.Trail{ - EndTime: now, - Duration: 123000, - Blocked: 1000, - Connecting: 2000, - TLSHandshaking: 3000, - Sending: 4000, - Waiting: 5000, - Receiving: 6000, - Failed: null.BoolFrom(false), - }, - ) - - aggrData.Add( - &httpext.Trail{ - EndTime: now, - Duration: 13000, - Blocked: 3000, - Connecting: 1000, - TLSHandshaking: 4000, - Sending: 5000, - Waiting: 8000, - Receiving: 8000, - }, - ) - aggrData.CalcAverages() - - return &Sample{ - Type: DataTypeAggregatedHTTPReqs, - Metric: "http_req_li_all", - Data: aggrData, - } - }(), - fmt.Sprintf(`{"type":"AggregatedPoints","metric":"http_req_li_all","data":{"time":"%d","type":"aggregated_trend","count":2,"tags":{"test":"mest"},"values":{"http_req_duration":{"min":0.013,"max":0.123,"avg":0.068},"http_req_blocked":{"min":0.001,"max":0.003,"avg":0.002},"http_req_connecting":{"min":0.001,"max":0.002,"avg":0.0015},"http_req_tls_handshaking":{"min":0.003,"max":0.004,"avg":0.0035},"http_req_sending":{"min":0.004,"max":0.005,"avg":0.0045},"http_req_waiting":{"min":0.005,"max":0.008,"avg":0.0065},"http_req_receiving":{"min":0.006,"max":0.008,"avg":0.007},"http_req_failed":{"count":1,"nz_count":0}}}}`, exptoMicroSecond), - }, - } - - for i, tc := range testCases { - sJSON, err := easyjson.Marshal(tc.s) - if !assert.NoError(t, err) { - continue - } - t.Logf(string(sJSON)) - assert.JSONEq(t, tc.json, string(sJSON), "testcase"+strconv.Itoa(i)) - - var newS Sample - assert.NoError(t, json.Unmarshal(sJSON, &newS)) - assert.Equal(t, tc.s.Type, newS.Type) - assert.Equal(t, 
tc.s.Metric, newS.Metric) - assert.IsType(t, tc.s.Data, newS.Data) - // Cannot directly compare tc.s.Data and newS.Data (because of internal time.Time and SampleTags fields) - newJSON, err := easyjson.Marshal(newS) - assert.NoError(t, err) - assert.JSONEq(t, string(sJSON), string(newJSON)) - } -} - -func TestMetricAggregation(t *testing.T) { - t.Parallel() - m := AggregatedMetric{} - m.Add(1 * time.Second) - m.Add(1 * time.Second) - m.Add(3 * time.Second) - m.Add(5 * time.Second) - m.Add(10 * time.Second) - m.Calc(5) - assert.Equal(t, m.Min, metrics.D(1*time.Second)) - assert.Equal(t, m.Max, metrics.D(10*time.Second)) - assert.Equal(t, m.Avg, metrics.D(4*time.Second)) -} - -// For more realistic request time distributions, import -// "gonum.org/v1/gonum/stat/distuv" and use something like this: -// -// randSrc := rand.NewSource(uint64(time.Now().UnixNano())) -// dist := distuv.LogNormal{Mu: 0, Sigma: 0.5, Src: randSrc} -// -// then set the data elements to time.Duration(dist.Rand() * multiplier) -// -// I've not used that after the initial tests because it's a big -// external dependency that's not really needed for the tests at -// this point. -func getDurations(r *rand.Rand, count int, min, multiplier float64) durations { - data := make(durations, count) - for j := 0; j < count; j++ { - data[j] = time.Duration(min + r.Float64()*multiplier) - } - return data -} - -func BenchmarkDurationBounds(b *testing.B) { - iqrRadius := 0.25 // If it's something different, the Q in IQR won't make much sense... - iqrLowerCoef := 1.5 - iqrUpperCoef := 1.5 - - seed := time.Now().UnixNano() - r := rand.New(rand.NewSource(seed)) //nolint:gosec - b.Logf("Random source seeded with %d\n", seed) - - getData := func(b *testing.B, count int) durations { - b.StopTimer() - defer b.StartTimer() - return getDurations(r, count, 0.1*float64(time.Second), float64(time.Second)) - } - - for count := 100; count <= 5000; count += 500 { - count := count - b.Run(fmt.Sprintf("Sort-no-interp-%d-elements", count), func(b *testing.B) { - for i := 0; i < b.N; i++ { - data := getData(b, count) - data.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, false) - } - }) - b.Run(fmt.Sprintf("Sort-with-interp-%d-elements", count), func(b *testing.B) { - for i := 0; i < b.N; i++ { - data := getData(b, count) - data.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, true) - } - }) - b.Run(fmt.Sprintf("Select-%d-elements", count), func(b *testing.B) { - for i := 0; i < b.N; i++ { - data := getData(b, count) - data.SelectGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef) - } - }) - } -} - -//nolint:paralleltest -func TestQuickSelectAndBounds(t *testing.T) { - seed := time.Now().UnixNano() - r := rand.New(rand.NewSource(seed)) //nolint:gosec - t.Logf("Random source seeded with %d\n", seed) - - mult := time.Millisecond - for _, count := range []int{1, 2, 3, 4, 5, 10, 15, 20, 25, 50, 100, 250 + r.Intn(100)} { - count := count - t.Run(fmt.Sprintf("simple-%d", count), func(t *testing.T) { - data := make(durations, count) - for i := 0; i < count; i++ { - data[i] = time.Duration(i) * mult - } - rand.Shuffle(len(data), data.Swap) - for i := 0; i < 10; i++ { - dataCopy := make(durations, count) - assert.Equal(t, count, copy(dataCopy, data)) - k := r.Intn(count) - assert.Equal(t, dataCopy.quickSelect(k), time.Duration(k)*mult) - } - }) - t.Run(fmt.Sprintf("random-%d", count), func(t *testing.T) { - testCases := []struct{ r, l, u float64 }{ - {0.25, 1.5, 1.5}, // Textbook - {0.25, 1.5, 1.3}, // Defaults - {0.1, 0.5, 0.3}, // Extreme 
narrow - {0.3, 2, 1.8}, // Extreme wide - } - - for tcNum, tc := range testCases { - tc := tc - data := getDurations(r, count, 0.3*float64(time.Second), 2*float64(time.Second)) - dataForSort := make(durations, count) - dataForSelect := make(durations, count) - assert.Equal(t, count, copy(dataForSort, data)) - assert.Equal(t, count, copy(dataForSelect, data)) - assert.Equal(t, dataForSort, dataForSelect) - - t.Run(fmt.Sprintf("bounds-tc%d", tcNum), func(t *testing.T) { - sortMin, sortMax := dataForSort.SortGetNormalBounds(tc.r, tc.l, tc.u, false) - selectMin, selectMax := dataForSelect.SelectGetNormalBounds(tc.r, tc.l, tc.u) - assert.Equal(t, sortMin, selectMin) - assert.Equal(t, sortMax, selectMax) - - k := r.Intn(count) - assert.Equal(t, dataForSort[k], dataForSelect.quickSelect(k)) - assert.Equal(t, dataForSort[k], data.quickSelect(k)) - }) - } - }) - } -} - -func TestSortInterpolation(t *testing.T) { - t.Parallel() - - // Super contrived example to make the checks easy - 11 values from 0 to 10 seconds inclusive - count := 11 - data := make(durations, count) - for i := 0; i < count; i++ { - data[i] = time.Duration(i) * time.Second - } - - min, max := data.SortGetNormalBounds(0.25, 1, 1, true) - // Expected values: Q1=2.5, Q3=7.5, IQR=5, so with 1 for coefficients we can expect min=-2,5, max=12.5 seconds - assert.Equal(t, min, -2500*time.Millisecond) - assert.Equal(t, max, 12500*time.Millisecond) -} diff --git a/output/cloud/v1/metrics_client.go b/output/cloud/v1/metrics_client.go deleted file mode 100644 index 5a036b3d4ce..00000000000 --- a/output/cloud/v1/metrics_client.go +++ /dev/null @@ -1,116 +0,0 @@ -package cloud - -import ( - "bytes" - "compress/gzip" - "context" - "errors" - "fmt" - "io" - "net/http" - "strconv" - "sync" - "time" - - easyjson "github.com/mailru/easyjson" - "github.com/sirupsen/logrus" - - "go.k6.io/k6/cloudapi" -) - -// MetricsClient is a wrapper around the cloudapi.Client that is also capable of pushing -type MetricsClient struct { - *cloudapi.Client - logger logrus.FieldLogger - host string - noCompress bool - - pushBufferPool sync.Pool -} - -// NewMetricsClient creates and initializes a new MetricsClient. 
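
PushMetric below reuses gzip buffers through a sync.Pool so that repeated flushes don't reallocate them. A minimal, self-contained sketch of that pattern (illustrative names, not the original implementation; assumes only the standard "bytes", "compress/gzip" and "sync" packages):

var gzipBufPool = sync.Pool{
	New: func() interface{} { return &bytes.Buffer{} },
}

func compressPooled(payload []byte) ([]byte, error) {
	buf := gzipBufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer gzipBufPool.Put(buf)

	gz, err := gzip.NewWriterLevel(buf, gzip.BestSpeed)
	if err != nil {
		return nil, err
	}
	if _, err := gz.Write(payload); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}
	// Copy the result out, since the pooled buffer is reused after Put.
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out, nil
}
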
-func NewMetricsClient(client *cloudapi.Client, logger logrus.FieldLogger, host string, noCompress bool) *MetricsClient { - return &MetricsClient{ - Client: client, - logger: logger, - host: host, - noCompress: noCompress, - pushBufferPool: sync.Pool{ - New: func() interface{} { - return &bytes.Buffer{} - }, - }, - } -} - -// PushMetric pushes the provided metric samples for the given referenceID -func (mc *MetricsClient) PushMetric(referenceID string, s []*Sample) error { - start := time.Now() - url := fmt.Sprintf("%s/v1/metrics/%s", mc.host, referenceID) - - jsonStart := time.Now() - b, err := easyjson.Marshal(samples(s)) - if err != nil { - return err - } - jsonTime := time.Since(jsonStart) - - // TODO: change the context, maybe to one with a timeout - req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, nil) - if err != nil { - return err - } - - req.Header.Set("X-Payload-Sample-Count", strconv.Itoa(len(s))) - var additionalFields logrus.Fields - - if !mc.noCompress { - buf, ok := mc.pushBufferPool.Get().(*bytes.Buffer) - if !ok { - return errors.New("failed to convert a buffer pool item " + - "into the expected type bytes Buffer for gzip compression operation") - } - buf.Reset() - defer mc.pushBufferPool.Put(buf) - unzippedSize := len(b) - buf.Grow(unzippedSize / expectedGzipRatio) - gzipStart := time.Now() - { - g, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed) - if _, err = g.Write(b); err != nil { - return err - } - if err = g.Close(); err != nil { - return err - } - } - gzipTime := time.Since(gzipStart) - - req.Header.Set("Content-Encoding", "gzip") - req.Header.Set("X-Payload-Byte-Count", strconv.Itoa(unzippedSize)) - - additionalFields = logrus.Fields{ - "unzipped_size": unzippedSize, - "gzip_t": gzipTime, - "content_length": buf.Len(), - } - - b = buf.Bytes() - } - - req.Header.Set("Content-Length", strconv.Itoa(len(b))) - req.Body = io.NopCloser(bytes.NewReader(b)) - req.GetBody = func() (io.ReadCloser, error) { - return io.NopCloser(bytes.NewReader(b)), nil - } - - err = mc.Client.Do(req, nil) - - mc.logger.WithFields(logrus.Fields{ - "t": time.Since(start), - "json_t": jsonTime, - "part_size": len(s), - }).WithFields(additionalFields).Debug("Pushed part to cloud") - - return err -} diff --git a/output/cloud/v1/output.go b/output/cloud/v1/output.go deleted file mode 100644 index 3ef15c17734..00000000000 --- a/output/cloud/v1/output.go +++ /dev/null @@ -1,555 +0,0 @@ -// Package cloud implements an Output that flushes to the k6 Cloud platform -// using the version1 of the protocol flushing a json-based payload. -package cloud - -import ( - "context" - "net/http" - "strconv" - "sync" - "time" - - "github.com/mailru/easyjson" - "github.com/sirupsen/logrus" - - "go.k6.io/k6/cloudapi" - "go.k6.io/k6/cloudapi/insights" - "go.k6.io/k6/errext" - "go.k6.io/k6/errext/exitcodes" - insightsOutput "go.k6.io/k6/output/cloud/insights" - - "go.k6.io/k6/lib/netext" - "go.k6.io/k6/lib/netext/httpext" - "go.k6.io/k6/metrics" -) - -// Output sends result data to the k6 Cloud service. -type Output struct { - logger logrus.FieldLogger - config cloudapi.Config - - // referenceID is the legacy name used by the Backend for the test run id. - // Note: This output's version is almost deprecated so we don't apply - // the renaming to its internals. 
-	referenceID string
-	client      *MetricsClient
-
-	bufferMutex      sync.Mutex
-	bufferHTTPTrails []*httpext.Trail
-	bufferSamples    []*Sample
-
-	insightsClient            insightsOutput.Client
-	requestMetadatasCollector insightsOutput.RequestMetadatasCollector
-	requestMetadatasFlusher   insightsOutput.RequestMetadatasFlusher
-
-	// TODO: optimize this
-	//
-	// Since the real-time metrics refactoring (https://github.com/k6io/k6/pull/678),
-	// we should no longer have to handle metrics that have times long in the past. So instead of a
-	// map, we can probably use a simple slice (or even an array!) as a ring buffer to store the
-	// aggregation buckets. This should save us some time, since it would make the lookups and WaitPeriod
-	// checks basically O(1). And even if for some reason there are occasional metrics with past times that
-	// don't fit in the chosen ring buffer size, we could just send them along to the buffer unaggregated.
-	aggrBuckets map[int64]aggregationBucket
-
-	stopSendingMetrics chan struct{}
-	stopAggregation    chan struct{}
-	aggregationDone    *sync.WaitGroup
-	stopOutput         chan struct{}
-	outputDone         *sync.WaitGroup
-	testStopFunc       func(error)
-}
-
-// New creates a new Cloud output version 1.
-func New(logger logrus.FieldLogger, conf cloudapi.Config, testAPIClient *cloudapi.Client) (*Output, error) {
-	return &Output{
-		config:      conf,
-		client:      NewMetricsClient(testAPIClient, logger, conf.Host.String, conf.NoCompress.Bool),
-		aggrBuckets: map[int64]aggregationBucket{},
-		logger:      logger,
-
-		stopSendingMetrics: make(chan struct{}),
-		stopAggregation:    make(chan struct{}),
-		aggregationDone:    &sync.WaitGroup{},
-		stopOutput:         make(chan struct{}),
-		outputDone:         &sync.WaitGroup{},
-	}, nil
-}
-
-// SetTestRunID sets the passed test run ID.
-func (out *Output) SetTestRunID(id string) {
-	out.referenceID = id
-}
-
-// Start starts the Output: it launches the background goroutines
-// for aggregating and flushing the collected metrics samples.
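
Start (below) runs up to two such goroutines, one for aggregation and one for pushing. Their common shape, a ticker loop that drains once more on shutdown, sketched generically (names here are illustrative, not from the original file):

func tickUntilStopped(period time.Duration, stop <-chan struct{}, flush func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			flush()
		case <-stop:
			flush() // one final flush so buffered data is not lost
			return
		}
	}
}
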
-func (out *Output) Start() error { - aggregationPeriod := out.config.AggregationPeriod.TimeDuration() - // If enabled, start periodically aggregating the collected HTTP trails - if aggregationPeriod > 0 { - out.aggregationDone.Add(1) - go func() { - defer out.aggregationDone.Done() - aggregationWaitPeriod := out.config.AggregationWaitPeriod.TimeDuration() - aggregationTicker := time.NewTicker(aggregationPeriod) - defer aggregationTicker.Stop() - - for { - select { - case <-out.stopSendingMetrics: - return - case <-aggregationTicker.C: - out.aggregateHTTPTrails(aggregationWaitPeriod) - case <-out.stopAggregation: - out.aggregateHTTPTrails(0) - out.flushHTTPTrails() - return - } - } - }() - } - - if insightsOutput.Enabled(out.config) { - testRunID, err := strconv.ParseInt(out.referenceID, 10, 64) - if err != nil { - return err - } - out.requestMetadatasCollector = insightsOutput.NewCollector(testRunID) - - insightsClientConfig := insights.NewDefaultClientConfigForTestRun( - out.config.TracesHost.String, - out.config.Token.String, - testRunID, - ) - insightsClient := insights.NewClient(insightsClientConfig) - - if err := insightsClient.Dial(context.Background()); err != nil { - return err - } - - out.insightsClient = insightsClient - out.requestMetadatasFlusher = insightsOutput.NewFlusher(insightsClient, out.requestMetadatasCollector) - out.runFlushRequestMetadatas() - } - - out.outputDone.Add(1) - go func() { - defer out.outputDone.Done() - pushTicker := time.NewTicker(out.config.MetricPushInterval.TimeDuration()) - defer pushTicker.Stop() - for { - select { - case <-out.stopSendingMetrics: - return - default: - } - select { - case <-out.stopOutput: - out.pushMetrics() - return - case <-pushTicker.C: - out.pushMetrics() - } - } - }() - - return nil -} - -// StopWithTestError gracefully stops all metric emission from the output: when -// all metric samples are emitted, it makes a cloud API call to finish the test -// run. If testErr was specified, it extracts the RunStatus from it. -func (out *Output) StopWithTestError(_ error) error { - out.logger.Debug("Stopping the cloud output...") - close(out.stopAggregation) - out.aggregationDone.Wait() // could be a no-op, if we have never started the aggregation - out.logger.Debug("Aggregation stopped, stopping metric emission...") - close(out.stopOutput) - out.outputDone.Wait() - out.logger.Debug("Metric emission stopped, calling cloud API...") - if insightsOutput.Enabled(out.config) { - if err := out.insightsClient.Close(); err != nil { - out.logger.WithError(err).Error("Failed to close the insights client") - } - } - return nil -} - -// SetTestRunStopCallback receives the function that stops the engine on error -func (out *Output) SetTestRunStopCallback(stopFunc func(error)) { - out.testStopFunc = stopFunc -} - -// AddMetricSamples receives a set of metric samples. This method is never -// called concurrently, so it defers as much of the work as possible to the -// asynchronous goroutines initialized in Start(). 
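
aggregateHTTPTrails further below groups the buffered trails twice: into per-period buckets keyed by truncated timestamp, then into per-tag-set sub-buckets. A reduced sketch of that grouping (hypothetical helper, not part of the original file):

func groupTrails(trails []*httpext.Trail, period time.Duration) map[int64]aggregationBucket {
	buckets := make(map[int64]aggregationBucket)
	for _, trail := range trails {
		bucketID := trail.GetTime().UnixNano() / int64(period)
		bucket, ok := buckets[bucketID]
		if !ok {
			bucket = aggregationBucket{}
			buckets[bucketID] = bucket
		}
		// Appending to the nil slice inside a fresh sub-bucket is fine in Go.
		bucket[trail.Tags] = append(bucket[trail.Tags], trail)
	}
	return buckets
}
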
-func (out *Output) AddMetricSamples(sampleContainers []metrics.SampleContainer) { - select { - case <-out.stopSendingMetrics: - return - default: - } - - if out.referenceID == "" { - return - } - - newSamples := []*Sample{} - newHTTPTrails := []*httpext.Trail{} - - for _, sampleContainer := range sampleContainers { - switch sc := sampleContainer.(type) { - case *httpext.Trail: - // Check if aggregation is enabled, - if out.config.AggregationPeriod.Duration > 0 { - newHTTPTrails = append(newHTTPTrails, sc) - } else { - newSamples = append(newSamples, NewSampleFromTrail(sc)) - } - case *netext.NetTrail: - // TODO: aggregate? - values := map[string]float64{ - metrics.DataSentName: float64(sc.BytesWritten), - metrics.DataReceivedName: float64(sc.BytesRead), - } - - if sc.FullIteration { - values[metrics.IterationDurationName] = metrics.D(sc.EndTime.Sub(sc.StartTime)) - values[metrics.IterationsName] = 1 - } - - encodedTags, err := easyjson.Marshal(sc.GetTags()) - if err != nil { - out.logger.WithError(err).Error("Encoding tags failed") - } - newSamples = append(newSamples, &Sample{ - Type: DataTypeMap, - Metric: "iter_li_all", - Data: &SampleDataMap{ - Time: toMicroSecond(sc.GetTime()), - Tags: encodedTags, - Values: values, - }, - }) - default: - for _, sample := range sampleContainer.GetSamples() { - encodedTags, err := easyjson.Marshal(sample.Tags) - if err != nil { - out.logger.WithError(err).Error("Encoding tags failed") - } - - newSamples = append(newSamples, &Sample{ - Type: DataTypeSingle, - Metric: sample.Metric.Name, - Data: &SampleDataSingle{ - Type: sample.Metric.Type, - Time: toMicroSecond(sample.Time), - Tags: encodedTags, - Value: sample.Value, - }, - }) - } - } - } - - if len(newSamples) > 0 || len(newHTTPTrails) > 0 { - out.bufferMutex.Lock() - out.bufferSamples = append(out.bufferSamples, newSamples...) - out.bufferHTTPTrails = append(out.bufferHTTPTrails, newHTTPTrails...) 
- out.bufferMutex.Unlock() - } - - if insightsOutput.Enabled(out.config) { - out.requestMetadatasCollector.CollectRequestMetadatas(sampleContainers) - } -} - -//nolint:funlen,nestif,gocognit -func (out *Output) aggregateHTTPTrails(waitPeriod time.Duration) { - out.bufferMutex.Lock() - newHTTPTrails := out.bufferHTTPTrails - out.bufferHTTPTrails = nil - out.bufferMutex.Unlock() - - aggrPeriod := int64(out.config.AggregationPeriod.Duration) - - // Distribute all newly buffered HTTP trails into buckets and sub-buckets - for _, trail := range newHTTPTrails { - bucketID := trail.GetTime().UnixNano() / aggrPeriod - - // Get or create a time bucket for that trail period - bucket, ok := out.aggrBuckets[bucketID] - if !ok { - bucket = aggregationBucket{} - out.aggrBuckets[bucketID] = bucket - } - - subBucket, ok := bucket[trail.Tags] - if !ok { - subBucket = make([]*httpext.Trail, 0, 100) - } - bucket[trail.Tags] = append(subBucket, trail) - } - - // Which buckets are still new and we'll wait for trails to accumulate before aggregating - bucketCutoffID := time.Now().Add(-waitPeriod).UnixNano() / aggrPeriod - iqrRadius := out.config.AggregationOutlierIqrRadius.Float64 - iqrLowerCoef := out.config.AggregationOutlierIqrCoefLower.Float64 - iqrUpperCoef := out.config.AggregationOutlierIqrCoefUpper.Float64 - newSamples := []*Sample{} - - // Handle all aggregation buckets older than bucketCutoffID - for bucketID, subBucket := range out.aggrBuckets { - if bucketID > bucketCutoffID { - continue - } - - for tags, httpTrails := range subBucket { - // start := time.Now() // this is in a combination with the log at the end - trailCount := int64(len(httpTrails)) - if trailCount < out.config.AggregationMinSamples.Int64 { - for _, trail := range httpTrails { - newSamples = append(newSamples, NewSampleFromTrail(trail)) - } - continue - } - encodedTags, err := easyjson.Marshal(tags) - if err != nil { - out.logger.WithError(err).Error("Encoding tags failed") - } - - aggrData := &SampleDataAggregatedHTTPReqs{ - Time: toMicroSecond(time.Unix(0, bucketID*aggrPeriod+aggrPeriod/2)), - Type: "aggregated_trend", - Tags: encodedTags, - } - - if out.config.AggregationSkipOutlierDetection.Bool { - // Simply add up all HTTP trails, no outlier detection - for _, trail := range httpTrails { - aggrData.Add(trail) - } - } else { - connDurations := make(durations, trailCount) - reqDurations := make(durations, trailCount) - for i, trail := range httpTrails { - connDurations[i] = trail.ConnDuration - reqDurations[i] = trail.Duration - } - - var minConnDur, maxConnDur, minReqDur, maxReqDur time.Duration - if trailCount < out.config.AggregationOutlierAlgoThreshold.Int64 { - // Since there are fewer samples, we'll use the interpolation-enabled and - // more precise sorting-based algorithm - minConnDur, maxConnDur = connDurations.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, true) - minReqDur, maxReqDur = reqDurations.SortGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef, true) - } else { - minConnDur, maxConnDur = connDurations.SelectGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef) - minReqDur, maxReqDur = reqDurations.SelectGetNormalBounds(iqrRadius, iqrLowerCoef, iqrUpperCoef) - } - - for _, trail := range httpTrails { - if trail.ConnDuration < minConnDur || - trail.ConnDuration > maxConnDur || - trail.Duration < minReqDur || - trail.Duration > maxReqDur { - // Seems like an outlier, add it as a standalone metric - newSamples = append(newSamples, NewSampleFromTrail(trail)) - } else { - // Aggregate the trail - 
aggrData.Add(trail)
-					}
-				}
-			}
-
-			aggrData.CalcAverages()
-
-			if aggrData.Count > 0 {
-				newSamples = append(newSamples, &Sample{
-					Type:   DataTypeAggregatedHTTPReqs,
-					Metric: "http_req_li_all",
-					Data:   aggrData,
-				})
-			}
-		}
-		delete(out.aggrBuckets, bucketID)
-	}
-
-	if len(newSamples) > 0 {
-		out.bufferMutex.Lock()
-		out.bufferSamples = append(out.bufferSamples, newSamples...)
-		out.bufferMutex.Unlock()
-	}
-}
-
-func (out *Output) flushHTTPTrails() {
-	out.bufferMutex.Lock()
-	defer out.bufferMutex.Unlock()
-
-	newSamples := []*Sample{}
-	for _, trail := range out.bufferHTTPTrails {
-		newSamples = append(newSamples, NewSampleFromTrail(trail))
-	}
-	for _, bucket := range out.aggrBuckets {
-		for _, subBucket := range bucket {
-			for _, trail := range subBucket {
-				newSamples = append(newSamples, NewSampleFromTrail(trail))
-			}
-		}
-	}
-
-	out.bufferHTTPTrails = nil
-	out.aggrBuckets = map[int64]aggregationBucket{}
-	out.bufferSamples = append(out.bufferSamples, newSamples...)
-}
-
-// shouldStopSendingMetrics returns true if the output should interrupt the metric flush.
-//
-// Note: the actual test execution should continue, since for local k6 run tests
-// the end-of-test summary (and any other outputs) will still work,
-// but the cloud output doesn't send any more metrics.
-// Instead, if cloudapi.Config.StopOnError is enabled,
-// the cloud output should stop the whole test run too.
-// This logic should be handled by the caller.
-func (out *Output) shouldStopSendingMetrics(err error) bool {
-	if err == nil {
-		return false
-	}
-	if errResp, ok := err.(cloudapi.ResponseError); ok && errResp.Response != nil { //nolint:errorlint
-		// The Cloud service returns the error code 4 when it doesn't accept any more metrics.
-		// So, when k6 sees that, the cloud output just stops prematurely.
- return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4 - } - - return false -} - -type pushJob struct { - done chan error - samples []*Sample -} - -// ceil(a/b) -func ceilDiv(a, b int) int { - r := a / b - if a%b != 0 { - r++ - } - return r -} - -func (out *Output) pushMetrics() { - out.bufferMutex.Lock() - if len(out.bufferSamples) == 0 { - out.bufferMutex.Unlock() - return - } - buffer := out.bufferSamples - out.bufferSamples = nil - out.bufferMutex.Unlock() - - count := len(buffer) - out.logger.WithFields(logrus.Fields{ - "samples": count, - }).Debug("Pushing metrics to cloud") - start := time.Now() - - numberOfPackages := ceilDiv(len(buffer), int(out.config.MaxMetricSamplesPerPackage.Int64)) //nolint:staticcheck - numberOfWorkers := int(out.config.MetricPushConcurrency.Int64) - if numberOfWorkers > numberOfPackages { - numberOfWorkers = numberOfPackages - } - - ch := make(chan pushJob, numberOfPackages) - for i := 0; i < numberOfWorkers; i++ { - go func() { - for job := range ch { - err := out.client.PushMetric(out.referenceID, job.samples) - job.done <- err - if out.shouldStopSendingMetrics(err) { - return - } - } - }() - } - - jobs := make([]pushJob, 0, numberOfPackages) - - for len(buffer) > 0 { - size := len(buffer) - if size > int(out.config.MaxMetricSamplesPerPackage.Int64) { //nolint:staticcheck - size = int(out.config.MaxMetricSamplesPerPackage.Int64) //nolint:staticcheck - } - job := pushJob{done: make(chan error, 1), samples: buffer[:size]} - ch <- job - jobs = append(jobs, job) - buffer = buffer[size:] - } - - close(ch) - - for _, job := range jobs { - err := <-job.done - if err != nil { - if out.shouldStopSendingMetrics(err) { - out.logger.WithError(err).Warn("Stopped sending metrics to cloud due to an error") - serr := errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort), - errext.AbortedByOutput, - ) - if out.config.StopOnError.Bool { - out.testStopFunc(serr) - } - close(out.stopSendingMetrics) - break - } - out.logger.WithError(err).Warn("Failed to send metrics to cloud") - } - } - out.logger.WithFields(logrus.Fields{ - "samples": count, - "t": time.Since(start), - }).Debug("Pushing metrics to cloud finished") -} - -func (out *Output) runFlushRequestMetadatas() { - t := time.NewTicker(out.config.TracesPushInterval.TimeDuration()) - - for i := int64(0); i < out.config.TracesPushConcurrency.Int64; i++ { - out.outputDone.Add(1) - go func() { - defer out.outputDone.Done() - defer t.Stop() - - for { - select { - case <-out.stopSendingMetrics: - return - default: - } - select { - case <-out.stopOutput: - out.flushRequestMetadatas() - return - case <-t.C: - out.flushRequestMetadatas() - } - } - }() - } -} - -func (out *Output) flushRequestMetadatas() { - start := time.Now() - - err := out.requestMetadatasFlusher.Flush() - if err != nil { - out.logger.WithError(err).WithField("t", time.Since(start)).Error("Failed to push trace samples to the cloud") - - return - } - - out.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered trace samples to the cloud") -} - -const expectedGzipRatio = 6 // based on test it is around 6.8, but we don't need to be that accurate diff --git a/output/cloud/v1/output_test.go b/output/cloud/v1/output_test.go deleted file mode 100644 index 5c85ed41e45..00000000000 --- a/output/cloud/v1/output_test.go +++ /dev/null @@ -1,727 +0,0 @@ -package cloud - -import ( - "bytes" - "compress/gzip" - "encoding/json" - "fmt" - "io" - "math/rand" - "net/http" - "net/http/httptest" - 
"net/url" - "os" - "sort" - "strconv" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v3" - - "go.k6.io/k6/cloudapi" - "go.k6.io/k6/lib" - "go.k6.io/k6/lib/consts" - "go.k6.io/k6/lib/netext" - "go.k6.io/k6/lib/netext/httpext" - "go.k6.io/k6/lib/testutils" - "go.k6.io/k6/lib/testutils/httpmultibin" - "go.k6.io/k6/lib/types" - "go.k6.io/k6/metrics" - "go.k6.io/k6/output" -) - -func tagEqual(expected, got json.RawMessage) bool { - var expectedMap, gotMap map[string]string - err := json.Unmarshal(expected, &expectedMap) - if err != nil { - panic("tagEqual: " + err.Error()) - } - - err = json.Unmarshal(got, &gotMap) - if err != nil { - panic("tagEqual: " + err.Error()) - } - - if len(expectedMap) != len(gotMap) { - return false - } - - for k, v := range gotMap { - if k == "url" { - if expectedMap["name"] != v { - return false - } - } else if expectedMap[k] != v { - return false - } - } - return true -} - -func getSampleChecker(t *testing.T, expSamples <-chan []Sample) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - receivedSamples := []Sample{} - assert.NoError(t, json.Unmarshal(body, &receivedSamples)) - - expSamples := <-expSamples - require.Len(t, receivedSamples, len(expSamples)) - - for i, expSample := range expSamples { - receivedSample := receivedSamples[i] - assert.Equal(t, expSample.Metric, receivedSample.Metric) - assert.Equal(t, expSample.Type, receivedSample.Type) - - if callbackCheck, ok := expSample.Data.(func(interface{})); ok { - callbackCheck(receivedSample.Data) - continue - } - - require.IsType(t, expSample.Data, receivedSample.Data) - - switch expData := expSample.Data.(type) { - case *SampleDataSingle: - receivedData, ok := receivedSample.Data.(*SampleDataSingle) - assert.True(t, ok) - assert.JSONEq(t, string(expData.Tags), string(receivedData.Tags)) - assert.Equal(t, expData.Time, receivedData.Time) - assert.Equal(t, expData.Type, receivedData.Type) - assert.Equal(t, expData.Value, receivedData.Value) - case *SampleDataMap: - receivedData, ok := receivedSample.Data.(*SampleDataMap) - assert.True(t, ok) - assert.True(t, tagEqual(expData.Tags, receivedData.Tags)) - assert.Equal(t, expData.Time, receivedData.Time) - assert.Equal(t, expData.Type, receivedData.Type) - assert.Equal(t, expData.Values, receivedData.Values) - case *SampleDataAggregatedHTTPReqs: - receivedData, ok := receivedSample.Data.(*SampleDataAggregatedHTTPReqs) - assert.True(t, ok) - assert.JSONEq(t, string(expData.Tags), string(receivedData.Tags)) - assert.Equal(t, expData.Time, receivedData.Time) - assert.Equal(t, expData.Type, receivedData.Type) - assert.Equal(t, expData.Values, receivedData.Values) - default: - t.Errorf("Unknown data type %#v", expData) - } - } - } -} - -func skewTrail(r *rand.Rand, t httpext.Trail, minCoef, maxCoef float64) httpext.Trail { - coef := minCoef + r.Float64()*(maxCoef-minCoef) - addJitter := func(d *time.Duration) { - *d = time.Duration(float64(*d) * coef) - } - addJitter(&t.Blocked) - addJitter(&t.Connecting) - addJitter(&t.TLSHandshaking) - addJitter(&t.Sending) - addJitter(&t.Waiting) - addJitter(&t.Receiving) - t.ConnDuration = t.Connecting + t.TLSHandshaking - t.Duration = t.Sending + t.Waiting + t.Receiving - return t -} - -func TestCloudOutput(t *testing.T) { - t.Parallel() - - getTestRunner := func(minSamples int) func(t *testing.T) { - return func(t *testing.T) { - t.Parallel() - 
runCloudOutputTestCase(t, minSamples)
-		}
-	}
-
-	for tcNum, minSamples := range []int{60, 75, 100} {
-		tcNum, minSamples := tcNum, minSamples
-		t.Run(fmt.Sprintf("tc%d_minSamples%d", tcNum, minSamples), getTestRunner(minSamples))
-	}
-}
-
-func runCloudOutputTestCase(t *testing.T, minSamples int) {
-	seed := time.Now().UnixNano()
-	r := rand.New(rand.NewSource(seed)) //nolint:gosec
-	t.Logf("Random source seeded with %d\n", seed)
-
-	tb := httpmultibin.NewHTTPMultiBin(t)
-	registry := metrics.NewRegistry()
-
-	builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
-	out, err := newTestOutput(output.Params{
-		Logger:     testutils.NewLogger(t),
-		JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)),
-		ScriptOptions: lib.Options{
-			Duration:   types.NullDurationFrom(1 * time.Second),
-			SystemTags: &metrics.DefaultSystemTagSet,
-		},
-		Environment: map[string]string{
-			"K6_CLOUD_PUSH_REF_ID":               "123",
-			"K6_CLOUD_METRIC_PUSH_INTERVAL":      "10ms",
-			"K6_CLOUD_AGGREGATION_PERIOD":        "30ms",
-			"K6_CLOUD_AGGREGATION_CALC_INTERVAL": "40ms",
-			"K6_CLOUD_AGGREGATION_WAIT_PERIOD":   "5ms",
-			"K6_CLOUD_AGGREGATION_MIN_SAMPLES":   strconv.Itoa(minSamples),
-		},
-		ScriptPath: &url.URL{Path: "/script.js"},
-	})
-	require.NoError(t, err)
-
-	out.SetTestRunID("123")
-	require.NoError(t, out.Start())
-
-	now := time.Now()
-	tagMap := map[string]string{"test": "mest", "a": "b", "name": "name", "url": "name"}
-	tags := registry.RootTagSet().WithTagsFromMap(tagMap)
-
-	expSamples := make(chan []Sample)
-	defer close(expSamples)
-	tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), getSampleChecker(t, expSamples))
-
-	out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
-		TimeSeries: metrics.TimeSeries{
-			Metric: builtinMetrics.VUs,
-			Tags:   tags,
-		},
-		Time:  now,
-		Value: 1.0,
-	}})
-
-	enctags, err := json.Marshal(tags)
-	require.NoError(t, err)
-	expSamples <- []Sample{{
-		Type:   DataTypeSingle,
-		Metric: metrics.VUsName,
-		Data: &SampleDataSingle{
-			Type:  builtinMetrics.VUs.Type,
-			Time:  toMicroSecond(now),
-			Tags:  enctags,
-			Value: 1.0,
-		},
-	}}
-
-	simpleTrail := httpext.Trail{
-		Blocked:        100 * time.Millisecond,
-		Connecting:     200 * time.Millisecond,
-		TLSHandshaking: 300 * time.Millisecond,
-		Sending:        400 * time.Millisecond,
-		Waiting:        500 * time.Millisecond,
-		Receiving:      600 * time.Millisecond,
-
-		EndTime:      now,
-		ConnDuration: 500 * time.Millisecond,
-		Duration:     1500 * time.Millisecond,
-		Tags:         tags,
-	}
-	out.AddMetricSamples([]metrics.SampleContainer{&simpleTrail})
-	expSamples <- []Sample{*NewSampleFromTrail(&simpleTrail)}
-
-	smallSkew := 0.02
-
-	trails := []metrics.SampleContainer{}
-	durations := make([]time.Duration, 0, len(trails))
-	for i := int64(0); i < out.config.AggregationMinSamples.Int64; i++ {
-		similarTrail := skewTrail(r, simpleTrail, 1.0, 1.0+smallSkew)
-		trails = append(trails, &similarTrail)
-		durations = append(durations, similarTrail.Duration)
-	}
-	sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] })
-	t.Logf("Sorted durations: %#v", durations) // Useful to debug any failures, doesn't get in the way otherwise
-
-	checkAggrMetric := func(normal time.Duration, aggr AggregatedMetric) {
-		assert.True(t, aggr.Min <= aggr.Avg)
-		assert.True(t, aggr.Avg <= aggr.Max)
-		assert.InEpsilon(t, normal, metrics.ToD(aggr.Min), smallSkew)
-		assert.InEpsilon(t, normal, metrics.ToD(aggr.Avg), smallSkew)
-		assert.InEpsilon(t, normal, metrics.ToD(aggr.Max), smallSkew)
-	}
-
- outlierTrail := skewTrail(r, simpleTrail, 2.0+smallSkew, 3.0+smallSkew) - trails = append(trails, &outlierTrail) - out.AddMetricSamples(trails) - expSamples <- []Sample{ - *NewSampleFromTrail(&outlierTrail), - { - Type: DataTypeAggregatedHTTPReqs, - Metric: "http_req_li_all", - Data: func(data interface{}) { - aggrData, ok := data.(*SampleDataAggregatedHTTPReqs) - assert.True(t, ok) - assert.JSONEq(t, `{"test": "mest", "a": "b", "name": "name", "url": "name"}`, string(aggrData.Tags)) - assert.Equal(t, out.config.AggregationMinSamples.Int64, int64(aggrData.Count)) - assert.Equal(t, "aggregated_trend", aggrData.Type) - assert.InDelta(t, now.UnixNano(), aggrData.Time*1000, float64(out.config.AggregationPeriod.Duration)) - - checkAggrMetric(simpleTrail.Duration, aggrData.Values.Duration) - checkAggrMetric(simpleTrail.Blocked, aggrData.Values.Blocked) - checkAggrMetric(simpleTrail.Connecting, aggrData.Values.Connecting) - checkAggrMetric(simpleTrail.TLSHandshaking, aggrData.Values.TLSHandshaking) - checkAggrMetric(simpleTrail.Sending, aggrData.Values.Sending) - checkAggrMetric(simpleTrail.Waiting, aggrData.Values.Waiting) - checkAggrMetric(simpleTrail.Receiving, aggrData.Values.Receiving) - }, - }, - } - - require.NoError(t, out.StopWithTestError(nil)) -} - -func TestCloudOutputMaxPerPacket(t *testing.T) { - t.Parallel() - - tb := httpmultibin.NewHTTPMultiBin(t) - out, err := newTestOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - require.NoError(t, err) - out.SetTestRunID("12") - - maxMetricSamplesPerPackage := 20 - out.config.MaxMetricSamplesPerPackage = null.IntFrom(int64(maxMetricSamplesPerPackage)) //nolint:staticcheck - - now := time.Now() - registry := metrics.NewRegistry() - tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) - gotTheLimit := false - var m sync.Mutex - tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), - func(_ http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - receivedSamples := []Sample{} - assert.NoError(t, json.Unmarshal(body, &receivedSamples)) - assert.True(t, len(receivedSamples) <= maxMetricSamplesPerPackage) - if len(receivedSamples) == maxMetricSamplesPerPackage { - m.Lock() - gotTheLimit = true - m.Unlock() - } - }) - - require.NoError(t, out.Start()) - - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.VUs, - Tags: tags, - }, - Time: now, - Value: 1.0, - }}) - for j := time.Duration(1); j <= 200; j++ { - container := make([]metrics.SampleContainer, 0, 500) - for i := time.Duration(1); i <= 50; i++ { - //nolint:durationcheck - container = append(container, &httpext.Trail{ - Blocked: i % 200 * 100 * time.Millisecond, - Connecting: i % 200 * 200 * time.Millisecond, - TLSHandshaking: i % 200 * 300 * time.Millisecond, - Sending: i * i * 400 * time.Millisecond, - Waiting: 500 * time.Millisecond, - Receiving: 600 * time.Millisecond, - - EndTime: now.Add(i * 100), - ConnDuration: 500 * time.Millisecond, - Duration: j * i * 1500 * time.Millisecond, - Tags: tags, - }) - } - out.AddMetricSamples(container) - } - - require.NoError(t, 
out.StopWithTestError(nil)) - assert.True(t, gotTheLimit) -} - -func TestCloudOutputStopSendingMetric(t *testing.T) { - t.Parallel() - t.Run("stop engine on error", func(t *testing.T) { - t.Parallel() - testCloudOutputStopSendingMetric(t, true) - }) - - t.Run("don't stop engine on error", func(t *testing.T) { - t.Parallel() - testCloudOutputStopSendingMetric(t, false) - }) -} - -func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) { - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) - - tb := httpmultibin.NewHTTPMultiBin(t) - out, err := newTestOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{ - "host": "%s", "noCompress": true, - "maxMetricSamplesPerPackage": 50, - "name": "something-that-should-be-overwritten", - "stopOnError": %t - }`, tb.ServerHTTP.URL, stopOnError)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - External: map[string]json.RawMessage{ - "loadimpact": json.RawMessage(`{"name": "my-custom-name"}`), - }, - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - var expectedTestStopFuncCalled int64 - if stopOnError { - expectedTestStopFuncCalled = 1 - } - var TestStopFuncCalled int64 - out.testStopFunc = func(error) { - atomic.AddInt64(&TestStopFuncCalled, 1) - } - require.NoError(t, err) - now := time.Now() - tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) - - count := 1 - max := 5 - tb.Mux.HandleFunc("/v1/metrics/12", func(w http.ResponseWriter, r *http.Request) { - count++ - if count == max { - type payload struct { - Error cloudapi.ResponseError `json:"error"` - } - res := &payload{} - res.Error = cloudapi.ResponseError{Code: 4} - w.Header().Set("Content-Type", "application/json") - data, err := json.Marshal(res) - if err != nil { - t.Fatal(err) - } - w.WriteHeader(http.StatusForbidden) - _, _ = w.Write(data) - return - } - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - receivedSamples := []Sample{} - assert.NoError(t, json.Unmarshal(body, &receivedSamples)) - }) - - out.SetTestRunID("12") - require.NoError(t, out.Start()) - - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.VUs, - Tags: tags, - }, - Time: now, - Value: 1.0, - }}) - for j := time.Duration(1); j <= 200; j++ { - container := make([]metrics.SampleContainer, 0, 500) - for i := time.Duration(1); i <= 50; i++ { - //nolint:durationcheck - container = append(container, &httpext.Trail{ - Blocked: i % 200 * 100 * time.Millisecond, - Connecting: i % 200 * 200 * time.Millisecond, - TLSHandshaking: i % 200 * 300 * time.Millisecond, - Sending: i * i * 400 * time.Millisecond, - Waiting: 500 * time.Millisecond, - Receiving: 600 * time.Millisecond, - - EndTime: now.Add(i * 100), - ConnDuration: 500 * time.Millisecond, - Duration: j * i * 1500 * time.Millisecond, - Tags: tags, - }) - } - out.AddMetricSamples(container) - } - - require.NoError(t, out.StopWithTestError(nil)) - - select { - case <-out.stopSendingMetrics: - // all is fine - default: - t.Fatal("sending metrics wasn't stopped") - } - require.Equal(t, max, count) - require.Equal(t, expectedTestStopFuncCalled, TestStopFuncCalled) - - nBufferSamples := len(out.bufferSamples) - nBufferHTTPTrails := len(out.bufferHTTPTrails) - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: 
builtinMetrics.VUs, - Tags: tags, - }, - Time: now, - Value: 1.0, - }}) - if nBufferSamples != len(out.bufferSamples) || nBufferHTTPTrails != len(out.bufferHTTPTrails) { - t.Errorf("Output still collects data after stop sending metrics") - } -} - -func TestCloudOutputPushRefID(t *testing.T) { - t.Parallel() - - registry := metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(metrics.NewRegistry()) - - expSamples := make(chan []Sample) - defer close(expSamples) - - tb := httpmultibin.NewHTTPMultiBin(t) - failHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - t.Errorf("%s should not have been called at all", r.RequestURI) - }) - tb.Mux.HandleFunc("/v1/tests", failHandler) - tb.Mux.HandleFunc("/v1/tests/333", failHandler) - tb.Mux.HandleFunc("/v1/metrics/333", getSampleChecker(t, expSamples)) - - out, err := newTestOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{ - "host": "%s", "noCompress": true, - "metricPushInterval": "10ms", - "aggregationPeriod": "0ms", - "pushRefID": "333" - }`, tb.ServerHTTP.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "/script.js"}, - }) - require.NoError(t, err) - - out.SetTestRunID("333") - require.NoError(t, out.Start()) - - now := time.Now() - tags := registry.RootTagSet().WithTagsFromMap(map[string]string{"test": "mest", "a": "b"}) - encodedTags, err := json.Marshal(tags) - require.NoError(t, err) - - out.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.HTTPReqDuration, - Tags: tags, - }, - Time: now, - Value: 123.45, - }}) - exp := []Sample{{ - Type: DataTypeSingle, - Metric: metrics.HTTPReqDurationName, - Data: &SampleDataSingle{ - Type: builtinMetrics.HTTPReqDuration.Type, - Time: toMicroSecond(now), - Tags: encodedTags, - Value: 123.45, - }, - }} - - select { - case expSamples <- exp: - case <-time.After(5 * time.Second): - t.Error("test timeout") - } - - require.NoError(t, out.StopWithTestError(nil)) -} - -func TestCloudOutputRecvIterLIAllIterations(t *testing.T) { - t.Parallel() - - tb := httpmultibin.NewHTTPMultiBin(t) - out, err := newTestOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{ - "host": "%s", "noCompress": true, - "maxMetricSamplesPerPackage": 50 - }`, tb.ServerHTTP.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "path/to/script.js"}, - }) - require.NoError(t, err) - - gotIterations := false - var m sync.Mutex - expValues := map[string]float64{ - "data_received": 100, - "data_sent": 200, - "iteration_duration": 60000, - "iterations": 1, - } - - tb.Mux.HandleFunc("/v1/metrics/123", func(_ http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - assert.NoError(t, err) - - receivedSamples := []Sample{} - assert.NoError(t, json.Unmarshal(body, &receivedSamples)) - - assert.Len(t, receivedSamples, 1) - assert.Equal(t, "iter_li_all", receivedSamples[0].Metric) - assert.Equal(t, DataTypeMap, receivedSamples[0].Type) - data, ok := receivedSamples[0].Data.(*SampleDataMap) - assert.True(t, ok) - assert.Equal(t, expValues, data.Values) - - m.Lock() - gotIterations = true - m.Unlock() - }) - - out.SetTestRunID("123") - require.NoError(t, out.Start()) - - now := time.Now() - registry 
:= metrics.NewRegistry() - builtinMetrics := metrics.RegisterBuiltinMetrics(registry) - simpleNetTrail := netext.NetTrail{ - BytesRead: 100, - BytesWritten: 200, - FullIteration: true, - StartTime: now.Add(-time.Minute), - EndTime: now, - Samples: []metrics.Sample{ - { - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.DataSent, - Tags: registry.RootTagSet(), - }, - Time: now, - Value: float64(200), - }, - { - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.DataReceived, - Tags: registry.RootTagSet(), - }, - Time: now, - Value: float64(100), - }, - { - TimeSeries: metrics.TimeSeries{ - Metric: builtinMetrics.Iterations, - Tags: registry.RootTagSet(), - }, - Time: now, - Value: 1, - }, - }, - } - - out.AddMetricSamples([]metrics.SampleContainer{&simpleNetTrail}) - require.NoError(t, out.StopWithTestError(nil)) - require.True(t, gotIterations) -} - -func TestPublishMetric(t *testing.T) { - t.Parallel() - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - g, err := gzip.NewReader(r.Body) - - require.NoError(t, err) - var buf bytes.Buffer - _, err = io.Copy(&buf, g) //nolint:gosec - require.NoError(t, err) - byteCount, err := strconv.Atoi(r.Header.Get("x-payload-byte-count")) - require.NoError(t, err) - require.Equal(t, buf.Len(), byteCount) - - samplesCount, err := strconv.Atoi(r.Header.Get("x-payload-sample-count")) - require.NoError(t, err) - var samples []*Sample - err = json.Unmarshal(buf.Bytes(), &samples) - require.NoError(t, err) - require.Equal(t, len(samples), samplesCount) - - _, err = fmt.Fprintf(w, "") - require.NoError(t, err) - })) - defer server.Close() - - out, err := newTestOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": false}`, server.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(1 * time.Second), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "script.js"}, - }) - require.NoError(t, err) - - samples := []*Sample{ - { - Type: "Point", - Metric: "metric", - Data: &SampleDataSingle{ - Type: 1, - Time: toMicroSecond(time.Now()), - Value: 1.2, - }, - }, - } - err = out.client.PushMetric("1", samples) - assert.NoError(t, err) -} - -func TestNewOutputClientTimeout(t *testing.T) { - t.Parallel() - ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - time.Sleep(100 * time.Millisecond) - })) - defer ts.Close() - - out, err := newTestOutput(output.Params{ - Logger: testutils.NewLogger(t), - JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "timeout": "2ms"}`, ts.URL)), - ScriptOptions: lib.Options{ - Duration: types.NullDurationFrom(50 * time.Millisecond), - SystemTags: &metrics.DefaultSystemTagSet, - }, - ScriptPath: &url.URL{Path: "script.js"}, - }) - require.NoError(t, err) - - err = out.client.PushMetric("testmetric", nil) - assert.True(t, os.IsTimeout(err)) //nolint:forbidigo -} - -func newTestOutput(params output.Params) (*Output, error) { - conf, err := cloudapi.GetConsolidatedConfig( - params.JSONConfig, params.Environment, params.ConfigArgument, params.ScriptOptions.External) - if err != nil { - return nil, err - } - - apiClient := cloudapi.NewClient( - params.Logger, conf.Token.String, conf.Host.String, - consts.Version, conf.Timeout.TimeDuration()) - - return New(params.Logger, conf, apiClient) -} From 520663ce3897aec43f3375038b64c16e64b3e047 Mon Sep 17 00:00:00 2001 From: Ivan <2103732+codebien@users.noreply.github.com> Date: Wed, 24 
May 2023 18:36:15 +0200 Subject: [PATCH 2/5] output/cloud: New default config --- cloudapi/config.go | 37 ++++++++----------------------------- 1 file changed, 8 insertions(+), 29 deletions(-) diff --git a/cloudapi/config.go b/cloudapi/config.go index f6db3aee04a..f471b044ecb 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -162,25 +162,19 @@ type Config struct { // NewConfig creates a new Config instance with default values for some fields. func NewConfig() Config { - c := Config{ + return Config{ + APIVersion: null.NewInt(2, false), Host: null.NewString("https://ingest.k6.io", false), LogsTailURL: null.NewString("wss://cloudlogs.k6.io/api/v1/tail", false), WebAppURL: null.NewString("https://app.k6.io", false), MetricPushInterval: types.NewNullDuration(1*time.Second, false), MetricPushConcurrency: null.NewInt(1, false), - - TracesEnabled: null.NewBool(true, false), - TracesHost: null.NewString("grpc-k6-api-prod-prod-us-east-0.grafana.net:443", false), - TracesPushInterval: types.NewNullDuration(1*time.Second, false), - TracesPushConcurrency: null.NewInt(1, false), - - MaxMetricSamplesPerPackage: null.NewInt(100000, false), - Timeout: types.NewNullDuration(1*time.Minute, false), - APIVersion: null.NewInt(2, false), + Timeout: types.NewNullDuration(1*time.Minute, false), // The set value (1000) is selected for performance reasons. // Any change to this value should be first discussed with internal stakeholders. MaxTimeSeriesInBatch: null.NewInt(1000, false), + // TODO: the following values were used by the previous default version (v1). // We decided to keep the same values mostly for having a smoother migration to v2. // Because the previous version's aggregation config, a few lines below, is overwritten @@ -191,27 +185,12 @@ func NewConfig() Config { // https://github.com/grafana/k6/blob/44e1e63aadb66784ff0a12b8d9821a0fdc9e7467/output/cloud/expv2/collect.go#L72-L77 AggregationPeriod: types.NewNullDuration(3*time.Second, false), AggregationWaitPeriod: types.NewNullDuration(8*time.Second, false), - } - if c.APIVersion.Int64 == 1 { - // Aggregation is disabled by default for legacy version, since AggregationPeriod has no default value - // but if it's enabled manually or from the cloud service and the cloud doesn't override the config then - // those are the default values it will use. - c.AggregationPeriod = types.NewNullDuration(0, false) - c.AggregationCalcInterval = types.NewNullDuration(3*time.Second, false) - c.AggregationWaitPeriod = types.NewNullDuration(5*time.Second, false) - c.AggregationMinSamples = null.NewInt(25, false) - c.AggregationOutlierAlgoThreshold = null.NewInt(75, false) - c.AggregationOutlierIqrRadius = null.NewFloat(0.25, false) - - // Since we're measuring durations, the upper coefficient is slightly - // lower, since outliers from that side are more interesting than ones - // close to zero. - c.AggregationOutlierIqrCoefLower = null.NewFloat(1.5, false) - c.AggregationOutlierIqrCoefUpper = null.NewFloat(1.3, false) + TracesEnabled: null.NewBool(true, false), + TracesHost: null.NewString("grpc-k6-api-prod-prod-us-east-0.grafana.net:443", false), + TracesPushInterval: types.NewNullDuration(1*time.Second, false), + TracesPushConcurrency: null.NewInt(1, false), } - - return c } // Apply saves config non-zero config values from the passed config in the receiver. 
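The null-wrapped fields are what make this consolidation work: Apply (its trimmed-down version appears in PATCH 5/5 below) only copies options whose Valid flag is set, so the defaults above survive any partially specified user config, and the same merging underlies GetConsolidatedConfig for JSON and environment options. A minimal usage sketch of that layering, with an invented token and interval purely for illustration:

    package main

    import (
    	"fmt"
    	"time"

    	"gopkg.in/guregu/null.v3"

    	"go.k6.io/k6/cloudapi"
    	"go.k6.io/k6/lib/types"
    )

    func main() {
    	// Start from the defaults set up in NewConfig()...
    	conf := cloudapi.NewConfig()

    	// ...and layer a partial user config on top of them. Apply only
    	// copies fields whose Valid flag is set, so every other option
    	// keeps its default value.
    	conf = conf.Apply(cloudapi.Config{
    		Token:              null.StringFrom("my-token"),
    		MetricPushInterval: types.NullDurationFrom(5 * time.Second),
    	})

    	fmt.Println(conf.Host.String)                       // still the default ingest URL
    	fmt.Println(conf.MetricPushInterval.TimeDuration()) // overridden: 5s
    }
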
From 8083d2ca554637bf8387fb69c184f07e9354efae Mon Sep 17 00:00:00 2001 From: Ivan <2103732+codebien@users.noreply.github.com> Date: Wed, 24 May 2023 18:37:24 +0200 Subject: [PATCH 3/5] cloud/output: Switch off v1 --- output/cloud/output.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/output/cloud/output.go b/output/cloud/output.go index 9c50f383fed..0828170e327 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -16,7 +16,6 @@ import ( "go.k6.io/k6/metrics" "go.k6.io/k6/output" cloudv2 "go.k6.io/k6/output/cloud/expv2" - cloudv1 "go.k6.io/k6/output/cloud/v1" "gopkg.in/guregu/null.v3" ) @@ -44,7 +43,7 @@ type apiVersion int64 const ( apiVersionUndefined apiVersion = iota - apiVersion1 + apiVersion1 // disabled apiVersion2 ) @@ -347,7 +346,7 @@ func (out *Output) startVersionedOutput() error { switch out.config.APIVersion.Int64 { case int64(apiVersion1): - out.versionedOutput, err = cloudv1.New(out.logger, out.config, out.client) + err = errors.New("v1 is not supported anymore") case int64(apiVersion2): out.versionedOutput, err = cloudv2.New(out.logger, out.config, out.client) default: From 3eaf5b3b9b456d4e30725ec44e44f4d8a05a5722 Mon Sep 17 00:00:00 2001 From: Ivan <2103732+codebien@users.noreply.github.com> Date: Wed, 24 May 2023 18:37:59 +0200 Subject: [PATCH 4/5] output/cloud: Adjust tests for working with v2 --- output/cloud/output_test.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/output/cloud/output_test.go b/output/cloud/output_test.go index cf68bf0c0e4..7b2bdeb098b 100644 --- a/output/cloud/output_test.go +++ b/output/cloud/output_test.go @@ -20,7 +20,6 @@ import ( "go.k6.io/k6/metrics" "go.k6.io/k6/output" cloudv2 "go.k6.io/k6/output/cloud/expv2" - cloudv1 "go.k6.io/k6/output/cloud/v1" "gopkg.in/guregu/null.v3" ) @@ -99,7 +98,7 @@ func TestOutputCreateTestWithConfigOverwrite(t *testing.T) { "reference_id": "12345", "config": { "metricPushInterval": "10ms", - "aggregationPeriod": "1s" + "aggregationPeriod": "40s" } }`) case "/v1/tests/12345": @@ -126,7 +125,7 @@ func TestOutputCreateTestWithConfigOverwrite(t *testing.T) { require.NoError(t, out.Start()) assert.Equal(t, types.NullDurationFrom(10*time.Millisecond), out.config.MetricPushInterval) - assert.Equal(t, types.NullDurationFrom(1*time.Second), out.config.AggregationPeriod) + assert.Equal(t, types.NullDurationFrom(40*time.Second), out.config.AggregationPeriod) // Assert that it overwrites only the provided values expTimeout := types.NewNullDuration(60*time.Second, false) @@ -182,23 +181,18 @@ func TestOutputStartVersionedOutputV2(t *testing.T) { assert.True(t, ok) } -func TestOutputStartVersionedOutputV1(t *testing.T) { +func TestOutputStartVersionedOutputV1Error(t *testing.T) { t.Parallel() o := Output{ testRunID: "123", config: cloudapi.Config{ APIVersion: null.IntFrom(1), - // Here, we are enabling but silencing the related async op - MetricPushInterval: types.NullDurationFrom(1 * time.Hour), }, } err := o.startVersionedOutput() - require.NoError(t, err) - - _, ok := o.versionedOutput.(*cloudv1.Output) - assert.True(t, ok) + assert.ErrorContains(t, err, "not supported anymore") } func TestOutputStartWithTestRunID(t *testing.T) { From d11a4f18501cea68051a40f213935eb856b228a1 Mon Sep 17 00:00:00 2001 From: Ivan <2103732+codebien@users.noreply.github.com> Date: Thu, 6 Jul 2023 10:30:30 +0200 Subject: [PATCH 5/5] cloudapi: Drop v1-only options --- cloudapi/api_test.go | 2 +- cloudapi/config.go | 140 +++------------------------------------- 
cloudapi/config_test.go | 46 ++++++------- output/cloud/output.go | 5 -- 4 files changed, 29 insertions(+), 164 deletions(-) diff --git a/cloudapi/api_test.go b/cloudapi/api_test.go index 89aeb46659f..0024e30edc2 100644 --- a/cloudapi/api_test.go +++ b/cloudapi/api_test.go @@ -45,7 +45,7 @@ func TestCreateTestRun(t *testing.T) { assert.NotNil(t, resp.ConfigOverride) assert.True(t, resp.ConfigOverride.AggregationPeriod.Valid) assert.Equal(t, types.Duration(2*time.Second), resp.ConfigOverride.AggregationPeriod.Duration) - assert.False(t, resp.ConfigOverride.AggregationMinSamples.Valid) + assert.False(t, resp.ConfigOverride.MaxTimeSeriesInBatch.Valid) } func TestFinished(t *testing.T) { diff --git a/cloudapi/config.go b/cloudapi/config.go index f471b044ecb..c244130290a 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -28,21 +28,27 @@ type Config struct { StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"` APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"` - // Defines the max allowed number of time series in a single batch. - MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch" envconfig:"K6_CLOUD_MAX_TIME_SERIES_IN_BATCH"` - // PushRefID represents the test run id. // Note: It is a legacy name used by the backend, the code in k6 open-source // references it as test run id. // Currently, a renaming is not planned. PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"` + // Defines the max allowed number of time series in a single batch. + MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch" envconfig:"K6_CLOUD_MAX_TIME_SERIES_IN_BATCH"` + // The time interval between periodic API calls for sending samples to the cloud ingest service. MetricPushInterval types.NullDuration `json:"metricPushInterval" envconfig:"K6_CLOUD_METRIC_PUSH_INTERVAL"` // This is how many concurrent pushes will be done at the same time to the cloud MetricPushConcurrency null.Int `json:"metricPushConcurrency" envconfig:"K6_CLOUD_METRIC_PUSH_CONCURRENCY"` + // If specified and is greater than 0, sample aggregation with that period is enabled + AggregationPeriod types.NullDuration `json:"aggregationPeriod" envconfig:"K6_CLOUD_AGGREGATION_PERIOD"` + + // If aggregation is enabled, this specifies how long we'll wait for period samples to accumulate before trying to aggregate them. + AggregationWaitPeriod types.NullDuration `json:"aggregationWaitPeriod" envconfig:"K6_CLOUD_AGGREGATION_WAIT_PERIOD"` + // Indicates whether to send traces to the k6 Insights backend service. TracesEnabled null.Bool `json:"tracesEnabled" envconfig:"K6_CLOUD_TRACES_ENABLED"` @@ -54,110 +60,6 @@ type Config struct { // The time interval between periodic API calls for sending samples to the cloud ingest service. TracesPushInterval types.NullDuration `json:"tracesPushInterval" envconfig:"K6_CLOUD_TRACES_PUSH_INTERVAL"` - - // Aggregation docs: - // - // If AggregationPeriod is specified and if it is greater than 0, HTTP metric aggregation - // with that period will be enabled. The general algorithm is this: - // - HTTP trail samples will be collected separately and not - // included in the default sample buffer (which is directly sent - // to the cloud service every MetricPushInterval). 
-	// - On every AggregationCalcInterval, all collected HTTP Trails will be
-	//   split into AggregationPeriod-sized time buckets (time slots) and
-	//   then into sub-buckets according to their tags (each sub-bucket
-	//   will contain only HTTP trails with the same sample tags -
-	//   proto, status, URL, method, etc.).
-	// - If at that time the specified AggregationWaitPeriod has not passed
-	//   for a particular time bucket, it will be left undisturbed until the next
-	//   AggregationCalcInterval tick comes along.
-	// - If AggregationWaitPeriod has passed for a time bucket, all of its
-	//   sub-buckets will be traversed. Any sub-buckets that have less than
-	//   AggregationMinSamples HTTP trails in them will not be aggregated.
-	//   Instead the HTTP trails in them will just be individually added
-	//   to the default sample buffer, like they would be if there was no
-	//   aggregation.
-	// - Sub-buckets with at least AggregationMinSamples HTTP trails on the
-	//   other hand will be aggregated according to the algorithm below:
-	//   - If AggregationSkipOutlierDetection is enabled, all of the collected
-	//     HTTP trails in that sub-bucket will be directly aggregated into a single
-	//     compound metric sample, without any attempt at outlier detection.
-	//     IMPORTANT: This is intended for testing purposes only or, in
-	//     extreme cases, when the resulting metrics' precision isn't very important,
-	//     since it could lead to a huge loss of granularity and the masking
-	//     of any outlier samples in the data.
-	//   - By default (since AggregationSkipOutlierDetection is not enabled),
-	//     the collected HTTP trails will be checked for outliers, so we don't lose
-	//     granularity by accidentally aggregating them. That happens by finding
-	//     the "quartiles" (by default the 75th and 25th percentiles) in the
-	//     sub-bucket datapoints and using the inter-quartile range (IQR) to find
-	//     any outliers (https://en.wikipedia.org/wiki/Interquartile_range#Outliers,
-	//     though the specific parameters and coefficients can be customized
-	//     by the AggregationOutlierIqr{Radius,CoefLower,CoefUpper} options)
-	//   - Depending on the number of samples in the sub-bucket, two different
-	//     algorithms could be used to calculate the quartiles. If there are
-	//     fewer samples (between AggregationMinSamples and AggregationOutlierAlgoThreshold),
-	//     then a more precise but also more computationally-heavy sorting-based algorithm
-	//     will be used. For sub-buckets with more samples, a lighter quickselect-based
-	//     algorithm will be used, potentially with a very minor loss of precision.
-	//   - Regardless of the used algorithm, once the quartiles for that sub-bucket
-	//     are found and the IQR is calculated, every HTTP trail in the sub-bucket will
-	//     be checked for whether it seems like an outlier. HTTP trails are evaluated
-	//     against two criteria - their total connection time (i.e.
-	//     http_req_connecting + http_req_tls_handshaking) and their total request time
-	//     (i.e. http_req_sending + http_req_waiting + http_req_receiving). If either of those
-	//     properties of an HTTP trail is out of the calculated "normal" bounds for the
-	//     sub-bucket, it will be considered an outlier and will be sent to the cloud
-	//     individually - it's simply added to the default sample buffer, like it would
-	//     be if there was no aggregation.
-	// - Finally, all non-outliers are aggregated and the resulting single metric is also
-	//   added to the default sample buffer for sending to the cloud ingest service
-	//   on the next MetricPushInterval event.
-
-	// If specified and greater than 0, sample aggregation with that period is enabled
-	AggregationPeriod types.NullDuration `json:"aggregationPeriod" envconfig:"K6_CLOUD_AGGREGATION_PERIOD"`
-
-	// If aggregation is enabled, this is how often new HTTP trails will be sorted into buckets and sub-buckets and aggregated.
-	AggregationCalcInterval types.NullDuration `json:"aggregationCalcInterval" envconfig:"K6_CLOUD_AGGREGATION_CALC_INTERVAL"`
-
-	// If aggregation is enabled, this specifies how long we'll wait for period samples to accumulate before trying to aggregate them.
-	AggregationWaitPeriod types.NullDuration `json:"aggregationWaitPeriod" envconfig:"K6_CLOUD_AGGREGATION_WAIT_PERIOD"`
-
-	// If aggregation is enabled, but the samples collected for a certain AggregationPeriod after AggregationWaitPeriod has passed are fewer than this number, they won't be aggregated.
-	AggregationMinSamples null.Int `json:"aggregationMinSamples" envconfig:"K6_CLOUD_AGGREGATION_MIN_SAMPLES"`
-
-	// If this is enabled and a sub-bucket has more than AggregationMinSamples HTTP trails in it, they would all be
-	// aggregated without attempting to find and separate any outlier metrics first.
-	// IMPORTANT: This is intended for testing purposes only or, in extreme cases, when the result precision
-	// isn't very important and the improved aggregation percentage would be worth the potentially huge loss
-	// of metric granularity and possible masking of any outlier samples.
-	AggregationSkipOutlierDetection null.Bool `json:"aggregationSkipOutlierDetection" envconfig:"K6_CLOUD_AGGREGATION_SKIP_OUTLIER_DETECTION"`
-
-	// If aggregation and outlier detection are enabled, this option specifies the
-	// number of HTTP trails in a sub-bucket that determines which quartile-calculating
-	// algorithm would be used:
-	// - for fewer samples (between MinSamples and OutlierAlgoThreshold), a more precise
-	//   (i.e. supporting interpolation), but also more computationally-heavy sorting
-	//   algorithm will be used to find the quartiles.
-	// - if there are more samples than OutlierAlgoThreshold in the sub-bucket, a
-	//   QuickSelect-based (https://en.wikipedia.org/wiki/Quickselect) algorithm will
-	//   be used. It doesn't support interpolation, so there's a small loss of precision
-	//   in the outlier detection, but it's not as resource-heavy as the sorting algorithm.
-	AggregationOutlierAlgoThreshold null.Int `json:"aggregationOutlierAlgoThreshold" envconfig:"K6_CLOUD_AGGREGATION_OUTLIER_ALGO_THRESHOLD"`
-
-	// The radius (as a fraction) from the median at which to sample Q1 and Q3.
-	// By default it's one quarter (0.25), and if set to something different, the Q in IQR
-	// won't make much sense... But this would allow us to select tighter sample groups for
-	// aggregation if we want.
-	AggregationOutlierIqrRadius null.Float `json:"aggregationOutlierIqrRadius" envconfig:"K6_CLOUD_AGGREGATION_OUTLIER_IQR_RADIUS"`
-
-	// Connection or request times with how many IQRs below Q1 to consider as non-aggregatable outliers.
-	AggregationOutlierIqrCoefLower null.Float `json:"aggregationOutlierIqrCoefLower" envconfig:"K6_CLOUD_AGGREGATION_OUTLIER_IQR_COEF_LOWER"`
-
-	// Connection or request times with how many IQRs above Q3 to consider as non-aggregatable outliers.
- AggregationOutlierIqrCoefUpper null.Float `json:"aggregationOutlierIqrCoefUpper" envconfig:"K6_CLOUD_AGGREGATION_OUTLIER_IQR_COEF_UPPER"` - - // Deprecated: Remove this when migration from the cloud output v1 will be completed - MaxMetricSamplesPerPackage null.Int `json:"maxMetricSamplesPerPackage" envconfig:"K6_CLOUD_MAX_METRIC_SAMPLES_PER_PACKAGE"` } // NewConfig creates a new Config instance with default values for some fields. @@ -233,9 +135,6 @@ func (c Config) Apply(cfg Config) Config { if cfg.APIVersion.Valid { c.APIVersion = cfg.APIVersion } - if cfg.MaxMetricSamplesPerPackage.Valid { - c.MaxMetricSamplesPerPackage = cfg.MaxMetricSamplesPerPackage - } if cfg.MaxTimeSeriesInBatch.Valid { c.MaxTimeSeriesInBatch = cfg.MaxTimeSeriesInBatch } @@ -260,30 +159,9 @@ func (c Config) Apply(cfg Config) Config { if cfg.AggregationPeriod.Valid { c.AggregationPeriod = cfg.AggregationPeriod } - if cfg.AggregationCalcInterval.Valid { - c.AggregationCalcInterval = cfg.AggregationCalcInterval - } if cfg.AggregationWaitPeriod.Valid { c.AggregationWaitPeriod = cfg.AggregationWaitPeriod } - if cfg.AggregationMinSamples.Valid { - c.AggregationMinSamples = cfg.AggregationMinSamples - } - if cfg.AggregationSkipOutlierDetection.Valid { - c.AggregationSkipOutlierDetection = cfg.AggregationSkipOutlierDetection - } - if cfg.AggregationOutlierAlgoThreshold.Valid { - c.AggregationOutlierAlgoThreshold = cfg.AggregationOutlierAlgoThreshold - } - if cfg.AggregationOutlierIqrRadius.Valid { - c.AggregationOutlierIqrRadius = cfg.AggregationOutlierIqrRadius - } - if cfg.AggregationOutlierIqrCoefLower.Valid { - c.AggregationOutlierIqrCoefLower = cfg.AggregationOutlierIqrCoefLower - } - if cfg.AggregationOutlierIqrCoefUpper.Valid { - c.AggregationOutlierIqrCoefUpper = cfg.AggregationOutlierIqrCoefUpper - } return c } diff --git a/cloudapi/config_test.go b/cloudapi/config_test.go index c0ae33770c2..6e84a147052 100644 --- a/cloudapi/config_test.go +++ b/cloudapi/config_test.go @@ -24,34 +24,26 @@ func TestConfigApply(t *testing.T) { assert.Equal(t, defaults, defaults.Apply(empty).Apply(empty)) full := Config{ - Token: null.NewString("Token", true), - ProjectID: null.NewInt(1, true), - Name: null.NewString("Name", true), - Host: null.NewString("Host", true), - Timeout: types.NewNullDuration(5*time.Second, true), - LogsTailURL: null.NewString("LogsTailURL", true), - PushRefID: null.NewString("PushRefID", true), - WebAppURL: null.NewString("foo", true), - NoCompress: null.NewBool(true, true), - StopOnError: null.NewBool(true, true), - APIVersion: null.NewInt(2, true), - MaxMetricSamplesPerPackage: null.NewInt(2, true), - MaxTimeSeriesInBatch: null.NewInt(3, true), - MetricPushInterval: types.NewNullDuration(1*time.Second, true), - MetricPushConcurrency: null.NewInt(3, true), - TracesEnabled: null.NewBool(true, true), - TracesHost: null.NewString("TracesHost", true), - TracesPushInterval: types.NewNullDuration(10*time.Second, true), + Token: null.NewString("Token", true), + ProjectID: null.NewInt(1, true), + Name: null.NewString("Name", true), + Host: null.NewString("Host", true), + Timeout: types.NewNullDuration(5*time.Second, true), + LogsTailURL: null.NewString("LogsTailURL", true), + PushRefID: null.NewString("PushRefID", true), + WebAppURL: null.NewString("foo", true), + NoCompress: null.NewBool(true, true), + StopOnError: null.NewBool(true, true), + APIVersion: null.NewInt(2, true), + AggregationPeriod: types.NewNullDuration(2*time.Second, true), + AggregationWaitPeriod: types.NewNullDuration(4*time.Second, true), + 
MaxTimeSeriesInBatch: null.NewInt(3, true), + MetricPushInterval: types.NewNullDuration(1*time.Second, true), + MetricPushConcurrency: null.NewInt(3, true), + TracesEnabled: null.NewBool(true, true), + TracesHost: null.NewString("TracesHost", true), + TracesPushInterval: types.NewNullDuration(10*time.Second, true), TracesPushConcurrency: null.NewInt(6, true), - AggregationPeriod: types.NewNullDuration(2*time.Second, true), - AggregationCalcInterval: types.NewNullDuration(3*time.Second, true), - AggregationWaitPeriod: types.NewNullDuration(4*time.Second, true), - AggregationMinSamples: null.NewInt(4, true), - AggregationSkipOutlierDetection: null.NewBool(true, true), - AggregationOutlierAlgoThreshold: null.NewInt(5, true), - AggregationOutlierIqrRadius: null.NewFloat(6, true), - AggregationOutlierIqrCoefLower: null.NewFloat(7, true), - AggregationOutlierIqrCoefUpper: null.NewFloat(8, true), } assert.Equal(t, full, full.Apply(empty)) diff --git a/output/cloud/output.go b/output/cloud/output.go index 0828170e327..08eb52017ea 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -112,11 +112,6 @@ func newOutput(params output.Params) (*Output, error) { conf.MetricPushConcurrency.Int64) } - if conf.MaxMetricSamplesPerPackage.Int64 < 1 { //nolint:staticcheck - return nil, fmt.Errorf("metric samples per package must be a positive number but is %d", - conf.MaxMetricSamplesPerPackage.Int64) //nolint:staticcheck - } - if conf.MaxTimeSeriesInBatch.Int64 < 1 { return nil, fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d", conf.MaxTimeSeriesInBatch.Int64)
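For the record, the outlier detection that the options dropped in this patch configured reduces to computing inter-quartile-range bounds over each sub-bucket's durations, as the deleted comments above describe. A simplified, self-contained sketch of that rule (sorting-based only, without the interpolation and quickselect variants of the deleted durations helpers; the sample values are invented):

    package main

    import (
    	"fmt"
    	"sort"
    	"time"
    )

    // normalBounds returns the [lo, hi] range outside of which a duration
    // counts as an outlier, following the IQR rule from the deleted option
    // docs: Q1 and Q3 are sampled at +/- radius around the median position,
    // and the bounds are Q1 - coefLower*IQR and Q3 + coefUpper*IQR.
    func normalBounds(ds []time.Duration, radius, coefLower, coefUpper float64) (time.Duration, time.Duration) {
    	sorted := append([]time.Duration(nil), ds...)
    	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

    	last := float64(len(sorted) - 1)
    	q1 := sorted[int(last*(0.5-radius))] // the 25th percentile for radius=0.25
    	q3 := sorted[int(last*(0.5+radius))] // the 75th percentile for radius=0.25
    	iqr := float64(q3 - q1)

    	lo := q1 - time.Duration(coefLower*iqr)
    	hi := q3 + time.Duration(coefUpper*iqr)
    	return lo, hi
    }

    func main() {
    	ds := []time.Duration{
    		90 * time.Millisecond, 100 * time.Millisecond, 105 * time.Millisecond,
    		110 * time.Millisecond, 115 * time.Millisecond, 2 * time.Second, // one obvious outlier
    	}
    	// 0.25, 1.5 and 1.3 were the v1 defaults for the IQR radius and the
    	// lower/upper coefficients (see the defaults removed in PATCH 2/5).
    	lo, hi := normalBounds(ds, 0.25, 1.5, 1.3)
    	fmt.Println(lo, hi) // trails outside [lo, hi] were sent to the cloud unaggregated
    }
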