Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support of multiple kind of cache for relabeling components #1692

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOSchFS7Eo=
github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
Expand Down
1 change: 0 additions & 1 deletion internal/component/faro/receiver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (s *server) Run(ctx context.Context) error {
})

mw := middleware.Instrument{
RouteMatcher: r,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the impact of this change (This is related to the update of dskit : grafana/dskit@27d7d41)

Duration: s.metrics.requestDuration,
RequestBodySize: s.metrics.rxMessageSize,
ResponseBodySize: s.metrics.txMessageSize,
Expand Down
87 changes: 58 additions & 29 deletions internal/component/prometheus/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/service/cache"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
lru "github.com/hashicorp/golang-lru/v2"
prometheus_client "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
Expand All @@ -20,11 +20,19 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"

"go.uber.org/atomic"
)

const name = "prometheus.relabel"

// labelAndID stores both the globalrefid for the label and the id itself. We store the id so that it doesn't have
// to be recalculated again.
type labelAndID struct {
Labels labels.Labels `json:"labels"`
ID uint64 `json:"id"`
}

func init() {
component.Register(component.Registration{
Name: name,
Expand All @@ -47,22 +55,38 @@ type Arguments struct {
// The relabelling rules to apply to each metric before it's forwarded.
MetricRelabelConfigs []*alloy_relabel.Config `alloy:"rule,block,optional"`

// Cache size to use for LRU cache.
CacheSize int `alloy:"max_cache_size,attr,optional"`
// DEPRECATED Use type = inmemory and cache_size field.
InMemoryCacheSizeDeprecated int `alloy:"max_cache_size,attr,optional"`

// Cache backend configuration.
CacheConfig cache.CacheConfig `alloy:"cache,block,optional"`
}

// SetToDefault implements syntax.Defaulter.
func (arg *Arguments) SetToDefault() {
*arg = Arguments{
CacheSize: 100_000,
CacheConfig: cache.CacheConfig{
Backend: cache.InMemory,
InMemory: cache.InMemoryCacheConfig{
CacheSize: 100_000,
},
},
}
}

// Validate implements syntax.Validator.
func (arg *Arguments) Validate() error {
if arg.CacheSize <= 0 {
return fmt.Errorf("max_cache_size must be greater than 0 and is %d", arg.CacheSize)
switch arg.CacheConfig.Backend {
case cache.InMemory:
if arg.CacheConfig.InMemory.CacheSize <= 0 {
return fmt.Errorf("cache_size must be greater than 0 and is %d", arg.CacheConfig.InMemory.CacheSize)
}
case cache.Memcached:
case cache.Redis:
default:
return fmt.Errorf("unknown cache backend, should be one of %s", cache.SupportedCaches)
}

return nil
}

Expand Down Expand Up @@ -91,7 +115,7 @@ type Component struct {
debugDataPublisher livedebugging.DebugDataPublisher

cacheMut sync.RWMutex
cache *lru.Cache[uint64, *labelAndID]
cache cache.Cache[labelAndID]
}

var (
Expand All @@ -101,7 +125,13 @@ var (

// New creates a new prometheus.relabel component.
func New(o component.Options, args Arguments) (*Component, error) {
cache, err := lru.New[uint64, *labelAndID](args.CacheSize)
// to be removed after deprecation of max cache size
if args.CacheConfig.Backend == "" && args.InMemoryCacheSizeDeprecated != 0 {
args.CacheConfig.Backend = cache.InMemory
args.CacheConfig.InMemory.CacheSize = args.InMemoryCacheSizeDeprecated
}

relabelCache, err := cache.NewCache[labelAndID](args.CacheConfig)
if err != nil {
return nil, err
}
Expand All @@ -117,7 +147,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
}
c := &Component{
opts: o,
cache: cache,
cache: relabelCache,
ls: data.(labelstore.LabelStore),
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher),
}
Expand Down Expand Up @@ -230,7 +260,11 @@ func (c *Component) Update(args component.Arguments) error {
defer c.mut.Unlock()

newArgs := args.(Arguments)
c.clearCache(newArgs.CacheSize)

// in case of in_memory cache we need to clean the cache
if newArgs.CacheConfig.Backend == cache.InMemory {
c.clearCache(newArgs.CacheConfig.InMemory.CacheSize)
}
c.mrc = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs)
c.fanout.UpdateChildren(newArgs.ForwardTo)

Expand All @@ -253,7 +287,7 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
c.cacheHits.Inc()
// If newLbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels
if newLbls != nil {
relabelled = newLbls.labels
relabelled = newLbls.Labels
}
} else {
// Relabel against a copy of the labels to prevent modifying the original
Expand All @@ -271,7 +305,7 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels {
}
// Set the cache size to the cache.len
// TODO(@mattdurham): Instead of setting this each time could collect on demand for better performance.
c.cacheSize.Set(float64(c.cache.Len()))
// c.cacheSize.Set(float64(c.cache.GetCacheSize()))

componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
Expand All @@ -285,44 +319,39 @@ func (c *Component) getFromCache(id uint64) (*labelAndID, bool) {
c.cacheMut.RLock()
defer c.cacheMut.RUnlock()

fm, found := c.cache.Get(id)
return fm, found
value, err := c.cache.Get(fmt.Sprintf("%d", id))

return value, err == nil
}

func (c *Component) deleteFromCache(id uint64) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()
c.cacheDeletes.Inc()
c.cache.Remove(id)

c.cache.Remove(fmt.Sprintf("%d", id))
}

func (c *Component) clearCache(cacheSize int) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()
cache, _ := lru.New[uint64, *labelAndID](cacheSize)
c.cache = cache
_ = c.cache.Clear(cacheSize)
}

func (c *Component) addToCache(originalID uint64, lbls labels.Labels, keep bool) {
c.cacheMut.Lock()
defer c.cacheMut.Unlock()

if !keep {
c.cache.Add(originalID, nil)
_ = c.cache.Set(fmt.Sprintf("%d", originalID), nil, 0)
return
}
newGlobal := c.ls.GetOrAddGlobalRefID(lbls)
c.cache.Add(originalID, &labelAndID{
labels: lbls,
id: newGlobal,
})

_ = c.cache.Set(fmt.Sprintf("%d", originalID), &labelAndID{
Labels: lbls,
ID: newGlobal,
}, 0)
}

func (c *Component) LiveDebugging(_ int) {}

// labelAndID stores both the globalrefid for the label and the id itself. We store the id so that it doesn't have
// to be recalculated again.
type labelAndID struct {
labels labels.Labels
id uint64
}
50 changes: 24 additions & 26 deletions internal/component/prometheus/relabel/relabel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/prometheus"
"github.com/grafana/alloy/internal/runtime/componenttest"
"github.com/grafana/alloy/internal/service/cache"
"github.com/grafana/alloy/internal/service/labelstore"
"github.com/grafana/alloy/internal/service/livedebugging"
"github.com/grafana/alloy/internal/util"
Expand All @@ -25,39 +26,35 @@ import (
"github.com/stretchr/testify/require"
)

func TestCache(t *testing.T) {
func TestLRUCache(t *testing.T) {
lc := labelstore.New(nil, prom.DefaultRegisterer)
relabeller := generateRelabel(t)
relabeller := generateRelabelWithLRUCache(t)
lbls := labels.FromStrings("__address__", "localhost")
relabeller.relabel(0, lbls)
require.True(t, relabeller.cache.Len() == 1)
require.True(t, relabeller.cache.GetCacheSize() == 1)
entry, found := relabeller.getFromCache(lc.GetOrAddGlobalRefID(lbls))
require.True(t, found)
require.NotNil(t, entry)
require.True(
t,
lc.GetOrAddGlobalRefID(entry.labels) != lc.GetOrAddGlobalRefID(lbls),
lc.GetOrAddGlobalRefID(entry.Labels) != lc.GetOrAddGlobalRefID(lbls),
)
}

func TestUpdateReset(t *testing.T) {
relabeller := generateRelabel(t)
lbls := labels.FromStrings("__address__", "localhost")
relabeller.relabel(0, lbls)
require.True(t, relabeller.cache.Len() == 1)
_ = relabeller.Update(Arguments{
CacheSize: 100000,
MetricRelabelConfigs: []*alloy_relabel.Config{},
})
require.True(t, relabeller.cache.Len() == 0)
}

func TestValidator(t *testing.T) {
args := Arguments{CacheSize: 0}
args := Arguments{
CacheConfig: cache.CacheConfig{
Backend: "unknown",
},
}
err := args.Validate()
require.Error(t, err)

args.CacheSize = 1
args.CacheConfig.Backend = cache.InMemory
err = args.Validate()
require.Error(t, err)

args.CacheConfig.InMemory.CacheSize = 1
err = args.Validate()
require.NoError(t, err)
}
Expand All @@ -83,7 +80,7 @@ func TestNil(t *testing.T) {
Action: "drop",
},
},
CacheSize: 100000,
InMemoryCacheSizeDeprecated: 100000,
})
require.NotNil(t, relabeller)
require.NoError(t, err)
Expand All @@ -93,22 +90,22 @@ func TestNil(t *testing.T) {
}

func TestLRU(t *testing.T) {
relabeller := generateRelabel(t)
relabeller := generateRelabelWithLRUCache(t)

for i := 0; i < 600_000; i++ {
lbls := labels.FromStrings("__address__", "localhost", "inc", strconv.Itoa(i))
relabeller.relabel(0, lbls)
}
require.True(t, relabeller.cache.Len() == 100_000)
require.True(t, relabeller.cache.GetCacheSize() == 100_000)
}

func TestLRUNaN(t *testing.T) {
relabeller := generateRelabel(t)
relabeller := generateRelabelWithLRUCache(t)
lbls := labels.FromStrings("__address__", "localhost")
relabeller.relabel(0, lbls)
require.True(t, relabeller.cache.Len() == 1)
require.True(t, relabeller.cache.GetCacheSize() == 1)
relabeller.relabel(math.Float64frombits(value.StaleNaN), lbls)
require.True(t, relabeller.cache.Len() == 0)
require.True(t, relabeller.cache.GetCacheSize() == 0)
}

func BenchmarkCache(b *testing.B) {
Expand Down Expand Up @@ -147,7 +144,7 @@ func BenchmarkCache(b *testing.B) {
app.Commit()
}

func generateRelabel(t *testing.T) *Component {
func generateRelabelWithLRUCache(t *testing.T) *Component {
ls := labelstore.New(nil, prom.DefaultRegisterer)
fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) {
require.True(t, l.Has("new_label"))
Expand All @@ -170,8 +167,9 @@ func generateRelabel(t *testing.T) *Component {
Action: "replace",
},
},
CacheSize: 100_000,
InMemoryCacheSizeDeprecated: 100_000,
})

require.NotNil(t, relabeller)
require.NoError(t, err)
return relabeller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/alloy/internal/component/prometheus/relabel"
"github.com/grafana/alloy/internal/converter/internal/common"
"github.com/grafana/alloy/internal/converter/internal/prometheusconvert/build"
"github.com/grafana/alloy/internal/service/cache"
prom_relabel "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
)
Expand Down Expand Up @@ -36,7 +37,12 @@ func toRelabelArguments(relabelConfigs []*prom_relabel.Config, forwardTo []stora
return &relabel.Arguments{
ForwardTo: forwardTo,
MetricRelabelConfigs: ToAlloyRelabelConfigs(relabelConfigs),
CacheSize: 100_000,
CacheConfig: cache.CacheConfig{
Backend: cache.InMemory,
InMemory: cache.InMemoryCacheConfig{
CacheSize: 100_000,
},
},
}
}

Expand Down
Loading