From f60a77d8ebbef35eb734d1d9b9602e89689c4082 Mon Sep 17 00:00:00 2001
From: Christopher Li
Date: Tue, 27 Aug 2024 17:08:36 -0700
Subject: [PATCH] convert cache to a ttl cache

---
 go.mod                                        |  1 -
 internal/cortex/frontend/transport/handler.go |  8 ++--
 .../cortex/frontend/transport/utils/utils.go  | 41 +++++++++----------
 .../frontend/transport/utils/utils_test.go    | 21 ++++------
 4 files changed, 31 insertions(+), 40 deletions(-)

diff --git a/go.mod b/go.mod
index 078e2f1e48..cd2dc3c416 100644
--- a/go.mod
+++ b/go.mod
@@ -116,7 +116,6 @@ require (

 require (
 	github.com/cortexproject/promqlsmith v0.0.0-20240326071418-c2a9ca1e89f5
-	github.com/hashicorp/golang-lru v0.6.0
 	github.com/hashicorp/golang-lru/v2 v2.0.7
 	github.com/mitchellh/go-ps v1.0.0
 	github.com/onsi/gomega v1.29.0
diff --git a/internal/cortex/frontend/transport/handler.go b/internal/cortex/frontend/transport/handler.go
index d9acbbdbcd..67cbbefebc 100644
--- a/internal/cortex/frontend/transport/handler.go
+++ b/internal/cortex/frontend/transport/handler.go
@@ -48,6 +48,7 @@ type HandlerConfig struct {
 	QueryStatsEnabled        bool          `yaml:"query_stats_enabled"`
 	LogFailedQueries         bool          `yaml:"log_failed_queries"`
 	FailedQueryCacheCapacity int           `yaml:"failed_query_cache_capacity"`
+	FailedQueryTTL           time.Duration `yaml:"failed_query_ttl"`
 }

 // Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -76,11 +77,8 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
 	}

 	if cfg.FailedQueryCacheCapacity > 0 {
-		level.Info(log).Log("msg", "Creating failed query cache", "capacity", cfg.FailedQueryCacheCapacity)
-		FailedQueryCache, errQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, reg)
-		if errQueryCache != nil {
-			level.Warn(log).Log(errQueryCache.Error())
-		}
+		level.Info(log).Log("msg", "Creating failed query cache", "capacity", cfg.FailedQueryCacheCapacity, "ttl", cfg.FailedQueryTTL.String())
+		FailedQueryCache := utils.NewFailedQueryCache(cfg.FailedQueryCacheCapacity, cfg.FailedQueryTTL, reg)
 		h.failedQueryCache = FailedQueryCache
 	}

diff --git a/internal/cortex/frontend/transport/utils/utils.go b/internal/cortex/frontend/transport/utils/utils.go
index 08d4764ea8..3d52781ecc 100644
--- a/internal/cortex/frontend/transport/utils/utils.go
+++ b/internal/cortex/frontend/transport/utils/utils.go
@@ -6,13 +6,13 @@ package utils

 import (
 	"fmt"
+	"github.com/hashicorp/golang-lru/v2/expirable"
 	"net/http"
 	"net/url"
 	"regexp"
 	"strconv"
 	"time"

-	lru "github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
@@ -25,39 +25,35 @@ var (
 type FailedQueryCache struct {
 	regex         *regexp.Regexp
 	errorExtract  *regexp.Regexp
-	lruCache      *lru.Cache
+	lruCache      *expirable.LRU[string, int]
 	cachedHits    prometheus.Counter
 	cachedQueries prometheus.Gauge
 }

-func NewFailedQueryCache(capacity int, reg prometheus.Registerer) (*FailedQueryCache, error) {
+func NewFailedQueryCache(capacity int, ttlDuration time.Duration, reg prometheus.Registerer) *FailedQueryCache {
 	regex := regexp.MustCompile(`[\s\n\t]+`)
 	errorExtract := regexp.MustCompile(`Code\((\d+)\)`)
-	lruCache, err := lru.New(capacity)
-	if err != nil {
-		lruCache = nil
-		err = fmt.Errorf("failed to create lru cache: %s", err)
-		return nil, err
-	}
+	lruCacheWithTTL := expirable.NewLRU[string, int](capacity, nil, ttlDuration)
+
 	cachedHits := promauto.With(reg).NewCounter(prometheus.CounterOpts{
 		Namespace: "cortex",
-		Name:      "cached_failed_queries_count",
-		Help:      "Total number of queries that hit the failed query cache.",
+		Name:      "cached_failed_queries_count",
+		Help:      "Total number of queries that hit the failed query cache.",
 	})
 	cachedQueries := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
 		Namespace: "cortex",
-		Name:      "failed_query_cache_size",
-		Help:      "How many queries are cached in the failed query cache.",
+		Name:      "failed_query_cache_size",
+		Help:      "How many queries are cached in the failed query cache.",
 	})
 	cachedQueries.Set(0)

 	return &FailedQueryCache{
 		regex:         regex,
 		errorExtract:  errorExtract,
-		lruCache:      lruCache,
+		lruCache:      lruCacheWithTTL,
 		cachedHits:    cachedHits,
 		cachedQueries: cachedQueries,
-	}, err
+	}
 }

 // UpdateFailedQueryCache returns true if query is cached so that callsite can increase counter, returns message as a string for callsite to log outcome
@@ -92,19 +88,20 @@ func (f *FailedQueryCache) updateFailedQueryCache(err error, queryExpressionNorm
 func (f *FailedQueryCache) addCacheEntry(queryExpressionNormalized string, queryExpressionRangeLength int) {
 	// Checks if queryExpression is already in cache, and updates time range length value to min of stored and new value.
-	if contains, _ := f.lruCache.ContainsOrAdd(queryExpressionNormalized, queryExpressionRangeLength); contains {
+	if contains := f.lruCache.Contains(queryExpressionNormalized); contains {
 		if oldValue, ok := f.lruCache.Get(queryExpressionNormalized); ok {
-			queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue.(int))
+			queryExpressionRangeLength = min(queryExpressionRangeLength, oldValue)
 		}
-		f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
 	}
+	f.lruCache.Add(queryExpressionNormalized, queryExpressionRangeLength)
+
 	f.cachedQueries.Set(float64(f.lruCache.Len()))
 }

 // QueryHitCache checks if the lru cache is hit and returns whether to increment counter for cache hits along with appropriate message.
-func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *lru.Cache, cachedHits prometheus.Counter) (bool, string) {
-	if value, ok := lruCache.Get(queryExpressionNormalized); ok && value.(int) <= queryExpressionRangeLength {
-		cachedQueryRangeSeconds := value.(int)
+func queryHitCache(queryExpressionNormalized string, queryExpressionRangeLength int, lruCache *expirable.LRU[string, int], cachedHits prometheus.Counter) (bool, string) {
+	if value, ok := lruCache.Get(queryExpressionNormalized); ok && value <= queryExpressionRangeLength {
+		cachedQueryRangeSeconds := value
 		message := createLogMessage("Retrieved query from cache", queryExpressionNormalized, cachedQueryRangeSeconds, queryExpressionRangeLength, nil)
 		cachedHits.Inc()
 		return true, message
 	}
@@ -159,7 +156,7 @@ func (f *FailedQueryCache) UpdateFailedQueryCache(err error, query url.Values, q
 	queryExpressionRangeLength := getQueryRangeSeconds(query)
 	// TODO(hc.zhu): add a flag for the threshold
 	// The current gateway timeout is 5 minutes, so we cache the failed query running longer than 5 minutes - 10 seconds.
-	if queryResponseTime > time.Second * (60 * 5 - 10) {
+	if queryResponseTime > time.Second*(60*5-10) {
 		// Cache long running queries regardless of the error code. The most common case is "context canceled".
 		f.addCacheEntry(queryExpressionNormalized, queryExpressionRangeLength)
 		message := createLogMessage("Cached a failed long running query", queryExpressionNormalized, -1, queryExpressionRangeLength, err)
diff --git a/internal/cortex/frontend/transport/utils/utils_test.go b/internal/cortex/frontend/transport/utils/utils_test.go
index c21252083b..19d4d2d7ab 100644
--- a/internal/cortex/frontend/transport/utils/utils_test.go
+++ b/internal/cortex/frontend/transport/utils/utils_test.go
@@ -34,19 +34,16 @@ func verifyMetricCount(t *testing.T, reg *prometheus.Registry, expectedCount int
 func TestNewFailedQueryCache(t *testing.T) {
 	reg := prometheus.NewRegistry()
-	cache, err := NewFailedQueryCache(2, reg)
+	cache := NewFailedQueryCache(2, 0, reg)
 	if cache == nil {
 		t.Fatalf("Expected cache to be created, but got nil")
 	}
-	if err != nil {
-		t.Fatalf("Expected no error message, but got: %s", err.Error())
-	}
 	verifyMetricCount(t, reg, 2)
 }

 func TestUpdateFailedQueryCache(t *testing.T) {
 	reg := prometheus.NewRegistry()
-	cache, _ := NewFailedQueryCache(3, reg)
+	cache := NewFailedQueryCache(3, 0, reg)

 	tests := []struct {
 		name string
@@ -206,7 +203,7 @@ func TestUpdateFailedQueryCache(t *testing.T) {
 // TestQueryHitCache tests the QueryHitCache method
 func TestQueryHitCache(t *testing.T) {
 	reg := prometheus.NewRegistry()
-	cache, _ := NewFailedQueryCache(2, reg)
+	cache := NewFailedQueryCache(2, 0, reg)
 	lruCache := cache.lruCache

 	lruCache.Add("test_query", 100)
@@ -289,7 +286,7 @@ func TestQueryHitCache(t *testing.T) {

 func TestCacheCounterVec(t *testing.T) {
 	reg := prometheus.NewRegistry()
-	cache, _ := NewFailedQueryCache(2, reg)
+	cache := NewFailedQueryCache(2, 0, reg)
 	lruCache := cache.lruCache

 	lruCache.Add("test_query", 100)
@@ -371,12 +368,12 @@ func TestCacheCounterVec(t *testing.T) {

 func TestCacheLongRunningFailedQuery(t *testing.T) {
 	reg := prometheus.NewRegistry()
-	cache, _ := NewFailedQueryCache(3, reg)
+	cache := NewFailedQueryCache(3, 0, reg)

 	tests := []struct {
-		name string
-		err error
-		query url.Values
+		name  string
+		err   error
+		query url.Values
 	}{
 		{
 			name: "No error code in error message",
@@ -401,7 +398,7 @@ func TestCacheLongRunningFailedQuery(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// Long running failed query without an error code
-			cached, _ := cache.UpdateFailedQueryCache(tt.err, tt.query, time.Second*(5 * 60 - 1))
+			cached, _ := cache.UpdateFailedQueryCache(tt.err, tt.query, time.Second*(5*60-1))
 			if !cached {
 				t.Errorf("Should cache short running failed query without an error code")
 			}
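
Note (not part of the patch): the replacement cache comes from github.com/hashicorp/golang-lru/v2/expirable. Its constructor takes a capacity, an optional eviction callback, and a TTL, and it does not return an error, which is why NewFailedQueryCache drops its error return and the tests drop their error checks. A zero TTL, as passed in the tests above, appears to be treated by the library as effectively no expiration. Below is a minimal, self-contained sketch of the TTL behavior the converted cache relies on; the key "up" and the value 300 are made-up stand-ins for a normalized query expression and its range length in seconds.

// Illustrative sketch only, not code from this patch.
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
	// Capacity 2, no eviction callback, entries expire roughly 200ms after insertion.
	cache := expirable.NewLRU[string, int](2, nil, 200*time.Millisecond)

	// Record a "failed query" with its time range length in seconds.
	cache.Add("up", 300)

	if v, ok := cache.Get("up"); ok {
		fmt.Println("hit before TTL:", v) // hit before TTL: 300
	}

	time.Sleep(300 * time.Millisecond)

	// Once the TTL has elapsed the entry is treated as expired, so the lookup
	// misses and a handler built on this cache would run the query again
	// instead of short-circuiting it.
	if _, ok := cache.Get("up"); !ok {
		fmt.Println("miss after TTL")
	}
}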