Skip to content

Commit

Permalink
fix: expire flows individually, rather than all at once
Browse files Browse the repository at this point in the history
This change keeps the same metric for a given set of labels until it has been unused for the configured `flush_interval`. This means that an ongoing flow will keep increasing the byte count, rather than being zeroed out each time `flush_interval` elapses and all metrics are deleted.

This may make it easier to get better graphs out of ongoing usage.

Signed-off-by: Adam Eijdenberg <[email protected]>
  • Loading branch information
aeijdenberg committed Apr 21, 2024
1 parent d5c0904 commit b114094
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
34 changes: 25 additions & 9 deletions pkg/collector/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package collector

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/rkosegi/ipfix-collector/pkg/public"
"net"
"strconv"
"strings"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/rkosegi/ipfix-collector/pkg/public"
)

func (m *metricEntry) init(prefix string, spec *public.MetricSpec, flushInterval int) {
Expand All @@ -38,20 +41,33 @@ func (m *metricEntry) init(prefix string, spec *public.MetricSpec, flushInterval
Name: spec.Name,
Help: spec.Description,
}, labelNames)
m.cleaner = time.NewTicker(time.Second * time.Duration(flushInterval))
go func() {
for range m.cleaner.C {
m.counter.Reset()
}
}()
m.metrics = ttlcache.New(
ttlcache.WithTTL[string, prometheus.Counter](time.Duration(flushInterval) * time.Second),
)
go m.metrics.Start()
}

// Collect emits every live (not-yet-expired) per-label-set counter held in
// the TTL cache to the Prometheus scrape channel.
func (m *metricEntry) Collect(ch chan<- prometheus.Metric) {
	m.metrics.Range(func(entry *ttlcache.Item[string, prometheus.Counter]) bool {
		ch <- entry.Value()
		// Returning true tells Range to continue with the next cache entry.
		return true
	})
}

// Describe forwards the descriptor of the underlying CounterVec; all
// per-label-set counters served by Collect share this single descriptor.
func (m *metricEntry) Describe(ch chan<- *prometheus.Desc) {
	m.counter.Describe(ch)
}

// apply folds one flow into this metric: it resolves the label values for
// the flow, looks up (or lazily creates via the loader) the per-label-set
// counter in the TTL cache, and adds the flow's byte count to it. Touching
// the cache entry refreshes its TTL, so a label set only expires once it has
// been idle for the full flush interval.
func (m *metricEntry) apply(flow *public.Flow) {
	// Preallocate: one value per configured label.
	labelValues := make([]string, 0, len(m.labels))
	for _, lp := range m.labels {
		labelValues = append(labelValues, lp.apply(flow))
	}
	// Join with NUL rather than "|": label values are free-form strings, so a
	// printable separator could make two distinct label sets collide on the
	// same cache key.
	key := strings.Join(labelValues, "\x00")
	m.metrics.Get(key, ttlcache.WithLoader(ttlcache.LoaderFunc[string, prometheus.Counter](
		func(c *ttlcache.Cache[string, prometheus.Counter], key string) *ttlcache.Item[string, prometheus.Counter] {
			// NOTE(review): WithLabelValues re-attaches to any counter still
			// present in the CounterVec. Expired entries are dropped from the
			// cache but never deleted from the vec, so a returning label set
			// resumes its old total and the vec grows without bound — confirm
			// whether expiry should also call m.counter.DeleteLabelValues.
			return c.Set(key, m.counter.WithLabelValues(labelValues...), ttlcache.DefaultTTL)
		},
	))).Value().Add(float64(flow.Raw("bytes").(uint64))) // assumes "bytes" is always a uint64 — panics otherwise; TODO confirm upstream guarantee
}

func (lp *labelProcessor) init(label public.MetricLabel) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/collector/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ package collector

import (
"fmt"
"github.com/cloudflare/goflow/v3/pb"

flowprotob "github.com/cloudflare/goflow/v3/pb"
"github.com/cloudflare/goflow/v3/utils"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -48,7 +49,7 @@ func (c *col) Describe(descs chan<- *prometheus.Desc) {
c.totalFlowsCounter.Describe(descs)
c.scrapingSum.Describe(descs)
for _, m := range c.metrics {
m.counter.Describe(descs)
m.Describe(descs)
}
}

Expand All @@ -62,7 +63,7 @@ func (c *col) Collect(ch chan<- prometheus.Metric) {
c.droppedFlowsCounter.Collect(ch)
c.totalFlowsCounter.Collect(ch)
for _, m := range c.metrics {
m.counter.Collect(ch)
m.Collect(ch)
}
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/collector/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
package collector

import (
"os"

"github.com/jellydator/ttlcache/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/rkosegi/ipfix-collector/pkg/public"
"gopkg.in/yaml.v3"
"os"
"time"
)

// metricEntry ties a configured metric to its runtime state: the Prometheus
// counter vector, the label processors that derive label values from a flow,
// and a TTL cache of per-label-set counters so each label combination can
// expire individually when idle.
type metricEntry struct {
	// counter is the CounterVec from which per-label-set counters are
	// obtained via WithLabelValues.
	counter *prometheus.CounterVec
	// labels derive each label's value from an incoming flow.
	labels []*labelProcessor
	// metrics caches live per-label-set counters keyed by the joined label
	// values; entries expire after the flush interval of inactivity.
	metrics *ttlcache.Cache[string, prometheus.Counter]
}

// FilterFn is a predicate over a single flow; presumably a true result means
// the flow is accepted for processing — confirm against callers.
type FilterFn func(flow *public.Flow) bool
Expand Down

0 comments on commit b114094

Please sign in to comment.