Skip to content

Commit

Permalink
feat: Add support for multiple kinds of cache for relabeling components
Browse files Browse the repository at this point in the history
  • Loading branch information
pbailhache committed Sep 16, 2024
1 parent 34d850e commit ba30cd8
Show file tree
Hide file tree
Showing 10 changed files with 2,830 additions and 87 deletions.
17 changes: 11 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ require (
github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d
github.com/grafana/ckit v0.0.0-20240913130805-0ee98bafad88
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb
github.com/grafana/dskit v0.0.0-20240905221822-931a021fb06b
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/jfr-parser/pprof v0.0.0-20240126072739-986e71dc0361
github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d
github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a
github.com/grafana/loki/pkg/push v0.0.0-20240514112848-a1b1eeb09583 // k201 branch
github.com/grafana/loki/v3 v3.0.0-20240513110952-8622293f23b1 // k201 branch
github.com/grafana/pyroscope-go/godeltaprof v0.1.7
github.com/grafana/pyroscope-go/godeltaprof v0.1.8
github.com/grafana/pyroscope/api v0.4.0
github.com/grafana/pyroscope/ebpf v0.4.7
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
Expand Down Expand Up @@ -534,7 +534,7 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/gosnmp/gosnmp v1.37.0 // indirect
github.com/grafana/go-offsets-tracker v0.1.7 // indirect
github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 // indirect
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect
github.com/grafana/jfr-parser v0.8.0 // indirect
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548
github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445 // indirect
Expand Down Expand Up @@ -727,7 +727,7 @@ require (
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.19.0 // indirect
github.com/stormcat24/protodep v0.1.8 // indirect
github.com/stretchr/objx v0.5.2 // indirect
Expand Down Expand Up @@ -792,7 +792,7 @@ require (
golang.org/x/mod v0.19.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
gonum.org/v1/gonum v0.15.1 // indirect
google.golang.org/genproto v0.0.0-20240708141625-4ad9e859172b // indirect
Expand All @@ -813,7 +813,12 @@ require (

)

require github.com/containerd/platforms v0.2.1 // indirect
require github.com/m3db/prometheus_remote_client_golang v0.4.4

require (
github.com/containerd/platforms v0.2.1 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
)

// NOTE: replace directives below must always be *temporary*.
//
Expand Down
2,210 changes: 2,201 additions & 9 deletions go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion internal/component/faro/receiver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (s *server) Run(ctx context.Context) error {
})

mw := middleware.Instrument{
RouteMatcher: r,
Duration: s.metrics.requestDuration,
RequestBodySize: s.metrics.rxMessageSize,
ResponseBodySize: s.metrics.txMessageSize,
Expand Down
111 changes: 73 additions & 38 deletions internal/component/prometheus/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/cache"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
lru "github.com/hashicorp/golang-lru/v2"
prometheus_client "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
Expand All @@ -20,11 +20,19 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"

"go.uber.org/atomic"
)

const name = "prometheus.relabel"

// labelAndID is the value stored in the relabel cache. It pairs a label set
// with the global ref ID obtained for that label set, so the ID does not have
// to be recalculated on every cache hit.
//
// Both fields are exported and carry JSON tags.
// NOTE(review): presumably this is so entries can be serialised by cache
// backends other than the in-memory one — confirm against the cache package.
type labelAndID struct {
// Labels is the cached label set.
Labels labels.Labels `json:"labels"`
// ID is the global ref ID associated with Labels.
ID uint64 `json:"id"`
}

func init() {
component.Register(component.Registration{
Name: name,
Expand All @@ -47,22 +55,40 @@ type Arguments struct {
// The relabelling rules to apply to each metric before it's forwarded.
MetricRelabelConfigs []*alloy_relabel.Config `alloy:"rule,block,optional"`

// Cache size to use for LRU cache.
CacheSize int `alloy:"max_cache_size,attr,optional"`
// DEPRECATED Use type = inmemory and cache_size field.
InMemoryCacheSizeDeprecated int `alloy:"max_cache_size,attr,optional"`

// Cache backend configuration.
CacheConfig cache.CacheConfig `alloy:"cache,block,optional"`
}

// SetToDefault implements syntax.Defaulter.
//
// It overwrites the receiver with a fresh Arguments value, so any field that
// was set before the call is discarded. The cache defaults to the in-memory
// backend.
//
// NOTE(review): this listing shows a top-level default of `CacheSize: 100_000`
// next to a nested in-memory default of `CacheSize: 100_100`. The differing
// values look like a typo rather than an intentional change — confirm which
// one is intended before relying on either.
func (arg *Arguments) SetToDefault() {
// Replace the whole struct, not just the cache-related fields.
*arg = Arguments{
CacheSize: 100_000,
CacheConfig: cache.CacheConfig{
Backend: cache.InMemory,
InMemory: cache.InMemoryCacheConfig{
CacheSize: 100_100,
},
},
}
}

// Validate implements syntax.Validator.
//
// It dispatches on the configured cache backend: the in-memory backend must
// have a strictly positive cache size, the Memcached and Redis backends are
// accepted without further checks, and any other backend value is rejected
// with an error. It returns nil when no check fails.
//
// NOTE(review): the leading `arg.CacheSize` check appears to be the earlier
// form of the in-memory check (its closing brace is not present in this
// listing) — confirm which of the two checks is live.
func (arg *Arguments) Validate() error {
if arg.CacheSize <= 0 {
return fmt.Errorf("max_cache_size must be greater than 0 and is %d", arg.CacheSize)
switch arg.CacheConfig.Backend {
case cache.InMemory:
// The in-memory backend needs a strictly positive capacity.
if arg.CacheConfig.InMemory.CacheSize <= 0 {
return fmt.Errorf("cache_size must be greater than 0 and is %d", arg.CacheConfig.InMemory.CacheSize)
}
case cache.Memcached:
// TODO: no Memcached-specific settings are validated yet.
case cache.Redis:
// TODO: no Redis-specific settings are validated yet.
default:
// Any backend value not handled above is treated as a configuration error.
return fmt.Errorf("unknown cache backend, should be one of %s", cache.SupportedCaches)
}

return nil
}

Expand Down Expand Up @@ -91,7 +117,7 @@ type Component struct {
debugDataPublisher livedebugging.DebugDataPublisher

cacheMut sync.RWMutex
cache *lru.Cache[uint64, *labelAndID]
cache cache.Cache[labelAndID]
}

var (
Expand All @@ -101,7 +127,13 @@ var (

// New creates a new prometheus.relabel component.
func New(o component.Options, args Arguments) (*Component, error) {
cache, err := lru.New[uint64, *labelAndID](args.CacheSize)
// to be removed after deprecation of max cache size
if args.CacheConfig.Backend == "" && args.InMemoryCacheSizeDeprecated != 0 {
args.CacheConfig.Backend = cache.InMemory
args.CacheConfig.InMemory.CacheSize = args.InMemoryCacheSizeDeprecated
}

relabelCache, err := cache.NewCache[labelAndID](args.CacheConfig)
if err != nil {
return nil, err
}
Expand All @@ -117,7 +149,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
}
c := &Component{
opts: o,
cache: cache,
cache: relabelCache,
ls: data.(labelstore.LabelStore),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}
Expand Down Expand Up @@ -153,6 +185,8 @@ func New(o component.Options, args Arguments) (*Component, error) {
}
}

ctx := context.Background()

c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, c.ls)
c.receiver = prometheus.NewInterceptor(
c.fanout,
Expand All @@ -162,7 +196,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
return 0, fmt.Errorf("%s has exited", o.ID)
}

newLbl := c.relabel(v, l)
newLbl := c.relabel(ctx, v, l)
if newLbl.IsEmpty() {
return 0, nil
}
Expand All @@ -174,7 +208,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
return 0, fmt.Errorf("%s has exited", o.ID)
}

newLbl := c.relabel(0, l)
newLbl := c.relabel(ctx, 0, l)
if newLbl.IsEmpty() {
return 0, nil
}
Expand All @@ -185,7 +219,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
return 0, fmt.Errorf("%s has exited", o.ID)
}

newLbl := c.relabel(0, l)
newLbl := c.relabel(ctx, 0, l)
if newLbl.IsEmpty() {
return 0, nil
}
Expand All @@ -196,7 +230,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
return 0, fmt.Errorf("%s has exited", o.ID)
}

newLbl := c.relabel(0, l)
newLbl := c.relabel(ctx, 0, l)
if newLbl.IsEmpty() {
return 0, nil
}
Expand Down Expand Up @@ -230,7 +264,13 @@ func (c *Component) Update(args component.Arguments) error {
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.clearCache(newArgs.CacheSize)

// todo maybe recreate whole relabelCache here in case of change for redis/memcached client

// in case of in_memory cache we need to clean the cache
if newArgs.CacheConfig.Backend == cache.InMemory {
c.clearCache(newArgs.CacheConfig.InMemory.CacheSize)
}
c.mrc = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs)
c.fanout.UpdateChildren(newArgs.ForwardTo)

Expand All @@ -239,7 +279,7 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
func (c *Component) relabel(ctx context.Context, val float64, lbls labels.Labels) labels.Labels {
c.mut.RLock()
defer c.mut.RUnlock()

Expand All @@ -248,12 +288,12 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
relabelled labels.Labels
keep bool
)
newLbls, found := c.getFromCache(globalRef)
newLbls, found := c.getFromCache(ctx, globalRef)
if found {
c.cacheHits.Inc()
// If newLbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels
if newLbls != nil {
relabelled = newLbls.labels
relabelled = newLbls.Labels
}
} else {
// Relabel against a copy of the labels to prevent modifying the original
Expand All @@ -267,11 +307,11 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
// TODO: (@mattdurham) This caching can leak and likely needs a timed eviction at some point, but this is simple.
// In the future the global ref cache may have some hooks to allow notification of when caches should be evicted.
if value.IsStaleNaN(val) {
c.deleteFromCache(globalRef)
c.deleteFromCache(ctx, globalRef)
}
// Set the cache size to the cache.len
// TODO(@mattdurham): Instead of setting this each time could collect on demand for better performance.
c.cacheSize.Set(float64(c.cache.Len()))
// c.cacheSize.Set(float64(c.cache.GetCacheSize()))

componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
Expand All @@ -281,48 +321,43 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
return relabelled
}

// getFromCache looks up the cache entry for id while holding cacheMut for
// reading.
//
// In the string-keyed form the key is the decimal representation of id, and
// the boolean result is true exactly when Get returns a nil error, so every
// error from the cache is reported to the caller as a miss. A hit may still
// carry a nil *labelAndID. The ctx parameter is accepted but is not used by
// the body shown here.
//
// NOTE(review): this listing contains two signatures and two lookups (one
// keyed by the uint64 id, one keyed by its string form) — confirm which pair
// is live.
func (c *Component) getFromCache(id uint64) (*labelAndID, bool) {
func (c *Component) getFromCache(ctx context.Context, id uint64) (*labelAndID, bool) {
c.cacheMut.RLock()
defer c.cacheMut.RUnlock()

fm, found := c.cache.Get(id)
return fm, found
value, err := c.cache.Get(fmt.Sprintf("%d", id))

// Any non-nil error is surfaced as "not found".
return value, err == nil
}

// deleteFromCache removes the cache entry for id while holding cacheMut for
// writing.
//
// The cacheDeletes counter is incremented unconditionally before the removal,
// so it counts delete attempts rather than entries that actually existed. The
// result of Remove is not inspected. In the string-keyed form the key is the
// decimal representation of id; ctx is accepted but not used by the body
// shown here.
//
// NOTE(review): this listing contains two signatures and two Remove calls
// (uint64 key and string key) — confirm which pair is live.
func (c *Component) deleteFromCache(id uint64) {
func (c *Component) deleteFromCache(ctx context.Context, id uint64) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()
c.cacheDeletes.Inc()
c.cache.Remove(id)

c.cache.Remove(fmt.Sprintf("%d", id))
}

// clearCache resets the component's cache while holding cacheMut for writing.
//
// Two forms appear in this listing: one builds a new LRU of cacheSize entries
// and swaps it into c.cache (ignoring the constructor error), the other calls
// Clear(cacheSize) on the existing cache and discards the returned error.
//
// NOTE(review): what Clear does with cacheSize is defined by the cache
// implementation and is not visible here — presumably it resizes as well as
// empties; confirm. Also confirm which of the two forms is live.
func (c *Component) clearCache(cacheSize int) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()
cache, _ := lru.New[uint64, *labelAndID](cacheSize)
c.cache = cache
// The error from Clear is deliberately dropped; failures are silent.
_ = c.cache.Clear(cacheSize)
}

// addToCache records the relabel outcome for originalID while holding
// cacheMut for writing.
//
// When keep is false a nil value is stored under the key, so a later lookup
// finds an entry with a nil value instead of missing. Otherwise the global ref
// ID for lbls is obtained from the label store and stored together with lbls.
//
// In the string-keyed form the key is the decimal representation of
// originalID and errors returned by Set are discarded.
//
// NOTE(review): the meaning of the trailing 0 passed to Set is not visible
// here — presumably a TTL/expiry where 0 means "no expiry"; confirm against
// the cache package. This listing also contains both an Add-based and a
// Set-based store for each branch — confirm which is live.
func (c *Component) addToCache(originalID uint64, lbls labels.Labels, keep bool) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()

if !keep {
// Remember the drop decision by caching a nil value.
c.cache.Add(originalID, nil)
_ = c.cache.Set(fmt.Sprintf("%d", originalID), nil, 0)
return
}
// Resolve the global ref ID once so cache hits do not need to recompute it.
newGlobal := c.ls.GetOrAddGlobalRefID(lbls)
c.cache.Add(originalID, &labelAndID{
labels: lbls,
id: newGlobal,
})

_ = c.cache.Set(fmt.Sprintf("%d", originalID), &labelAndID{
Labels: lbls,
ID: newGlobal,
}, 0)
}

// LiveDebugging is intentionally a no-op: the body is empty and the integer
// argument is ignored.
func (c *Component) LiveDebugging(int) {
}

// labelAndID is a cache value pairing a label set with the global ref ID
// obtained for that label set, so the ID does not have to be recalculated on
// every cache hit.
type labelAndID struct {
// labels is the cached label set.
labels labels.Labels
// id is the global ref ID associated with labels.
id uint64
}
Loading

0 comments on commit ba30cd8

Please sign in to comment.