output/cloud: Drop version 1 #3092

Merged · 5 commits · Feb 12, 2024
2 changes: 1 addition & 1 deletion cloudapi/api_test.go
@@ -45,7 +45,7 @@ func TestCreateTestRun(t *testing.T) {
assert.NotNil(t, resp.ConfigOverride)
assert.True(t, resp.ConfigOverride.AggregationPeriod.Valid)
assert.Equal(t, types.Duration(2*time.Second), resp.ConfigOverride.AggregationPeriod.Duration)
-	assert.False(t, resp.ConfigOverride.AggregationMinSamples.Valid)
+	assert.False(t, resp.ConfigOverride.MaxTimeSeriesInBatch.Valid)
}

func TestFinished(t *testing.T) {
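Context for the flipped assertion: with gopkg.in/guregu/null.v3, a field is Valid only when its JSON key was actually present in the decoded payload, so the test asserts that the server's config override left the new field unset. A minimal sketch of that behavior (the configOverride struct here is a hypothetical stand-in, not the real cloudapi type):

```go
package main

import (
	"encoding/json"
	"fmt"

	"gopkg.in/guregu/null.v3"
)

// configOverride is a hypothetical stand-in for the cloudapi response type.
type configOverride struct {
	MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch"`
}

func main() {
	var absent, present configOverride
	_ = json.Unmarshal([]byte(`{}`), &absent)
	_ = json.Unmarshal([]byte(`{"maxTimeSeriesInBatch": 500}`), &present)

	fmt.Println(absent.MaxTimeSeriesInBatch.Valid)  // false: key missing, assert.False passes
	fmt.Println(present.MaxTimeSeriesInBatch.Valid) // true: key present
	fmt.Println(present.MaxTimeSeriesInBatch.Int64) // 500
}
```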
177 changes: 17 additions & 160 deletions cloudapi/config.go
@@ -28,21 +28,27 @@ type Config struct {
StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"`
APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"`

-	// Defines the max allowed number of time series in a single batch.
-	MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch" envconfig:"K6_CLOUD_MAX_TIME_SERIES_IN_BATCH"`

// PushRefID represents the test run id.
// Note: It is a legacy name used by the backend, the code in k6 open-source
// references it as test run id.
// Currently, a renaming is not planned.
PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"`

+	// Defines the max allowed number of time series in a single batch.
+	MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch" envconfig:"K6_CLOUD_MAX_TIME_SERIES_IN_BATCH"`

// The time interval between periodic API calls for sending samples to the cloud ingest service.
MetricPushInterval types.NullDuration `json:"metricPushInterval" envconfig:"K6_CLOUD_METRIC_PUSH_INTERVAL"`

// This is how many concurrent pushes will be done at the same time to the cloud
MetricPushConcurrency null.Int `json:"metricPushConcurrency" envconfig:"K6_CLOUD_METRIC_PUSH_CONCURRENCY"`

+	// If specified and is greater than 0, sample aggregation with that period is enabled
+	AggregationPeriod types.NullDuration `json:"aggregationPeriod" envconfig:"K6_CLOUD_AGGREGATION_PERIOD"`
+
+	// If aggregation is enabled, this specifies how long we'll wait for period samples to accumulate before trying to aggregate them.
+	AggregationWaitPeriod types.NullDuration `json:"aggregationWaitPeriod" envconfig:"K6_CLOUD_AGGREGATION_WAIT_PERIOD"`

// Indicates whether to send traces to the k6 Insights backend service.
TracesEnabled null.Bool `json:"tracesEnabled" envconfig:"K6_CLOUD_TRACES_ENABLED"`

@@ -54,133 +60,23 @@ type Config struct {

// The time interval between periodic API calls for sending samples to the cloud ingest service.
TracesPushInterval types.NullDuration `json:"tracesPushInterval" envconfig:"K6_CLOUD_TRACES_PUSH_INTERVAL"`

-	// Aggregation docs:
-	//
-	// If AggregationPeriod is specified and if it is greater than 0, HTTP metric aggregation
-	// with that period will be enabled. The general algorithm is this:
-	//  - HTTP trail samples will be collected separately and not
-	//    included in the default sample buffer (which is directly sent
-	//    to the cloud service every MetricPushInterval).
-	//  - On every AggregationCalcInterval, all collected HTTP Trails will be
-	//    split into AggregationPeriod-sized time buckets (time slots) and
-	//    then into sub-buckets according to their tags (each sub-bucket
-	//    will contain only HTTP trails with the same sample tags -
-	//    proto, status, URL, method, etc.).
-	//  - If at that time the specified AggregationWaitPeriod has not passed
-	//    for a particular time bucket, it will be left undisturbed until the next
-	//    AggregationCalcInterval tick comes along.
-	//  - If AggregationWaitPeriod has passed for a time bucket, all of its
-	//    sub-buckets will be traversed. Any sub-buckets that have less than
-	//    AggregationMinSamples HTTP trails in them will not be aggregated.
-	//    Instead the HTTP trails in them will just be individually added
-	//    to the default sample buffer, like they would be if there was no
-	//    aggregation.
-	//  - Sub-buckets with at least AggregationMinSamples HTTP trails, on the
-	//    other hand, will be aggregated according to the algorithm below:
-	//    - If AggregationSkipOutlierDetection is enabled, all of the collected
-	//      HTTP trails in that sub-bucket will be directly aggregated into a single
-	//      compound metric sample, without any attempt at outlier detection.
-	//      IMPORTANT: This is intended for testing purposes only or, in
-	//      extreme cases, when the resulting metrics' precision isn't very important,
-	//      since it could lead to a huge loss of granularity and the masking
-	//      of any outlier samples in the data.
-	//    - By default (since AggregationSkipOutlierDetection is not enabled),
-	//      the collected HTTP trails will be checked for outliers, so we don't lose
-	//      granularity by accidentally aggregating them. That happens by finding
-	//      the "quartiles" (by default the 75th and 25th percentiles) in the
-	//      sub-bucket datapoints and using the inter-quartile range (IQR) to find
-	//      any outliers (https://en.wikipedia.org/wiki/Interquartile_range#Outliers,
-	//      though the specific parameters and coefficients can be customized
-	//      by the AggregationOutlier{Radius,CoefLower,CoefUpper} options)
-	//    - Depending on the number of samples in the sub-bucket, two different
-	//      algorithms could be used to calculate the quartiles. If there are
-	//      fewer samples (between AggregationMinSamples and AggregationOutlierAlgoThreshold),
-	//      then a more precise but also more computationally-heavy sorting-based algorithm
-	//      will be used. For sub-buckets with more samples, a lighter quickselect-based
-	//      algorithm will be used, potentially with a very minor loss of precision.
-	//    - Regardless of the used algorithm, once the quartiles for that sub-bucket
-	//      are found and the IQR is calculated, every HTTP trail in the sub-bucket will
-	//      be checked if it seems like an outlier. HTTP trails are evaluated by two different
-	//      criteria whether they seem like outliers - by their total connection time (i.e.
-	//      http_req_connecting + http_req_tls_handshaking) and by their total request time
-	//      (i.e. http_req_sending + http_req_waiting + http_req_receiving). If any of those
-	//      properties of an HTTP trail is out of the calculated "normal" bounds for the
-	//      sub-bucket, it will be considered an outlier and will be sent to the cloud
-	//      individually - it's simply added to the default sample buffer, like it would
-	//      be if there was no aggregation.
-	//  - Finally, all non-outliers are aggregated and the resulting single metric is also
-	//    added to the default sample buffer for sending to the cloud ingest service
-	//    on the next MetricPushInterval event.

-	// If specified and is greater than 0, sample aggregation with that period is enabled
-	AggregationPeriod types.NullDuration `json:"aggregationPeriod" envconfig:"K6_CLOUD_AGGREGATION_PERIOD"`
-
-	// If aggregation is enabled, this is how often new HTTP trails will be sorted into buckets and sub-buckets and aggregated.
-	AggregationCalcInterval types.NullDuration `json:"aggregationCalcInterval" envconfig:"K6_CLOUD_AGGREGATION_CALC_INTERVAL"`
-
-	// If aggregation is enabled, this specifies how long we'll wait for period samples to accumulate before trying to aggregate them.
-	AggregationWaitPeriod types.NullDuration `json:"aggregationWaitPeriod" envconfig:"K6_CLOUD_AGGREGATION_WAIT_PERIOD"`
-
-	// If aggregation is enabled, but the collected samples for a certain AggregationPeriod after AggregationPushDelay has passed are less than this number, they won't be aggregated.
-	AggregationMinSamples null.Int `json:"aggregationMinSamples" envconfig:"K6_CLOUD_AGGREGATION_MIN_SAMPLES"`
-
-	// If this is enabled and a sub-bucket has more than AggregationMinSamples HTTP trails in it, they would all be
-	// aggregated without attempting to find and separate any outlier metrics first.
-	// IMPORTANT: This is intended for testing purposes only or, in extreme cases, when the result precision
-	// isn't very important and the improved aggregation percentage would be worth the potentially huge loss
-	// of metric granularity and possible masking of any outlier samples.
-	AggregationSkipOutlierDetection null.Bool `json:"aggregationSkipOutlierDetection" envconfig:"K6_CLOUD_AGGREGATION_SKIP_OUTLIER_DETECTION"`
-
-	// If aggregation and outlier detection are enabled, this option specifies the
-	// number of HTTP trails in a sub-bucket that determine which quartile-calculating
-	// algorithm would be used:
-	//  - for fewer samples (between MinSamples and OutlierAlgoThreshold), a more precise
-	//    (i.e. supporting interpolation), but also more computationally-heavy sorting
-	//    algorithm will be used to find the quartiles.
-	//  - if there are more samples than OutlierAlgoThreshold in the sub-bucket, a
-	//    QuickSelect-based (https://en.wikipedia.org/wiki/Quickselect) algorithm will
-	//    be used. It doesn't support interpolation, so there's a small loss of precision
-	//    in the outlier detection, but it's not as resource-heavy as the sorting algorithm.
-	AggregationOutlierAlgoThreshold null.Int `json:"aggregationOutlierAlgoThreshold" envconfig:"K6_CLOUD_AGGREGATION_OUTLIER_ALGO_THRESHOLD"`
-
-	// The radius (as a fraction) from the median at which to sample Q1 and Q3.
-	// By default it's one quarter (0.25) and if set to something different, the Q in IQR
-	// won't make much sense... But this would allow us to select tighter sample groups for
-	// aggregation if we want.
-	AggregationOutlierIqrRadius null.Float `json:"aggregationOutlierIqrRadius" envconfig:"K6_CLOUD_AGGREGATION_OUTLIER_IQR_RADIUS"`
-
-	// Connection or request times with how many IQRs below Q1 to consider as non-aggregatable outliers.
-	AggregationOutlierIqrCoefLower null.Float `json:"aggregationOutlierIqrCoefLower" envconfig:"K6_CLOUD_AGGREGATION_OUTLIER_IQR_COEF_LOWER"`
-
-	// Connection or request times with how many IQRs above Q3 to consider as non-aggregatable outliers.
-	AggregationOutlierIqrCoefUpper null.Float `json:"aggregationOutlierIqrCoefUpper" envconfig:"K6_CLOUD_AGGREGATION_OUTLIER_IQR_COEF_UPPER"`
-
-	// Deprecated: Remove this when migration from the cloud output v1 is completed
-	MaxMetricSamplesPerPackage null.Int `json:"maxMetricSamplesPerPackage" envconfig:"K6_CLOUD_MAX_METRIC_SAMPLES_PER_PACKAGE"`
}

// NewConfig creates a new Config instance with default values for some fields.
func NewConfig() Config {
-	c := Config{
+	return Config{
+		APIVersion: null.NewInt(2, false),
Host: null.NewString("https://ingest.k6.io", false),
LogsTailURL: null.NewString("wss://cloudlogs.k6.io/api/v1/tail", false),
WebAppURL: null.NewString("https://app.k6.io", false),
MetricPushInterval: types.NewNullDuration(1*time.Second, false),
MetricPushConcurrency: null.NewInt(1, false),

-		TracesEnabled:         null.NewBool(true, false),
-		TracesHost:            null.NewString("grpc-k6-api-prod-prod-us-east-0.grafana.net:443", false),
-		TracesPushInterval:    types.NewNullDuration(1*time.Second, false),
-		TracesPushConcurrency: null.NewInt(1, false),
-
-		MaxMetricSamplesPerPackage: null.NewInt(100000, false),
-		Timeout:                    types.NewNullDuration(1*time.Minute, false),
-		APIVersion:                 null.NewInt(2, false),
+		Timeout: types.NewNullDuration(1*time.Minute, false),

// The set value (1000) is selected for performance reasons.
// Any change to this value should be first discussed with internal stakeholders.
MaxTimeSeriesInBatch: null.NewInt(1000, false),

// TODO: the following values were used by the previous default version (v1).
// We decided to keep the same values mostly for having a smoother migration to v2.
// Because the previous version's aggregation config, a few lines below, is overwritten
Expand All @@ -191,27 +87,12 @@ func NewConfig() Config {
// https://github.com/grafana/k6/blob/44e1e63aadb66784ff0a12b8d9821a0fdc9e7467/output/cloud/expv2/collect.go#L72-L77
AggregationPeriod: types.NewNullDuration(3*time.Second, false),
AggregationWaitPeriod: types.NewNullDuration(8*time.Second, false),
-	}

-	if c.APIVersion.Int64 == 1 {
-		// Aggregation is disabled by default for legacy version, since AggregationPeriod has no default value
-		// but if it's enabled manually or from the cloud service and the cloud doesn't override the config then
-		// those are the default values it will use.
-		c.AggregationPeriod = types.NewNullDuration(0, false)
-		c.AggregationCalcInterval = types.NewNullDuration(3*time.Second, false)
-		c.AggregationWaitPeriod = types.NewNullDuration(5*time.Second, false)
-		c.AggregationMinSamples = null.NewInt(25, false)
-		c.AggregationOutlierAlgoThreshold = null.NewInt(75, false)
-		c.AggregationOutlierIqrRadius = null.NewFloat(0.25, false)
-
-		// Since we're measuring durations, the upper coefficient is slightly
-		// lower, since outliers from that side are more interesting than ones
-		// close to zero.
-		c.AggregationOutlierIqrCoefLower = null.NewFloat(1.5, false)
-		c.AggregationOutlierIqrCoefUpper = null.NewFloat(1.3, false)
+		TracesEnabled:         null.NewBool(true, false),
+		TracesHost:            null.NewString("grpc-k6-api-prod-prod-us-east-0.grafana.net:443", false),
+		TracesPushInterval:    types.NewNullDuration(1*time.Second, false),
+		TracesPushConcurrency: null.NewInt(1, false),
}

-	return c
}

// Apply saves non-zero config values from the passed config in the receiver.
@@ -254,9 +135,6 @@ func (c Config) Apply(cfg Config) Config {
if cfg.APIVersion.Valid {
c.APIVersion = cfg.APIVersion
}
-	if cfg.MaxMetricSamplesPerPackage.Valid {
-		c.MaxMetricSamplesPerPackage = cfg.MaxMetricSamplesPerPackage
-	}
if cfg.MaxTimeSeriesInBatch.Valid {
c.MaxTimeSeriesInBatch = cfg.MaxTimeSeriesInBatch
}
@@ -281,30 +159,9 @@ func (c Config) Apply(cfg Config) Config {
if cfg.AggregationPeriod.Valid {
c.AggregationPeriod = cfg.AggregationPeriod
}
-	if cfg.AggregationCalcInterval.Valid {
-		c.AggregationCalcInterval = cfg.AggregationCalcInterval
-	}
if cfg.AggregationWaitPeriod.Valid {
c.AggregationWaitPeriod = cfg.AggregationWaitPeriod
}
-	if cfg.AggregationMinSamples.Valid {
-		c.AggregationMinSamples = cfg.AggregationMinSamples
-	}
-	if cfg.AggregationSkipOutlierDetection.Valid {
-		c.AggregationSkipOutlierDetection = cfg.AggregationSkipOutlierDetection
-	}
-	if cfg.AggregationOutlierAlgoThreshold.Valid {
-		c.AggregationOutlierAlgoThreshold = cfg.AggregationOutlierAlgoThreshold
-	}
-	if cfg.AggregationOutlierIqrRadius.Valid {
-		c.AggregationOutlierIqrRadius = cfg.AggregationOutlierIqrRadius
-	}
-	if cfg.AggregationOutlierIqrCoefLower.Valid {
-		c.AggregationOutlierIqrCoefLower = cfg.AggregationOutlierIqrCoefLower
-	}
-	if cfg.AggregationOutlierIqrCoefUpper.Valid {
-		c.AggregationOutlierIqrCoefUpper = cfg.AggregationOutlierIqrCoefUpper
-	}
return c
}

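The comment block deleted above is the only surviving description of v1's outlier handling, so here is a rough, self-contained Go sketch of the described check, using the removed defaults (IqrRadius 0.25, CoefLower 1.5, CoefUpper 1.3). It approximates the documented sorting path; it is not the removed implementation, which switched to a quickselect algorithm above AggregationOutlierAlgoThreshold samples:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// quartiles picks Q1 and Q3 at median±radius from a sorted copy of the data
// (the simple sorting-based path the removed comment describes).
func quartiles(ds []time.Duration, radius float64) (q1, q3 time.Duration) {
	sorted := append([]time.Duration(nil), ds...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	last := float64(len(sorted) - 1)
	return sorted[int(last*(0.5-radius))], sorted[int(last*(0.5+radius))]
}

// isOutlier applies the asymmetric IQR fences from the removed defaults:
// coefLower*IQR below Q1 and coefUpper*IQR above Q3.
func isOutlier(d, q1, q3 time.Duration, coefLower, coefUpper float64) bool {
	iqr := float64(q3 - q1)
	return float64(d) < float64(q1)-coefLower*iqr ||
		float64(d) > float64(q3)+coefUpper*iqr
}

func main() {
	reqTimes := []time.Duration{
		90 * time.Millisecond, 95 * time.Millisecond, 100 * time.Millisecond,
		105 * time.Millisecond, 110 * time.Millisecond, 2 * time.Second, // the last one is an outlier
	}
	q1, q3 := quartiles(reqTimes, 0.25) // default IqrRadius
	for _, d := range reqTimes {
		if isOutlier(d, q1, q3, 1.5, 1.3) {
			fmt.Println("sent individually:", d) // outliers skip aggregation
		}
	}
}
```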
46 changes: 19 additions & 27 deletions cloudapi/config_test.go
@@ -24,34 +24,26 @@ func TestConfigApply(t *testing.T) {
assert.Equal(t, defaults, defaults.Apply(empty).Apply(empty))

full := Config{
-		Token:                      null.NewString("Token", true),
-		ProjectID:                  null.NewInt(1, true),
-		Name:                       null.NewString("Name", true),
-		Host:                       null.NewString("Host", true),
-		Timeout:                    types.NewNullDuration(5*time.Second, true),
-		LogsTailURL:                null.NewString("LogsTailURL", true),
-		PushRefID:                  null.NewString("PushRefID", true),
-		WebAppURL:                  null.NewString("foo", true),
-		NoCompress:                 null.NewBool(true, true),
-		StopOnError:                null.NewBool(true, true),
-		APIVersion:                 null.NewInt(2, true),
-		MaxMetricSamplesPerPackage: null.NewInt(2, true),
-		MaxTimeSeriesInBatch:       null.NewInt(3, true),
-		MetricPushInterval:         types.NewNullDuration(1*time.Second, true),
-		MetricPushConcurrency:      null.NewInt(3, true),
-		TracesEnabled:              null.NewBool(true, true),
-		TracesHost:                 null.NewString("TracesHost", true),
-		TracesPushInterval:         types.NewNullDuration(10*time.Second, true),
+		Token:                 null.NewString("Token", true),
+		ProjectID:             null.NewInt(1, true),
+		Name:                  null.NewString("Name", true),
+		Host:                  null.NewString("Host", true),
+		Timeout:               types.NewNullDuration(5*time.Second, true),
+		LogsTailURL:           null.NewString("LogsTailURL", true),
+		PushRefID:             null.NewString("PushRefID", true),
+		WebAppURL:             null.NewString("foo", true),
+		NoCompress:            null.NewBool(true, true),
+		StopOnError:           null.NewBool(true, true),
+		APIVersion:            null.NewInt(2, true),
+		AggregationPeriod:     types.NewNullDuration(2*time.Second, true),
+		AggregationWaitPeriod: types.NewNullDuration(4*time.Second, true),
+		MaxTimeSeriesInBatch:  null.NewInt(3, true),
+		MetricPushInterval:    types.NewNullDuration(1*time.Second, true),
+		MetricPushConcurrency: null.NewInt(3, true),
+		TracesEnabled:         null.NewBool(true, true),
+		TracesHost:            null.NewString("TracesHost", true),
+		TracesPushInterval:    types.NewNullDuration(10*time.Second, true),
TracesPushConcurrency: null.NewInt(6, true),
-		AggregationPeriod:               types.NewNullDuration(2*time.Second, true),
-		AggregationCalcInterval:         types.NewNullDuration(3*time.Second, true),
-		AggregationWaitPeriod:           types.NewNullDuration(4*time.Second, true),
-		AggregationMinSamples:           null.NewInt(4, true),
-		AggregationSkipOutlierDetection: null.NewBool(true, true),
-		AggregationOutlierAlgoThreshold: null.NewInt(5, true),
-		AggregationOutlierIqrRadius:     null.NewFloat(6, true),
-		AggregationOutlierIqrCoefLower:  null.NewFloat(7, true),
-		AggregationOutlierIqrCoefUpper:  null.NewFloat(8, true),
}

assert.Equal(t, full, full.Apply(empty))
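The reshuffled test literal works because of Apply's merge rule: a field from the argument overwrites the receiver only when its Valid flag is set, which is why full.Apply(empty) must return full unchanged. A small usage sketch of that rule (assuming the go.k6.io/k6/cloudapi and gopkg.in/guregu/null.v3 import paths seen in this PR):

```go
package main

import (
	"fmt"

	"go.k6.io/k6/cloudapi"
	"gopkg.in/guregu/null.v3"
)

func main() {
	base := cloudapi.NewConfig() // defaults carry Valid=false flags
	override := cloudapi.Config{ProjectID: null.NewInt(42, true)}

	merged := base.Apply(override)
	fmt.Println(merged.ProjectID.Int64) // 42: the Valid field won
	fmt.Println(merged.Host.String)     // https://ingest.k6.io: default untouched
}
```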
10 changes: 2 additions & 8 deletions output/cloud/output.go
@@ -16,7 +16,6 @@ import (
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
cloudv2 "go.k6.io/k6/output/cloud/expv2"
cloudv1 "go.k6.io/k6/output/cloud/v1"
"gopkg.in/guregu/null.v3"
)

Expand Down Expand Up @@ -44,7 +43,7 @@ type apiVersion int64

const (
apiVersionUndefined apiVersion = iota
-	apiVersion1
+	apiVersion1 // disabled
apiVersion2
)

@@ -113,11 +112,6 @@ func newOutput(params output.Params) (*Output, error) {
conf.MetricPushConcurrency.Int64)
}

-	if conf.MaxMetricSamplesPerPackage.Int64 < 1 { //nolint:staticcheck
-		return nil, fmt.Errorf("metric samples per package must be a positive number but is %d",
-			conf.MaxMetricSamplesPerPackage.Int64) //nolint:staticcheck
-	}
-

if conf.MaxTimeSeriesInBatch.Int64 < 1 {
return nil, fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d",
conf.MaxTimeSeriesInBatch.Int64)
@@ -347,7 +341,7 @@ func (out *Output) startVersionedOutput() error {

switch out.config.APIVersion.Int64 {
case int64(apiVersion1):
-		out.versionedOutput, err = cloudv1.New(out.logger, out.config, out.client)
+		err = errors.New("v1 is not supported anymore")
case int64(apiVersion2):
out.versionedOutput, err = cloudv2.New(out.logger, out.config, out.client)
default:
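Net effect of the last hunk: apiVersion1 remains a recognized config value, but selecting it now aborts output startup instead of falling back to the deleted v1 implementation. An illustrative sketch of the resulting control flow; the default-branch message and the trailing Start call are assumptions, not code from this PR:

```go
// Sketch only: simplified reconstruction of the dispatch after this PR.
func (out *Output) startVersionedOutput() error {
	var err error
	switch out.config.APIVersion.Int64 {
	case int64(apiVersion1):
		// The constant survives so old configs still parse, but starting v1 fails fast.
		err = errors.New("v1 is not supported anymore")
	case int64(apiVersion2):
		out.versionedOutput, err = cloudv2.New(out.logger, out.config, out.client)
	default:
		// Assumed wording; the real default branch may differ.
		err = fmt.Errorf("unsupported cloud API version: %d", out.config.APIVersion.Int64)
	}
	if err != nil {
		return err
	}
	return out.versionedOutput.Start() // assumed: the real method also starts the chosen output
}
```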