Skip to content

Commit

Permalink
Merge pull request #133 from glightfoot/graphite-tags
Browse files Browse the repository at this point in the history
simple graphite tags support
  • Loading branch information
Matthias Rampke authored Jul 21, 2020
2 parents e101883 + 2487dcb commit 70aca33
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 93 deletions.
7 changes: 5 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
run:
modules-download-mode: vendor

# Run only staticcheck for now. Additional linters will be enabled one-by-one.
skip-dirs:
- vendor
- e2e
# Run only staticcheck and goimports for now. Additional linters will be enabled one-by-one.
linters:
enable:
- staticcheck
- goimports
disable-all: true
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ Metrics will be available on [http://localhost:9108/metrics](http://localhost:91
To avoid using unbounded memory, metrics will be garbage collected five minutes after
they are last pushed to. This is configurable with the `--graphite.sample-expiry` flag.

## Graphite Tags
The graphite_exporter accepts metrics in the [tagged carbon format](https://graphite.readthedocs.io/en/latest/tags.html). Labels specified in the mapping configuration take precedence over tags in the metric. In the case where there are valid and invalid tags supplied in one metric, the invalid tags will be dropped and the `graphite_tag_parse_failures` counter will be incremented. The exporter accepts inconsistent label sets, but this may cause issues querying the data in Prometheus.

## Metric Mapping and Configuration

**Please note there has been a breaking change in configuration after version 0.2.0. The YAML style config from [statsd_exporter](https://github.com/prometheus/statsd_exporter) is now used. See conversion instructions below**
Expand Down
78 changes: 78 additions & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,81 @@ rspamd.spam_count 3 NOW`
}
}
}

// Test to ensure that inconsistent label sets are accepted and exported correctly
func TestInconsistentLabelsE2E(t *testing.T) {
cwd, err := os.Getwd()
if err != nil {
t.Fatal(err)
}

webAddr, graphiteAddr := fmt.Sprintf("127.0.0.1:%d", 9108), fmt.Sprintf("127.0.0.1:%d", 9109)
exporter := exec.Command(
filepath.Join(cwd, "..", "graphite_exporter"),
"--web.listen-address", webAddr,
"--graphite.listen-address", graphiteAddr,
"--graphite.mapping-config", filepath.Join(cwd, "fixtures", "mapping.yml"),
)
err = exporter.Start()
if err != nil {
t.Fatalf("execution error: %v", err)
}
defer exporter.Process.Kill()

for i := 0; i < 20; i++ {
if i > 0 {
time.Sleep(1 * time.Second)
}
resp, err := http.Get("http://" + webAddr)
if err != nil {
continue
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
break
}
}

now := time.Now()

input := `rspamd.actions;action=add_header 2 NOW
rspamd.actions;action=greylist 0 NOW
rspamd.actions;action2=add_header 2 NOW
rspamd.actions;action2=greylist 0 NOW
`
input = strings.NewReplacer("NOW", fmt.Sprintf("%d", now.Unix())).Replace(input)

output := []string{
"rspamd_actions{action=\"add_header\"} 2",
"rspamd_actions{action=\"greylist\"} 0",
"rspamd_actions{action2=\"add_header\"} 2",
"rspamd_actions{action2=\"greylist\"} 0",
}

conn, err := net.Dial("tcp", graphiteAddr)
if err != nil {
t.Fatalf("connection error: %v", err)
}
defer conn.Close()
_, err = conn.Write([]byte(input))
if err != nil {
t.Fatalf("write error: %v", err)
}

time.Sleep(5 * time.Second)

resp, err := http.Get("http://" + path.Join(webAddr, "metrics"))
if err != nil {
t.Fatalf("get error: %v", err)
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("read error: %v", err)
}
for _, s := range output {
if !strings.Contains(string(b), s) {
t.Fatalf("Expected %q in %q – input: %q – time: %s", s, string(b), input, now)
}
}
}
63 changes: 53 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ var (
Help: "How long in seconds a metric sample is valid for.",
},
)
tagParseFailures = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "graphite_tag_parse_failures",
Help: "Total count of samples with invalid tags",
})
invalidMetricChars = regexp.MustCompile("[^a-zA-Z0-9_:]")
)

type graphiteSample struct {
OriginalName string
Name string
Labels map[string]string
Labels prometheus.Labels
Help string
Value float64
Type prometheus.ValueType
Expand Down Expand Up @@ -126,26 +131,65 @@ func (c *graphiteCollector) processLines() {
}
}

func parseMetricNameAndTags(name string) (string, prometheus.Labels, error) {
var err error

labels := make(prometheus.Labels)

parts := strings.Split(name, ";")
parsedName := parts[0]

tags := parts[1:]
for _, tag := range tags {
kv := strings.SplitN(tag, "=", 2)
if len(kv) != 2 {
// don't add this tag, continue processing tags but return an error
tagParseFailures.Inc()
err = fmt.Errorf("error parsing tag %s", tag)
continue
}

k := kv[0]
v := kv[1]
labels[k] = v
}

return parsedName, labels, err
}

func (c *graphiteCollector) processLine(line string) {
line = strings.TrimSpace(line)
level.Debug(c.logger).Log("msg", "Incoming line", "line", line)

parts := strings.Split(line, " ")
if len(parts) != 3 {
level.Info(c.logger).Log("msg", "Invalid part count", "parts", len(parts), "line", line)
return
}

originalName := parts[0]
var name string
mapping, labels, present := c.mapper.GetMapping(originalName, mapper.MetricTypeGauge)

if (present && mapping.Action == mapper.ActionTypeDrop) || (!present && c.strictMatch) {
parsedName, labels, err := parseMetricNameAndTags(originalName)
if err != nil {
level.Debug(c.logger).Log("msg", "Invalid tags", "line", line, "err", err.Error())
}

mapping, mappingLabels, mappingPresent := c.mapper.GetMapping(parsedName, mapper.MetricTypeGauge)

// add mapping labels to parsed labels
for k, v := range mappingLabels {
labels[k] = v
}

if (mappingPresent && mapping.Action == mapper.ActionTypeDrop) || (!mappingPresent && c.strictMatch) {
return
}

if present {
var name string
if mappingPresent {
name = invalidMetricChars.ReplaceAllString(mapping.Name, "_")
} else {
name = invalidMetricChars.ReplaceAllString(originalName, "_")
name = invalidMetricChars.ReplaceAllString(parsedName, "_")
}

value, err := strconv.ParseFloat(parts[1], 64)
Expand Down Expand Up @@ -222,10 +266,8 @@ func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) {
}
}

// Describe implements prometheus.Collector.
func (c graphiteCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- lastProcessed.Desc()
}
// Describe implements prometheus.Collector but does not yield a description, allowing inconsistent label sets
func (c graphiteCollector) Describe(_ chan<- *prometheus.Desc) {}

func init() {
prometheus.MustRegister(version.NewCollector("graphite_exporter"))
Expand Down Expand Up @@ -257,6 +299,7 @@ func main() {
logger := promlog.New(promlogConfig)

prometheus.MustRegister(sampleExpiryMetric)
prometheus.MustRegister(tagParseFailures)
sampleExpiryMetric.Set(sampleExpiry.Seconds())

level.Info(logger).Log("msg", "Starting graphite_exporter", "version_info", version.Info())
Expand Down
65 changes: 44 additions & 21 deletions main_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ import (
"github.com/go-kit/kit/log"
)

func benchmarkProcessLine(times int, b *testing.B) {
logger := log.NewNopLogger()
c := newGraphiteCollector(logger)
var (
logger = log.NewNopLogger()
c = newGraphiteCollector(logger)

now := time.Now()
now = time.Now()

rawInput := `rspamd.actions.add_header 2 NOW
rspamd.actions.greylist 0 NOW
rspamd.actions.no_action 24 NOW
rspamd.actions.reject 1 NOW
rspamd.actions.rewrite_subject 0 NOW
rawInput = `rspamd.actions.add_header 2 NOW
rspamd.actions;action=greylist 0 NOW
rspamd.actions;action=no_action 24 NOW
rspamd.actions;action=reject 1 NOW
rspamd.actions;action=rewrite_subject 0 NOW
rspamd.actions.soft_reject 0 NOW
rspamd.bytes_allocated 4165268944 NOW
rspamd.chunks_allocated 4294966730 NOW
Expand All @@ -47,32 +47,55 @@ rspamd.pools_freed 171 NOW
rspamd.scanned 27 NOW
rspamd.shared_chunks_allocated 34 NOW
rspamd.spam_count 3 NOW`
rawInput = strings.NewReplacer("NOW", fmt.Sprintf("%d", now.Unix())).Replace(rawInput)
input := strings.Split(rawInput, "\n")
rawInput2 = strings.NewReplacer("NOW", fmt.Sprintf("%d", now.Unix())).Replace(rawInput)
input = strings.Split(rawInput2, "\n")

// The name should be the same length to ensure the only difference is the tag parsing
untaggedLine = fmt.Sprintf("rspamd.actions 2 %d", now.Unix())
taggedLine = fmt.Sprintf("rspamd.actions;action=add_header;foo=bar 2 %d", now.Unix())
)

func init() {
c.mapper = &mockMapper{
name: "not_used",
present: false,
}
}

// reset benchmark timer to not measure startup costs
b.ResetTimer()

func benchmarkProcessLines(times int, b *testing.B, lines []string) {
for n := 0; n < b.N; n++ {
for i := 0; i < times; i++ {
for _, l := range input {
for _, l := range lines {
c.processLine(l)
}
}
}
}

func BenchmarkProcessLine1(b *testing.B) {
benchmarkProcessLine(1, b)
func benchmarkProcessLine(b *testing.B, line string) {
// always report allocations since this is a hot path
b.ReportAllocs()

for n := 0; n < b.N; n++ {
c.processLine(line)
}
}

// Mixed lines benchmarks
func BenchmarkProcessLineMixed1(b *testing.B) {
benchmarkProcessLines(1, b, input)
}
func BenchmarkProcessLine5(b *testing.B) {
benchmarkProcessLine(5, b)
func BenchmarkProcessLineMixed5(b *testing.B) {
benchmarkProcessLines(5, b, input)
}
func BenchmarkProcessLineMixed50(b *testing.B) {
benchmarkProcessLines(50, b, input)
}

// Individual line benchmarks
func BenchmarkProcessLineUntagged(b *testing.B) {
benchmarkProcessLine(b, untaggedLine)
}
func BenchmarkProcessLine50(b *testing.B) {
benchmarkProcessLine(50, b)
func BenchmarkProcessLineTagged(b *testing.B) {
benchmarkProcessLine(b, taggedLine)
}
Loading

0 comments on commit 70aca33

Please sign in to comment.