diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index bc850de531..4b9f4aac64 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -1274,6 +1274,87 @@ func TestDownsamplerAggregationWithRemoteAggregatorClient(t *testing.T) { testDownsamplerRemoteAggregation(t, testDownsampler) } +func TestDownsamplerWithOverrideNamespace(t *testing.T) { + overrideNamespaceTag := "override_namespace_tag" + + gaugeMetric := testGaugeMetric{ + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value", + // Set namespace tags on ingested metrics. + // The test demonstrates that overrideNamespaceTag is respected, meaning setting + // values on defaultNamespaceTag won't affect aggregation. + defaultNamespaceTag: "namespace_ignored", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 42}, + {value: 64, offset: 5 * time.Second}, + }, + } + res := 5 * time.Second + ret := 30 * 24 * time.Hour + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + rulesConfig: &RulesConfiguration{ + RollupRules: []RollupRuleConfiguration{ + { + Filter: fmt.Sprintf( + "%s:http_requests app:* status_code:* endpoint:*", + nameTag), + Transforms: []TransformConfiguration{ + { + Transform: &TransformOperationConfiguration{ + Type: transformation.PerSecond, + }, + }, + { + Rollup: &RollupOperationConfiguration{ + MetricName: "http_requests_by_status_code", + GroupBy: []string{"app", "status_code", "endpoint"}, + Aggregations: []aggregation.Type{aggregation.Sum}, + }, + }, + }, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }, + matcherConfig: MatcherConfiguration{NamespaceTag: overrideNamespaceTag}, + ingest: &testDownsamplerOptionsIngest{ + gaugeMetrics: []testGaugeMetric{gaugeMetric}, + }, + expect: &testDownsamplerOptionsExpect{ + writes: []testExpectedWrite{ + { + tags: map[string]string{ + nameTag: "http_requests_by_status_code", + string(rollupTagName): string(rollupTagValue), + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + }, + values: []expectedValue{{value: 4.4}}, + attributes: &storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Resolution: res, + Retention: ret, + }, + }, + }, + }, + }) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas { ds, ok := testDownsampler.downsampler.(*downsampler) require.True(t, ok) @@ -1751,6 +1832,7 @@ type testDownsamplerOptions struct { sampleAppenderOpts *SampleAppenderOptions remoteClientMock *client.MockClient rulesConfig *RulesConfiguration + matcherConfig MatcherConfiguration // Test ingest and expectations overrides ingest *testDownsamplerOptionsIngest @@ -1821,6 +1903,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl if opts.rulesConfig != nil { cfg.Rules = opts.rulesConfig } + cfg.Matcher = opts.matcherConfig instance, err := cfg.NewDownsampler(DownsamplerOptions{ Storage: storage, diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index b1b61df731..da966cfe00 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -86,7 +86,8 @@ const ( ) var ( - numShards = runtime.NumCPU() + numShards = runtime.NumCPU() + defaultNamespaceTag = metric.M3MetricsPrefixString + "_namespace__" errNoStorage = errors.New("downsampling enabled with storage not set") errNoClusterClient = errors.New("downsampling enabled with cluster client not set") @@ -267,6 +268,9 @@ type Configuration struct { type MatcherConfiguration struct { // Cache if non-zero will set the capacity of the rules matching cache. Cache MatcherCacheConfiguration `yaml:"cache"` + // NamespaceTag defines the namespace tag to use to select rules + // namespace to evaluate against. Default is "__m3_namespace__". + NamespaceTag string `yaml:"namespaceTag"` } // MatcherCacheConfiguration is the configuration for the rule matcher cache. @@ -647,6 +651,7 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { logger = instrumentOpts.Logger() openTimeout = defaultOpenTimeout m3PrefixFilter = false + namespaceTag = defaultNamespaceTag ) if o.StorageFlushConcurrency > 0 { storageFlushConcurrency = o.StorageFlushConcurrency @@ -654,6 +659,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { if o.OpenTimeout > 0 { openTimeout = o.OpenTimeout } + if cfg.Matcher.NamespaceTag != "" { + namespaceTag = cfg.Matcher.NamespaceTag + } pools := o.newAggregatorPools() ruleSetOpts := o.newAggregatorRulesOptions(pools) @@ -662,7 +670,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { SetClockOptions(clockOpts). SetInstrumentOptions(instrumentOpts). SetRuleSetOptions(ruleSetOpts). - SetKVStore(o.RulesKVStore) + SetKVStore(o.RulesKVStore). + SetNamespaceTag([]byte(namespaceTag)) // NB(r): If rules are being explicitly set in config then we are // going to use an in memory KV store for rules and explicitly set them up.