Skip to content

Commit

Permalink
[coordinator] Set default namespace tag to avoid colliding with commo…
Browse files Browse the repository at this point in the history
…nly used "namespace" label (#2878)

* [coordinator] Set default namespace tag to avoid colliding with common "namespace" default value

* Use defined constant

* Add downsampler test case to demonstrate override namespace tag

Co-authored-by: Wesley Kim <[email protected]>
  • Loading branch information
robskillington and wesleyk committed Nov 12, 2020
1 parent 3746bf0 commit ba6a303
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 2 deletions.
83 changes: 83 additions & 0 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,87 @@ func TestDownsamplerAggregationWithRemoteAggregatorClient(t *testing.T) {
testDownsamplerRemoteAggregation(t, testDownsampler)
}

func TestDownsamplerWithOverrideNamespace(t *testing.T) {
overrideNamespaceTag := "override_namespace_tag"

gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value",
// Set namespace tags on ingested metrics.
// The test demonstrates that overrideNamespaceTag is respected, meaning setting
// values on defaultNamespaceTag won't affect aggregation.
defaultNamespaceTag: "namespace_ignored",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 42},
{value: 64, offset: 5 * time.Second},
},
}
res := 5 * time.Second
ret := 30 * 24 * time.Hour
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
rulesConfig: &RulesConfiguration{
RollupRules: []RollupRuleConfiguration{
{
Filter: fmt.Sprintf(
"%s:http_requests app:* status_code:* endpoint:*",
nameTag),
Transforms: []TransformConfiguration{
{
Transform: &TransformOperationConfiguration{
Type: transformation.PerSecond,
},
},
{
Rollup: &RollupOperationConfiguration{
MetricName: "http_requests_by_status_code",
GroupBy: []string{"app", "status_code", "endpoint"},
Aggregations: []aggregation.Type{aggregation.Sum},
},
},
},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: res,
Retention: ret,
},
},
},
},
},
matcherConfig: MatcherConfiguration{NamespaceTag: overrideNamespaceTag},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
{
tags: map[string]string{
nameTag: "http_requests_by_status_code",
string(rollupTagName): string(rollupTagValue),
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
},
values: []expectedValue{{value: 4.4}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: res,
Retention: ret,
},
},
},
},
})

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas {
ds, ok := testDownsampler.downsampler.(*downsampler)
require.True(t, ok)
Expand Down Expand Up @@ -1751,6 +1832,7 @@ type testDownsamplerOptions struct {
sampleAppenderOpts *SampleAppenderOptions
remoteClientMock *client.MockClient
rulesConfig *RulesConfiguration
matcherConfig MatcherConfiguration

// Test ingest and expectations overrides
ingest *testDownsamplerOptionsIngest
Expand Down Expand Up @@ -1821,6 +1903,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
if opts.rulesConfig != nil {
cfg.Rules = opts.rulesConfig
}
cfg.Matcher = opts.matcherConfig

instance, err := cfg.NewDownsampler(DownsamplerOptions{
Storage: storage,
Expand Down
13 changes: 11 additions & 2 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ const (
)

var (
numShards = runtime.NumCPU()
numShards = runtime.NumCPU()
defaultNamespaceTag = metric.M3MetricsPrefixString + "_namespace__"

errNoStorage = errors.New("downsampling enabled with storage not set")
errNoClusterClient = errors.New("downsampling enabled with cluster client not set")
Expand Down Expand Up @@ -267,6 +268,9 @@ type Configuration struct {
type MatcherConfiguration struct {
// Cache if non-zero will set the capacity of the rules matching cache.
Cache MatcherCacheConfiguration `yaml:"cache"`
// NamespaceTag defines the namespace tag to use to select rules
// namespace to evaluate against. Default is "__m3_namespace__".
NamespaceTag string `yaml:"namespaceTag"`
}

// MatcherCacheConfiguration is the configuration for the rule matcher cache.
Expand Down Expand Up @@ -647,13 +651,17 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
logger = instrumentOpts.Logger()
openTimeout = defaultOpenTimeout
m3PrefixFilter = false
namespaceTag = defaultNamespaceTag
)
if o.StorageFlushConcurrency > 0 {
storageFlushConcurrency = o.StorageFlushConcurrency
}
if o.OpenTimeout > 0 {
openTimeout = o.OpenTimeout
}
if cfg.Matcher.NamespaceTag != "" {
namespaceTag = cfg.Matcher.NamespaceTag
}

pools := o.newAggregatorPools()
ruleSetOpts := o.newAggregatorRulesOptions(pools)
Expand All @@ -662,7 +670,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
SetClockOptions(clockOpts).
SetInstrumentOptions(instrumentOpts).
SetRuleSetOptions(ruleSetOpts).
SetKVStore(o.RulesKVStore)
SetKVStore(o.RulesKVStore).
SetNamespaceTag([]byte(namespaceTag))

// NB(r): If rules are being explicitly set in config then we are
// going to use an in memory KV store for rules and explicitly set them up.
Expand Down

0 comments on commit ba6a303

Please sign in to comment.