213 changes: 119 additions & 94 deletions pkg/cache/lru.go
@@ -1,90 +1,137 @@
package cache

import (
"context"
"fmt"
"sync"
"time"

"github.com/Revolyssup/arp/pkg/logger"
"github.com/Revolyssup/arp/pkg/utils"
)

type Node[T any] struct {
Left *Node[T]
Right *Node[T]
key string
value T
Left *Node[T]
Right *Node[T]
key string
value T
expiration int64 // Unix nano timestamp, 0 means no expiration
}

type LRUCache[T any] struct {
Head *Node[T]
Tail *Node[T]
m map[string]*Node[T] // ← Store NODES, not just values
m map[string]*Node[T]
maxSize int
mx sync.Mutex
log *logger.Logger

//for managing the cleanup goroutines for TTL
ttlCtx context.Context
ttlCancel context.CancelFunc
keyCancels map[string]context.CancelFunc
ttlWg sync.WaitGroup
// Hashicorp-style TTL management
janitor *janitor
stopJanitor chan struct{}
}

func NewLRUCache[T any](size int, logger *logger.Logger) *LRUCache[T] {
// Note: Its okay to use Background here as this is the context that will be
// used for all TTL goroutines. Passing context from NewLRUCache will complicate things for no reason.
// LRUCache exposes a Destroy() method for cleanup which the caller must respect.
ctx, cancel := context.WithCancel(context.Background())
return &LRUCache[T]{
m: make(map[string]*Node[T]),
maxSize: size,
log: logger.WithComponent("LRUCache"),
ttlCtx: ctx,
keyCancels: make(map[string]context.CancelFunc),
ttlCancel: cancel,
type janitor struct {
Interval time.Duration
stop chan struct{}
}

func NewLRUCache[T any](size int, cleanupInterval time.Duration, logger *logger.Logger) *LRUCache[T] {
lru := &LRUCache[T]{
m: make(map[string]*Node[T]),
maxSize: size,
log: logger.WithComponent("LRUCache"),
stopJanitor: make(chan struct{}),
}

// Start janitor if cleanup interval is positive
if cleanupInterval > 0 {
lru.startJanitor(cleanupInterval)
}

return lru
}

func (lru *LRUCache[T]) startJanitor(interval time.Duration) {
j := &janitor{
Interval: interval,
stop: make(chan struct{}),
}
lru.janitor = j

go lru.runJanitor(j)
}

func (lru *LRUCache[T]) runJanitor(j *janitor) {
ticker := time.NewTicker(j.Interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
lru.deleteExpired()
case <-j.stop:
return
case <-lru.stopJanitor:
return
}
}
}

func (nlru *LRUCache[T]) Stop() {
nlru.log.Debugf("Stopping LRU Cache TTL goroutines")
nlru.mx.Lock()
defer nlru.mx.Unlock()
func (lru *LRUCache[T]) deleteExpired() {
lru.mx.Lock()
defer lru.mx.Unlock()

now := time.Now().UnixNano()
expiredKeys := make([]string, 0)

if nlru.ttlCancel != nil {
nlru.ttlCancel()
// First pass: collect expired keys
for key, node := range lru.m {
if node.expiration > 0 && now > node.expiration {
expiredKeys = append(expiredKeys, key)
}
}

// Second pass: remove expired items
for _, key := range expiredKeys {
if node, exists := lru.m[key]; exists {
lru.log.Debugf("TTL expired for key %s, deleting from LRU Cache", key)
lru.pop(node)
delete(lru.m, key)
}
}
nlru.ttlWg.Wait()
}

// Resets but keeps the cache operational
func (nlru *LRUCache[T]) Reset() {
nlru.log.Debugf("Resetting LRU Cache")
nlru.Stop()
func (lru *LRUCache[T]) Stop() {
lru.log.Debugf("Stopping LRU Cache janitor")
lru.mx.Lock()
defer lru.mx.Unlock()

// Stop the janitor
if lru.janitor != nil {
close(lru.janitor.stop)
lru.janitor = nil
}
close(lru.stopJanitor)
}

nlru.mx.Lock()
defer nlru.mx.Unlock()
nlru.Head = nil
nlru.Tail = nil
nlru.m = make(map[string]*Node[T])
func (lru *LRUCache[T]) Reset() {
lru.log.Debugf("Resetting LRU Cache")
lru.mx.Lock()
defer lru.mx.Unlock()

// recreate context for TTL goroutines
nlru.ttlCtx, nlru.ttlCancel = context.WithCancel(context.Background())
lru.Head = nil
lru.Tail = nil
lru.m = make(map[string]*Node[T])
}

// Destroys completely - cannot be used after this
func (nlru *LRUCache[T]) Destroy() {
nlru.log.Debugf("Destroying LRU Cache")
nlru.Stop()
nlru.mx.Lock()
defer nlru.mx.Unlock()
nlru.Head = nil
nlru.Tail = nil
nlru.m = nil
nlru.ttlCtx = nil
nlru.keyCancels = nil
nlru.ttlCancel = nil
func (lru *LRUCache[T]) Destroy() {
lru.log.Debugf("Destroying LRU Cache")
lru.Stop()

lru.mx.Lock()
defer lru.mx.Unlock()
lru.Head = nil
lru.Tail = nil
lru.m = nil
}

func (nlru *LRUCache[T]) DebugGet() map[string]T {
@@ -105,14 +152,6 @@ func (nlru *LRUCache[T]) PrintList() string {
return ans
}

func (lru *LRUCache[T]) cancelKeyTTL(key string) {
if cancel, exists := lru.keyCancels[key]; exists {
lru.log.Debugf("Cancelling TTL goroutine for key %s", key)
cancel()
delete(lru.keyCancels, key)
}
}

// O(1) pop - no traversal needed!
func (lru *LRUCache[T]) pop(node *Node[T]) {
if node == nil {
@@ -160,6 +199,14 @@ func (lru *LRUCache[T]) Get(key string) (ansval T, ok bool) {
defer lru.mx.Unlock()

if node, ok := lru.m[key]; ok {
// Check expiration
if node.expiration > 0 && time.Now().UnixNano() > node.expiration {
lru.log.Debugf("Key %s found but expired", key)
lru.pop(node)
delete(lru.m, key)
return ansval, false
}

// Move to front - O(1) operations
lru.pop(node)
lru.push(node)
@@ -175,8 +222,6 @@ func (lru *LRUCache[T]) Delete(key string) (ok bool) {

if node, ok := lru.m[key]; ok {
lru.pop(node)
// Cancel any existing TTL goroutine for this key
lru.cancelKeyTTL(key)
delete(lru.m, key)
return true
}
@@ -188,9 +233,6 @@ func (lru *LRUCache[T]) poptail() {
if lru.Tail == nil {
return
}
// Cancel TTL cleanup go routine for the evicted key
lru.cancelKeyTTL(lru.Tail.key)

// Remove from map and list
delete(lru.m, lru.Tail.key)
lru.pop(lru.Tail)
@@ -201,19 +243,24 @@ func (lru *LRUCache[T]) Set(key string, val T, ttl time.Duration) {
lru.mx.Lock()
defer lru.mx.Unlock()

var expiration int64
if ttl > 0 {
expiration = time.Now().Add(ttl).UnixNano()
}

// Check if key already exists
if existingNode, exists := lru.m[key]; exists {
// Cancel any existing TTL goroutine for this key
lru.cancelKeyTTL(key)
// Update value and move to front
// Update value, expiration and move to front
existingNode.value = val
existingNode.expiration = expiration
lru.pop(existingNode)
lru.push(existingNode)
} else {
// Create new node
newNode := &Node[T]{
key: key,
value: val,
key: key,
value: val,
expiration: expiration,
}

// Evict if needed
@@ -225,26 +272,4 @@
lru.m[key] = newNode
lru.push(newNode)
}

// TTL handling with proper context
if ttl >= 0 {
keyCtx, keyCancel := context.WithCancel(lru.ttlCtx)
lru.keyCancels[key] = keyCancel
lru.ttlWg.Add(1)
utils.GoWithRecover(func() {
defer lru.ttlWg.Done()

select {
case <-keyCtx.Done():
lru.log.Debugf("TTL context done for key %s", key)
return
case <-time.After(ttl):
lru.log.Debugf("TTL expired for key %s, deleting from LRU Cache after %v", key, ttl)
lru.Delete(key)
}
}, func(err any) {
lru.log.Infof("panic in LRU cache ttl goroutine: %v", err)
lru.ttlWg.Done()
})
}
}
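
The change above replaces the per-key TTL goroutines with a single background janitor plus a lazy expiration check in Get. A minimal usage sketch of the new API, based on the constructor, Set/Get, and Stop signatures shown in this diff; the module path github.com/Revolyssup/arp is assumed from the imports used elsewhere in the PR:

```go
package main

import (
	"fmt"
	"time"

	"github.com/Revolyssup/arp/pkg/cache"
	"github.com/Revolyssup/arp/pkg/logger"
)

func main() {
	log := logger.New(logger.LevelInfo)

	// Capacity of 100 entries; the janitor sweeps expired keys every second.
	c := cache.NewLRUCache[string](100, 1*time.Second, log)
	defer c.Stop() // stop the janitor goroutine when the cache is no longer needed

	c.Set("session", "abc123", 500*time.Millisecond) // expires after ~500ms
	c.Set("config", "v1", 0)                         // ttl <= 0 means no expiration

	if v, ok := c.Get("session"); ok {
		fmt.Println("session:", v)
	}

	time.Sleep(time.Second)

	// The expired entry is gone: either removed by a janitor sweep or
	// rejected lazily by Get's expiration check.
	if _, ok := c.Get("session"); !ok {
		fmt.Println("session expired")
	}
}
```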
14 changes: 7 additions & 7 deletions pkg/cache/lru_test.go
@@ -9,7 +9,7 @@ import (
)

func TestLRU(t *testing.T) {
lru := NewLRUCache[int](2, logger.New(logger.LevelInfo))
lru := NewLRUCache[int](2, 1*time.Second, logger.New(logger.LevelInfo))
lru.Set("a", 1, time.Second*2)
lru.Set("b", 2, time.Second*2)
val, ok := lru.Get("a")
@@ -31,7 +31,7 @@ func TestLRU(t *testing.T) {
// BenchmarkLRUCache_Set benchmarks setting values in the cache
func BenchmarkLRUCache_Set(b *testing.B) {
log := logger.New(logger.LevelInfo).WithComponent("benchmark")
cache := NewLRUCache[string](1000, log)
cache := NewLRUCache[string](1000, 1*time.Second, log)

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -43,7 +43,7 @@ func BenchmarkLRUCache_Set(b *testing.B) {
// BenchmarkLRUCache_Get benchmarks getting values from the cache
func BenchmarkLRUCache_Get(b *testing.B) {
log := logger.New(logger.LevelInfo).WithComponent("benchmark")
cache := NewLRUCache[string](1000, log)
cache := NewLRUCache[string](1000, 1*time.Second, log)

// Pre-populate cache
for i := 0; i < 1000; i++ {
@@ -61,7 +61,7 @@ func BenchmarkLRUCache_Get(b *testing.B) {
// BenchmarkLRUCache_GetMiss benchmarks cache misses
func BenchmarkLRUCache_GetMiss(b *testing.B) {
log := logger.New(logger.LevelInfo).WithComponent("benchmark")
cache := NewLRUCache[string](1000, log)
cache := NewLRUCache[string](1000, 1*time.Second, log)

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -73,7 +73,7 @@ func BenchmarkLRUCache_GetMiss(b *testing.B) {
// BenchmarkLRUCache_SetGetMixed benchmarks mixed operations
func BenchmarkLRUCache_SetGetMixed(b *testing.B) {
log := logger.New(logger.LevelInfo).WithComponent("benchmark")
cache := NewLRUCache[string](1000, log)
cache := NewLRUCache[string](1000, 1*time.Second, log)

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -93,7 +93,7 @@ func BenchmarkLRUCache_DifferentSizes(b *testing.B) {
for _, size := range sizes {
b.Run(fmt.Sprintf("Size_%d", size), func(b *testing.B) {
log := logger.New(logger.LevelInfo).WithComponent("benchmark")
cache := NewLRUCache[string](size, log)
cache := NewLRUCache[string](size, 1*time.Second, log)

// Pre-populate
for i := 0; i < size; i++ {
@@ -112,7 +112,7 @@ func BenchmarkLRUCache_DifferentSizes(b *testing.B) {
// Benchmark concurrent access
func BenchmarkLRUCache_Concurrent(b *testing.B) {
log := logger.New(logger.LevelInfo).WithComponent("benchmark")
cache := NewLRUCache[string](1000, log)
cache := NewLRUCache[string](1000, 1*time.Second, log)

// Pre-populate
for i := 0; i < 1000; i++ {
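
The existing tests only cover the new constructor signature. A sketch of a test that could exercise the expiration path is below; it is not part of this diff, and the 50ms/100ms timings are arbitrary. It relies on Get's lazy expiration check, so it passes even if the janitor has not yet swept:

```go
func TestLRUExpiration(t *testing.T) {
	// Hypothetical test, not in this PR: short janitor interval for the sketch.
	lru := NewLRUCache[int](2, 50*time.Millisecond, logger.New(logger.LevelInfo))
	defer lru.Stop()

	lru.Set("a", 1, 100*time.Millisecond)

	if _, ok := lru.Get("a"); !ok {
		t.Fatal("expected 'a' before expiration")
	}

	time.Sleep(200 * time.Millisecond)

	// Expired either by a janitor sweep or by Get's expiration check.
	if _, ok := lru.Get("a"); ok {
		t.Fatal("expected 'a' to be expired")
	}
}
```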
2 changes: 1 addition & 1 deletion pkg/plugin/responsecache/responsecache.go
@@ -76,7 +76,7 @@ func (p *ResponseCache) ValidateAndSetConfig(conf types.PluginConf) error {
}
p.config = conf
size := conf["size"].(int)
p.cache = cache.NewLRUCache[[]byte](size, p.logger)
p.cache = cache.NewLRUCache[[]byte](size, 1*time.Second, p.logger)
p.logger.Debugf("Initialized ResponseCache plugin with size %d", size)
return nil
}
4 changes: 2 additions & 2 deletions pkg/route/matcher.go
@@ -32,7 +32,7 @@ func NewPathMatcher(logger *logger.Logger) *PathMatcher {
}, 0),
prefixRoutes: make(map[string][]*Route),
logger: l,
cache: cache.NewLRUCache[[]*Route](100, l),
cache: cache.NewLRUCache[[]*Route](100, 1*time.Second, l),
}
}

@@ -92,7 +92,7 @@ func (pm *PathMatcher) Clear() {
routes []*Route
}, 0)
pm.prefixRoutes = make(map[string][]*Route)
pm.cache = cache.NewLRUCache[[]*Route](100, pm.logger)
pm.cache = cache.NewLRUCache[[]*Route](100, 1*time.Second, pm.logger)
}

type MethodMatcher struct {
Expand Down