diff --git a/pkg/cache/lru.go b/pkg/cache/lru.go index edd31a5..67e869b 100644 --- a/pkg/cache/lru.go +++ b/pkg/cache/lru.go @@ -1,90 +1,137 @@ package cache import ( - "context" "fmt" "sync" "time" "github.com/Revolyssup/arp/pkg/logger" - "github.com/Revolyssup/arp/pkg/utils" ) type Node[T any] struct { - Left *Node[T] - Right *Node[T] - key string - value T + Left *Node[T] + Right *Node[T] + key string + value T + expiration int64 // Unix nano timestamp, 0 means no expiration } type LRUCache[T any] struct { Head *Node[T] Tail *Node[T] - m map[string]*Node[T] // ← Store NODES, not just values + m map[string]*Node[T] maxSize int mx sync.Mutex log *logger.Logger - //for managing the cleanup goroutines for TTL - ttlCtx context.Context - ttlCancel context.CancelFunc - keyCancels map[string]context.CancelFunc - ttlWg sync.WaitGroup + // Hashicorp-style TTL management + janitor *janitor + stopJanitor chan struct{} } -func NewLRUCache[T any](size int, logger *logger.Logger) *LRUCache[T] { - // Note: Its okay to use Background here as this is the context that will be - // used for all TTL goroutines. Passing context from NewLRUCache will complicate things for no reason. - // LRUCache exposes a Destroy() method for cleanup which the caller must respect. - ctx, cancel := context.WithCancel(context.Background()) - return &LRUCache[T]{ - m: make(map[string]*Node[T]), - maxSize: size, - log: logger.WithComponent("LRUCache"), - ttlCtx: ctx, - keyCancels: make(map[string]context.CancelFunc), - ttlCancel: cancel, +type janitor struct { + Interval time.Duration + stop chan struct{} +} + +func NewLRUCache[T any](size int, cleanupInterval time.Duration, logger *logger.Logger) *LRUCache[T] { + lru := &LRUCache[T]{ + m: make(map[string]*Node[T]), + maxSize: size, + log: logger.WithComponent("LRUCache"), + stopJanitor: make(chan struct{}), + } + + // Start janitor if cleanup interval is positive + if cleanupInterval > 0 { + lru.startJanitor(cleanupInterval) + } + + return lru +} + +func (lru *LRUCache[T]) startJanitor(interval time.Duration) { + j := &janitor{ + Interval: interval, + stop: make(chan struct{}), + } + lru.janitor = j + + go lru.runJanitor(j) +} + +func (lru *LRUCache[T]) runJanitor(j *janitor) { + ticker := time.NewTicker(j.Interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + lru.deleteExpired() + case <-j.stop: + return + case <-lru.stopJanitor: + return + } } } -func (nlru *LRUCache[T]) Stop() { - nlru.log.Debugf("Stopping LRU Cache TTL goroutines") - nlru.mx.Lock() - defer nlru.mx.Unlock() +func (lru *LRUCache[T]) deleteExpired() { + lru.mx.Lock() + defer lru.mx.Unlock() + + now := time.Now().UnixNano() + expiredKeys := make([]string, 0) - if nlru.ttlCancel != nil { - nlru.ttlCancel() + // First pass: collect expired keys + for key, node := range lru.m { + if node.expiration > 0 && now > node.expiration { + expiredKeys = append(expiredKeys, key) + } + } + + // Second pass: remove expired items + for _, key := range expiredKeys { + if node, exists := lru.m[key]; exists { + lru.log.Debugf("TTL expired for key %s, deleting from LRU Cache", key) + lru.pop(node) + delete(lru.m, key) + } } - nlru.ttlWg.Wait() } -// Resets but keeps the cache operational -func (nlru *LRUCache[T]) Reset() { - nlru.log.Debugf("Resetting LRU Cache") - nlru.Stop() +func (lru *LRUCache[T]) Stop() { + lru.log.Debugf("Stopping LRU Cache janitor") + lru.mx.Lock() + defer lru.mx.Unlock() + + // Stop the janitor + if lru.janitor != nil { + close(lru.janitor.stop) + lru.janitor = nil + } + close(lru.stopJanitor) +} - nlru.mx.Lock() - defer nlru.mx.Unlock() - nlru.Head = nil - nlru.Tail = nil - nlru.m = make(map[string]*Node[T]) +func (lru *LRUCache[T]) Reset() { + lru.log.Debugf("Resetting LRU Cache") + lru.mx.Lock() + defer lru.mx.Unlock() - // recreate context for TTL goroutines - nlru.ttlCtx, nlru.ttlCancel = context.WithCancel(context.Background()) + lru.Head = nil + lru.Tail = nil + lru.m = make(map[string]*Node[T]) } -// Destroys completely - cannot be used after this -func (nlru *LRUCache[T]) Destroy() { - nlru.log.Debugf("Destroying LRU Cache") - nlru.Stop() - nlru.mx.Lock() - defer nlru.mx.Unlock() - nlru.Head = nil - nlru.Tail = nil - nlru.m = nil - nlru.ttlCtx = nil - nlru.keyCancels = nil - nlru.ttlCancel = nil +func (lru *LRUCache[T]) Destroy() { + lru.log.Debugf("Destroying LRU Cache") + lru.Stop() + + lru.mx.Lock() + defer lru.mx.Unlock() + lru.Head = nil + lru.Tail = nil + lru.m = nil } func (nlru *LRUCache[T]) DebugGet() map[string]T { @@ -105,14 +152,6 @@ func (nlru *LRUCache[T]) PrintList() string { return ans } -func (lru *LRUCache[T]) cancelKeyTTL(key string) { - if cancel, exists := lru.keyCancels[key]; exists { - lru.log.Debugf("Cancelling TTL goroutine for key %s", key) - cancel() - delete(lru.keyCancels, key) - } -} - // O(1) pop - no traversal needed! func (lru *LRUCache[T]) pop(node *Node[T]) { if node == nil { @@ -160,6 +199,14 @@ func (lru *LRUCache[T]) Get(key string) (ansval T, ok bool) { defer lru.mx.Unlock() if node, ok := lru.m[key]; ok { + // Check expiration + if node.expiration > 0 && time.Now().UnixNano() > node.expiration { + lru.log.Debugf("Key %s found but expired", key) + lru.pop(node) + delete(lru.m, key) + return ansval, false + } + // Move to front - O(1) operations lru.pop(node) lru.push(node) @@ -175,8 +222,6 @@ func (lru *LRUCache[T]) Delete(key string) (ok bool) { if node, ok := lru.m[key]; ok { lru.pop(node) - // Cancel any existing TTL goroutine for this key - lru.cancelKeyTTL(key) delete(lru.m, key) return true } @@ -188,9 +233,6 @@ func (lru *LRUCache[T]) poptail() { if lru.Tail == nil { return } - // Cancel TTL cleanup go routine for the evicted key - lru.cancelKeyTTL(lru.Tail.key) - // Remove from map and list delete(lru.m, lru.Tail.key) lru.pop(lru.Tail) @@ -201,19 +243,24 @@ func (lru *LRUCache[T]) Set(key string, val T, ttl time.Duration) { lru.mx.Lock() defer lru.mx.Unlock() + var expiration int64 + if ttl > 0 { + expiration = time.Now().Add(ttl).UnixNano() + } + // Check if key already exists if existingNode, exists := lru.m[key]; exists { - // Cancel any existing TTL goroutine for this key - lru.cancelKeyTTL(key) - // Update value and move to front + // Update value, expiration and move to front existingNode.value = val + existingNode.expiration = expiration lru.pop(existingNode) lru.push(existingNode) } else { // Create new node newNode := &Node[T]{ - key: key, - value: val, + key: key, + value: val, + expiration: expiration, } // Evict if needed @@ -225,26 +272,4 @@ func (lru *LRUCache[T]) Set(key string, val T, ttl time.Duration) { lru.m[key] = newNode lru.push(newNode) } - - // TTL handling with proper context - if ttl >= 0 { - keyCtx, keyCancel := context.WithCancel(lru.ttlCtx) - lru.keyCancels[key] = keyCancel - lru.ttlWg.Add(1) - utils.GoWithRecover(func() { - defer lru.ttlWg.Done() - - select { - case <-keyCtx.Done(): - lru.log.Debugf("TTL context done for key %s", key) - return - case <-time.After(ttl): - lru.log.Debugf("TTL expired for key %s, deleting from LRU Cache after %v", key, ttl) - lru.Delete(key) - } - }, func(err any) { - lru.log.Infof("panic in LRU cache ttl goroutine: %v", err) - lru.ttlWg.Done() - }) - } } diff --git a/pkg/cache/lru_test.go b/pkg/cache/lru_test.go index e2f2fbe..3f395d0 100644 --- a/pkg/cache/lru_test.go +++ b/pkg/cache/lru_test.go @@ -9,7 +9,7 @@ import ( ) func TestLRU(t *testing.T) { - lru := NewLRUCache[int](2, logger.New(logger.LevelInfo)) + lru := NewLRUCache[int](2, 1*time.Second, logger.New(logger.LevelInfo)) lru.Set("a", 1, time.Second*2) lru.Set("b", 2, time.Second*2) val, ok := lru.Get("a") @@ -31,7 +31,7 @@ func TestLRU(t *testing.T) { // BenchmarkLRUCache_Set benchmarks setting values in the cache func BenchmarkLRUCache_Set(b *testing.B) { log := logger.New(logger.LevelInfo).WithComponent("benchmark") - cache := NewLRUCache[string](1000, log) + cache := NewLRUCache[string](1000, 1*time.Second, log) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -43,7 +43,7 @@ func BenchmarkLRUCache_Set(b *testing.B) { // BenchmarkLRUCache_Get benchmarks getting values from the cache func BenchmarkLRUCache_Get(b *testing.B) { log := logger.New(logger.LevelInfo).WithComponent("benchmark") - cache := NewLRUCache[string](1000, log) + cache := NewLRUCache[string](1000, 1*time.Second, log) // Pre-populate cache for i := 0; i < 1000; i++ { @@ -61,7 +61,7 @@ func BenchmarkLRUCache_Get(b *testing.B) { // BenchmarkLRUCache_GetMiss benchmarks cache misses func BenchmarkLRUCache_GetMiss(b *testing.B) { log := logger.New(logger.LevelInfo).WithComponent("benchmark") - cache := NewLRUCache[string](1000, log) + cache := NewLRUCache[string](1000, 1*time.Second, log) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -73,7 +73,7 @@ func BenchmarkLRUCache_GetMiss(b *testing.B) { // BenchmarkLRUCache_SetGetMixed benchmarks mixed operations func BenchmarkLRUCache_SetGetMixed(b *testing.B) { log := logger.New(logger.LevelInfo).WithComponent("benchmark") - cache := NewLRUCache[string](1000, log) + cache := NewLRUCache[string](1000, 1*time.Second, log) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -93,7 +93,7 @@ func BenchmarkLRUCache_DifferentSizes(b *testing.B) { for _, size := range sizes { b.Run(fmt.Sprintf("Size_%d", size), func(b *testing.B) { log := logger.New(logger.LevelInfo).WithComponent("benchmark") - cache := NewLRUCache[string](size, log) + cache := NewLRUCache[string](size, 1*time.Second, log) // Pre-populate for i := 0; i < size; i++ { @@ -112,7 +112,7 @@ func BenchmarkLRUCache_DifferentSizes(b *testing.B) { // Benchmark concurrent access func BenchmarkLRUCache_Concurrent(b *testing.B) { log := logger.New(logger.LevelInfo).WithComponent("benchmark") - cache := NewLRUCache[string](1000, log) + cache := NewLRUCache[string](1000, 1*time.Second, log) // Pre-populate for i := 0; i < 1000; i++ { diff --git a/pkg/plugin/responsecache/responsecache.go b/pkg/plugin/responsecache/responsecache.go index ee145be..d704203 100644 --- a/pkg/plugin/responsecache/responsecache.go +++ b/pkg/plugin/responsecache/responsecache.go @@ -76,7 +76,7 @@ func (p *ResponseCache) ValidateAndSetConfig(conf types.PluginConf) error { } p.config = conf size := conf["size"].(int) - p.cache = cache.NewLRUCache[[]byte](size, p.logger) + p.cache = cache.NewLRUCache[[]byte](size, 1*time.Second, p.logger) p.logger.Debugf("Initialized ResponseCache plugin with size %d", size) return nil } diff --git a/pkg/route/matcher.go b/pkg/route/matcher.go index b3560e5..fc77a82 100644 --- a/pkg/route/matcher.go +++ b/pkg/route/matcher.go @@ -32,7 +32,7 @@ func NewPathMatcher(logger *logger.Logger) *PathMatcher { }, 0), prefixRoutes: make(map[string][]*Route), logger: l, - cache: cache.NewLRUCache[[]*Route](100, l), + cache: cache.NewLRUCache[[]*Route](100, 1*time.Second, l), } } @@ -92,7 +92,7 @@ func (pm *PathMatcher) Clear() { routes []*Route }, 0) pm.prefixRoutes = make(map[string][]*Route) - pm.cache = cache.NewLRUCache[[]*Route](100, pm.logger) + pm.cache = cache.NewLRUCache[[]*Route](100, 1*time.Second, pm.logger) } type MethodMatcher struct {