convert cache to a ttl cache #77

Open · wants to merge 1 commit into base: db_main

1 change: 0 additions & 1 deletion go.mod
@@ -116,7 +116,6 @@ require (

require (
github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5
- github.com/hashicorp/golang-lru v0.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.29.0
8 changes: 3 additions & 5 deletions internal/cortex/frontend/transport/handler.go
@@ -48,6 +48,7 @@ type HandlerConfig struct {
QueryStatsEnabled bool `yaml:"query_stats_enabled"`
LogFailedQueries bool `yaml:"log_failed_queries"`
FailedQueryCacheCapacity int `yaml:"failed_query_cache_capacity"`
+ FailedQueryTTL time.Duration `yaml:"failed_query_ttl"`
Collaborator: You need to add a cmd line flag for this config.

}

// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -76,11 +77,8 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
}

if cfg.FailedQueryCacheCapacity > 0 {
- level.Info(log).Log("msg", "Creating failed query cache", "capacity", cfg.FailedQueryCacheCapacity)
- FailedQueryCache, errQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, reg)
- if errQueryCache != nil {
- level.Warn(log).Log(errQueryCache.Error())
- }
+ level.Info(log).Log("msg", "Creating failed query cache", "capacity", cfg.FailedQueryCacheCapacity, "ttl", cfg.FailedQueryTTL.String())
+ FailedQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, cfg.FailedQueryTTL, reg)
h.failedQueryCache = FailedQueryCache
}

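The review note above asks for a command-line flag wiring up the new FailedQueryTTL option. A minimal sketch of what that could look like, assuming a RegisterFlags method on HandlerConfig and a hypothetical frontend.failed-query-ttl flag name (neither is shown in this PR):

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// Stand-in for the two cache-related fields of HandlerConfig (illustration only).
type HandlerConfig struct {
	FailedQueryCacheCapacity int
	FailedQueryTTL           time.Duration
}

// RegisterFlags sketches how the new TTL option could be exposed on the command line.
// Flag names and defaults here are assumptions, not taken from the PR.
func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.FailedQueryCacheCapacity, "frontend.failed-query-cache-capacity", 0,
		"Capacity of the failed query cache; 0 disables the cache.")
	f.DurationVar(&cfg.FailedQueryTTL, "frontend.failed-query-ttl", 5*time.Minute,
		"How long entries stay in the failed query cache; 0 means they never expire.")
}

func main() {
	var cfg HandlerConfig
	cfg.RegisterFlags(flag.CommandLine)
	flag.Parse()
	fmt.Println("capacity:", cfg.FailedQueryCacheCapacity, "ttl:", cfg.FailedQueryTTL)
}
```
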
41 changes: 19 additions & 22 deletions internal/cortex/frontend/transport/utils/utils.go
@@ -6,13 +6,13 @@ package utils

import (
"fmt"
"github.com/hashicorp/golang-lru/v2/expirable"
"net/http"
"net/url"
"regexp"
"strconv"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
@@ -25,39 +25,35 @@ var (
type FailedQueryCache struct {
regex *regexp.Regexp
errorExtract *regexp.Regexp
- lruCache *lru.Cache
+ lruCache *expirable.LRU[string, int]
cachedHits prometheus.Counter
cachedQueries prometheus.Gauge
}

- func NewFailedQueryCache(capacity int, reg prometheus.Registerer) (*FailedQueryCache, error) {
+ func NewFailedQueryCache(capacity int, ttlDuration time.Duration, reg prometheus.Registerer) *FailedQueryCache {
Collaborator: I think the eventual goal of a TTL is to cache expensive queries as well. If a query fetches too many time series, add it to the cache. Can you rename the cache to something like ExpensiveQueryCache and make it cache both failed queries (usually expensive ones), long-running ones (> 4 minutes), and the ones fetching too many time series?

regex := regexp.MustCompile(`[\s\n\t]+`)
errorExtract := regexp.MustCompile(`Code\((\d+)\)`)
- lruCache, err := lru.New(capacity)
- if err != nil {
- lruCache = nil
- err = fmt.Errorf("failed to create lru cache: %s", err)
- return nil, err
- }
+ lruCacheWithTTL := expirable.NewLRU[string, int](capacity, nil, ttlDuration)

cachedHits := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "cached_failed_queries_count",
Help: "Total number of queries that hit the failed query cache.",
Name: "cached_failed_queries_count",
Help: "Total number of queries that hit the failed query cache.",
})
cachedQueries := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "failed_query_cache_size",
Help: "How many queries are cached in the failed query cache.",
Name: "failed_query_cache_size",
Help: "How many queries are cached in the failed query cache.",
})
cachedQueries.Set(0)

return &FailedQueryCache{
regex: regex,
errorExtract: errorExtract,
- lruCache: lruCache,
+ lruCache: lruCacheWithTTL,
cachedHits: cachedHits,
cachedQueries: cachedQueries,
- }, err
+ }
}

// UpdateFailedQueryCache returns true if query is cached so that callsite can increase counter, returns message as a string for callsite to log outcome
@@ -92,19 +88,20 @@ func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNorm

func (f *FailedQueryCache) addCacheEntry(queryExpressionNormalized string, queryExpressionRangeLength int) {
// Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
- if contains, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
+ if contains := f.lruCache.Contains(queryExpressionNormalized); contains {
if oldValue, ok := f.lruCache.Get(queryExpressionNormalized); ok {
- queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
+ queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue)
}
- f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
}
+ f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
Collaborator: Can you confirm Add() is thread-safe? Multiple goroutines from the HTTP server may call this function here concurrently.

Author: yes, it uses a mutex

f.cachedQueries.Set(float64(f.lruCache.Len()))
}

// QueryHitCache checks if the lru cache is hit and returns whether to increment counter for cache hits along with appropriate message.
- func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache, cachedHits prometheus.Counter) (bool, string) {
- if value, ok := lruCache.Get(queryExpressionNormalized); ok && value.(int) <= queryExpressionRangeLength {
- cachedQueryRangeSeconds := value.(int)
+ func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *expirable.LRU[string, int], cachedHits prometheus.Counter) (bool, string) {
+ if value, ok := lruCache.Get(queryExpressionNormalized); ok && value <= queryExpressionRangeLength {
+ cachedQueryRangeSeconds := value
message := createLogMessage("Retrieved query from cache", queryExpressionNormalized, cachedQueryRangeSeconds, queryExpressionRangeLength, nil)
cachedHits.Inc()
return true, message
@@ -159,7 +156,7 @@ func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values, q
queryExpressionRangeLength := getQueryRangeSeconds(query)
// TODO(hc.zhu): add a flag for the threshold
// The current gateway timeout is 5 minutes, so we cache the failed query running longer than 5 minutes - 10 seconds.
- if queryResponseTime > time.Second * (60 * 5 - 10) {
+ if queryResponseTime > time.Second*(60*5-10) {
Collaborator: Can you make the magic number a flag since you are touching this part? :)

Author: I didn't change it, the diff is just from go format.

Author: will do it in a separate PR

// Cache long running queries regardless of the error code. The most common case is "context canceled".
f.addCacheEntry(queryExpressionNormalized, queryExpressionRangeLength)
message := createLogMessage("Cached a failed long running query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
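Two threads above ask what a TTL of 0 means and whether Add() is safe to call from concurrent request handlers. A self-contained sketch of the expirable LRU this PR switches to; the behavior described in the comments reflects my reading of github.com/hashicorp/golang-lru/v2/expirable, not anything added by the PR:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
	// Same constructor shape the PR uses: capacity, no eviction callback, TTL.
	// A TTL <= 0 effectively disables time-based expiry (entries only leave via
	// LRU eviction), which matches the author's "0 means no ttl" reply.
	cache := expirable.NewLRU[string, int](2, nil, 200*time.Millisecond)

	cache.Add("up", 300)
	if v, ok := cache.Get("up"); ok {
		fmt.Println("hit:", v) // hit: 300
	}

	// Once the TTL has passed, Get treats the entry as gone.
	time.Sleep(250 * time.Millisecond)
	if _, ok := cache.Get("up"); !ok {
		fmt.Println("expired")
	}

	// Add, Get, Contains and Len serialize access through an internal mutex,
	// so concurrent calls from the query-frontend's request goroutines are safe.
}
```
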
21 changes: 9 additions & 12 deletions internal/cortex/frontend/transport/utils/utils_test.go
@@ -34,19 +34,16 @@ func verifyMetricCount(t *testing.T, reg *prometheus.Registry, expectedCount int

func TestNewFailedQueryCache(t *testing.T) {
reg := prometheus.NewRegistry()
- cache, err := NewFailedQueryCache(2, reg)
+ cache := NewFailedQueryCache(2, 0, reg)
Collaborator: Can you add test cases where the TTL is not 0?

Collaborator: What does 0 TTL mean? Never expire?

Author: 0 means no TTL

if cache == nil {
t.Fatalf("Expected cache to be created, but got nil")
}
- if err != nil {
- t.Fatalf("Expected no error message, but got: %s", err.Error())
- }
verifyMetricCount(t, reg, 2)
}

func TestUpdateFailedQueryCache(t *testing.T) {
reg := prometheus.NewRegistry()
- cache, _ := NewFailedQueryCache(3, reg)
+ cache := NewFailedQueryCache(3, 0, reg)

tests := []struct {
name string
@@ -206,7 +203,7 @@ func TestUpdateFailedQueryCache(t *testing.T) {
// TestQueryHitCache tests the QueryHitCache method
func TestQueryHitCache(t *testing.T) {
reg := prometheus.NewRegistry()
- cache, _ := NewFailedQueryCache(2, reg)
+ cache := NewFailedQueryCache(2, 0, reg)
lruCache := cache.lruCache

lruCache.Add("test_query", 100)
@@ -289,7 +286,7 @@ func TestQueryHitCache(t *testing.T) {

func TestCacheCounterVec(t *testing.T) {
reg := prometheus.NewRegistry()
- cache, _ := NewFailedQueryCache(2, reg)
+ cache := NewFailedQueryCache(2, 0, reg)
lruCache := cache.lruCache

lruCache.Add("test_query", 100)
@@ -371,12 +368,12 @@ func TestCacheCounterVec(t *testing.T) {

func TestCacheLongRunningFailedQuery(t *testing.T) {
reg := prometheus.NewRegistry()
- cache, _ := NewFailedQueryCache(3, reg)
+ cache := NewFailedQueryCache(3, 0, reg)

tests := []struct {
- name string
- err error
- query url.Values
+ name  string
+ err   error
+ query url.Values
}{
{
name: "No error code in error message",
@@ -401,7 +398,7 @@ func TestCacheLongRunningFailedQuery(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Long running failed query without an error code
- cached, _ := cache.UpdateFailedQueryCache(tt.err, tt.query, time.Second*(5 * 60 - 1))
+ cached, _ := cache.UpdateFailedQueryCache(tt.err, tt.query, time.Second*(5*60-1))
if !cached {
t.Errorf("Should cache short running failed query without an error code")
}
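Following the reviewer's request for coverage with a non-zero TTL, a hypothetical test along those lines (not part of this PR; it assumes it lives in the same utils package as the existing tests and reuses their imports):

```go
func TestFailedQueryCacheTTLExpiry(t *testing.T) {
	reg := prometheus.NewRegistry()
	// Non-zero TTL, as requested in review; a short value keeps the test fast.
	cache := NewFailedQueryCache(2, 50*time.Millisecond, reg)
	lruCache := cache.lruCache

	lruCache.Add("test_query", 100)
	if _, ok := lruCache.Get("test_query"); !ok {
		t.Fatalf("expected entry to be present before the TTL elapses")
	}

	// Wait past the TTL and confirm the entry has expired.
	time.Sleep(100 * time.Millisecond)
	if _, ok := lruCache.Get("test_query"); ok {
		t.Errorf("expected entry to have expired after the TTL")
	}
}
```
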