Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local instance's tags to forwarded metrics #188

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *Server) FlushLocal(ctx context.Context) {

// we cannot do this until we're done using tempMetrics within this function,
// since not everything in tempMetrics is safe for sharing
go s.flushForward(span.Attach(ctx), tempMetrics)
go s.flushForward(span.Attach(ctx), s.Tags, tempMetrics)

go func() {
for _, p := range s.getPlugins() {
Expand Down Expand Up @@ -345,7 +345,7 @@ func (s *Server) flushPart(ctx context.Context, metricSlice []samplers.DDMetric,
}, "flush", true)
}

func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
func (s *Server) flushForward(ctx context.Context, tags []string, wms []WorkerMetrics) {
span, _ := trace.StartSpanFromContext(ctx, "")
defer span.Finish()
jmLength := 0
Expand All @@ -368,6 +368,7 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
}).Error("Could not export metric")
continue
}
jm.Tags = append(jm.Tags, tags...)
jsonMetrics = append(jsonMetrics, jm)
}
for _, histo := range wm.histograms {
Expand All @@ -380,6 +381,7 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
}).Error("Could not export metric")
continue
}
jm.Tags = append(jm.Tags, tags...)
jsonMetrics = append(jsonMetrics, jm)
}
for _, set := range wm.sets {
Expand All @@ -392,6 +394,7 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
}).Error("Could not export metric")
continue
}
jm.Tags = append(jm.Tags, tags...)
jsonMetrics = append(jsonMetrics, jm)
}
for _, timer := range wm.timers {
Expand All @@ -406,6 +409,7 @@ func (s *Server) flushForward(ctx context.Context, wms []WorkerMetrics) {
}
// the exporter doesn't know that these two are "different"
jm.Type = "timer"
jm.Tags = append(jm.Tags, tags...)
jsonMetrics = append(jsonMetrics, jm)
}
}
Expand Down
77 changes: 64 additions & 13 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ type fixture struct {
api *httptest.Server
server Server
ddmetrics chan DDMetricsRequest
importmetrics chan []samplers.JSONMetric
interval time.Duration
flushMaxPerBody int
}
Expand All @@ -196,41 +197,65 @@ func newFixture(t *testing.T, config Config) *fixture {
assert.NoError(t, err)

// Set up a remote server (the API that we're sending the data to)
// (e.g. Datadog)
f := &fixture{nil, Server{}, make(chan DDMetricsRequest, 10), interval, config.FlushMaxPerBody}
// (e.g. Datadog) and our own /import
f := &fixture{nil, Server{}, make(chan DDMetricsRequest, 10), make(chan []samplers.JSONMetric, 10), interval, config.FlushMaxPerBody}
f.api = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
zr, err := zlib.NewReader(r.Body)
if err != nil {
t.Fatal(err)
}

var ddmetrics DDMetricsRequest
if r.URL.Path == "/api/v1/series" {
zr, err := zlib.NewReader(r.Body)
if err != nil {
t.Fatal(err)
}

err = json.NewDecoder(zr).Decode(&ddmetrics)
if err != nil {
t.Fatal(err)
}
var ddmetrics DDMetricsRequest

f.ddmetrics <- ddmetrics
err = json.NewDecoder(zr).Decode(&ddmetrics)
if err != nil {
t.Fatal(err)
}

f.ddmetrics <- ddmetrics
} else if r.URL.Path == "/import" {
rz, err := zlib.NewReader(r.Body)
if err != nil {
t.Fatal(err)
}

var importmetrics []samplers.JSONMetric
err = json.NewDecoder(rz).Decode(&importmetrics)
if err != nil {
t.Fatal(err)
}

f.importmetrics <- importmetrics
} else {
t.Fatalf("Got unexpected HTTP request to %s", r.URL.Path)
}
w.WriteHeader(http.StatusAccepted)
}))

config.APIHostname = f.api.URL
// If the config is aiming to do it's own import, let's replace the config
// with that of our fixture.
if config.ForwardAddress == "http://localhost" {
config.ForwardAddress = f.api.URL
}
config.NumWorkers = 1
f.server = setupVeneurServer(t, config, nil)
return f
}

func (f *fixture) Close() {
// make Close safe to call multiple times
if f.ddmetrics == nil {
if f.ddmetrics == nil && f.importmetrics == nil {
return
}

f.api.Close()
f.server.Shutdown()
close(f.ddmetrics)
f.ddmetrics = nil
f.importmetrics = nil
}

// TestLocalServerUnaggregatedMetrics tests the behavior of
Expand Down Expand Up @@ -288,6 +313,32 @@ func TestGlobalServerFlush(t *testing.T) {
assertMetrics(t, ddmetrics, expectedMetrics)
}

func TestLocalAddsImportTags(t *testing.T) {
metricValues, _ := generateMetrics()
config := localConfig()
config.Tags = []string{"foo:bar"}
f := newFixture(t, config)
defer f.Close()

for _, value := range metricValues {
f.server.Workers[0].ProcessMetric(&samplers.UDPMetric{
MetricKey: samplers.MetricKey{
Name: "a.b.c",
Type: "histogram",
},
Value: value,
Digest: 12345,
SampleRate: 1.0,
})
}

f.server.Flush()

importmetrics := <-f.importmetrics
assert.Equal(t, 1, len(importmetrics), 1, "import received the histogram")
assert.Equal(t, "foo:bar", importmetrics[0].Tags[0], "imported histogram has sender's tags")
}

func TestLocalServerMixedMetrics(t *testing.T) {
// The exact gob stream that we will receive might differ, so we can't
// test against the bytestream directly. But the two streams should unmarshal
Expand Down