diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 0df941c..5ae0207 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -21,16 +21,3 @@ jobs: - name: Run test run: go test -race ./... - - lint: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - - name: Set up Go - uses: actions/setup-go@v2 - with: - go-version: 1.22 - - - name: golangci-lint - uses: golangci/golangci-lint-action@v2 diff --git a/bench_test.go b/bench_test.go index 013674e..9a5b2b2 100644 --- a/bench_test.go +++ b/bench_test.go @@ -11,7 +11,7 @@ import ( func BenchmarkFetchAndEvictParallel(b *testing.B) { b.StopTimer() - c := evcache.New[uint64, int](0) + c := evcache.New[uint64, int]() index := uint64(0) errFetch := errors.New("error fetching") @@ -21,7 +21,7 @@ func BenchmarkFetchAndEvictParallel(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { if idx := atomic.AddUint64(&index, 1); idx%2 == 0 { - _, _ = c.Fetch(0, 0, func() (int, error) { + _, _ = c.Fetch(0, func() (int, error) { if idx%4 == 0 { return 0, errFetch } @@ -37,14 +37,16 @@ func BenchmarkFetchAndEvictParallel(b *testing.B) { func BenchmarkFetchExists(b *testing.B) { b.StopTimer() - c := evcache.New[uint64, int](0) - c.LoadOrStore(0, 0, 0) + c := evcache.New[uint64, int]() + c.Fetch(0, func() (int, error) { + return 0, nil + }) b.ReportAllocs() b.StartTimer() for i := 0; i < b.N; i++ { - _, _ = c.Fetch(0, 0, func() (int, error) { + _, _ = c.Fetch(0, func() (int, error) { panic("unexpected fetch callback") }) } @@ -53,13 +55,13 @@ func BenchmarkFetchExists(b *testing.B) { func BenchmarkFetchNotExists(b *testing.B) { b.StopTimer() - c := evcache.New[int, int](0) + c := evcache.New[int, int]() b.ReportAllocs() b.StartTimer() for i := 0; i < b.N; i++ { - _, _ = c.Fetch(i, 0, func() (int, error) { + _, _ = c.Fetch(i, func() (int, error) { return 0, nil }) } diff --git a/cache.go b/cache.go index 8bce266..90dea91 100644 --- a/cache.go +++ b/cache.go @@ -8,149 +8,141 @@ import ( "time" "github.com/mgnsk/evcache/v3/internal/backend" - "github.com/mgnsk/ringlist" ) -// Cache is an in-memory TTL cache with optional capacity. -type Cache[K comparable, V any] struct { - backend *backend.Backend[K, V] +// Available cache eviction policies. +const ( + // FIFO policy orders recods in FIFO order. + FIFO = backend.FIFO + // LFU policy orders records in LFU order. + LFU = backend.LFU + // LRU policy orders records in LRU order. + LRU = backend.LRU +) + +// Option is a cache configuration option. +type Option interface { + apply(*cacheOptions) } -// New creates an empty cache. -func New[K comparable, V any](capacity int) *Cache[K, V] { - c := &Cache[K, V]{ - backend: backend.NewBackend[K, V](capacity), - } +type cacheOptions struct { + policy string + capacity int + ttl time.Duration + debounce time.Duration +} - runtime.SetFinalizer(c, func(c *Cache[K, V]) { - c.backend.Close() +// WithCapacity option configures the cache with specified capacity. +func WithCapacity(capacity int) Option { + return funcOption(func(opts *cacheOptions) { + opts.capacity = capacity }) - - return c } -// Available cache eviction policies. -const ( - Default = "default" - LRU = "lru" - LFU = "lfu" -) - -// WithPolicy configures the cache with specified eviction policy. -func (c *Cache[K, V]) WithPolicy(policy string) *Cache[K, V] { - if policy != "" { +// WithPolicy option configures the cache with specified eviction policy. 
+func WithPolicy(policy string) Option { + return funcOption(func(opts *cacheOptions) { switch policy { - case Default: - case LRU: - c.backend.Policy = backend.LRU - case LFU: - c.backend.Policy = backend.LFU + case FIFO, LRU, LFU: + opts.policy = policy + default: panic("evcache: invalid eviction policy '" + policy + "'") } - } - return c + }) } -// Exists returns whether a value in the cache exists for key. -func (c *Cache[K, V]) Exists(key K) bool { - _, ok := c.backend.Load(key) - return ok +// WithTTL option configures the cache with specified default TTL. +func WithTTL(ttl time.Duration) Option { + return funcOption(func(opts *cacheOptions) { + opts.ttl = ttl + }) } -// Get returns the value stored in the cache for key. -func (c *Cache[K, V]) Get(key K) (value V, exists bool) { - if elem, ok := c.backend.Load(key); ok { - return elem.Value.Value, true - } - - var zero V - return zero, false +// WithExpiryDebounce returns an option that configures the cache with specified expiry eviction debounce duration. +func WithExpiryDebounce(debounce time.Duration) Option { + return funcOption(func(opts *cacheOptions) { + opts.debounce = debounce + }) } -// Range calls f for each key and value present in the cache in no particular order or consistency. -// If f returns false, Range stops the iteration. It skips values that are currently being Fetched. -// -// Range is allowed to modify the cache. -func (c *Cache[K, V]) Range(f func(key K, value V) bool) { - c.backend.Range(func(elem *ringlist.Element[backend.Record[K, V]]) bool { - return f(elem.Value.Key, elem.Value.Value) - }) +type funcOption func(*cacheOptions) + +func (o funcOption) apply(opts *cacheOptions) { + o(opts) } -// Len returns the number of keys in the cache. -func (c *Cache[K, V]) Len() int { - return c.backend.Len() +// Cache is an in-memory cache. +type Cache[K comparable, V any] struct { + backend *backend.Backend[K, V] + ttl time.Duration } -// Evict a key and return its value. -func (c *Cache[K, V]) Evict(key K) (value V, ok bool) { - if value, ok := c.backend.Evict(key); ok { - return value, true +// New creates a new empty cache. +func New[K comparable, V any](opt ...Option) *Cache[K, V] { + opts := cacheOptions{ + debounce: time.Second, } - var zero V - return zero, false -} + for _, o := range opt { + o.apply(&opts) + } -// LoadOrStore loads or stores a value for key. If the key is being Fetched, LoadOrStore -// blocks until Fetch returns. -func (c *Cache[K, V]) LoadOrStore(key K, ttl time.Duration, value V) (old V, loaded bool) { - loaded = true + be := &backend.Backend[K, V]{} + be.Init(opts.capacity, opts.policy, opts.ttl, opts.debounce) - v, _ := c.Fetch(key, ttl, func() (V, error) { - loaded = false - return value, nil + c := &Cache[K, V]{ + backend: be, + ttl: opts.ttl, + } + + runtime.SetFinalizer(c, func(c *Cache[K, V]) { + c.backend.Close() }) - return v, loaded + return c } -// MustFetch fetches a value or panics if f panics. -func (c *Cache[K, V]) MustFetch(key K, ttl time.Duration, f func() V) (value V) { - v, _ := c.TryFetch(key, func() (V, time.Duration, error) { - value := f() - return value, ttl, nil - }) - return v +// Keys returns initialized cache keys in the sort order specified by policy. +func (c *Cache[K, V]) Keys() []K { + return c.backend.Keys() } -// Fetch loads or stores a value for key. If a value exists, f will not be called, -// otherwise f will be called to fetch the new value. It panics if f panics. -// Concurrent Fetches for the same key will block each other and return a single result. 
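For context, a minimal caller-side sketch of the reworked constructor and Fetch API introduced by this patch (illustrative only, not part of the diff; the v3 import path is assumed from the module):

```go
package main

import (
	"fmt"
	"time"

	"github.com/mgnsk/evcache/v3"
)

func main() {
	// Capacity, eviction policy, default TTL and expiry debounce are now
	// configured via functional options instead of a positional capacity.
	c := evcache.New[string, int](
		evcache.WithCapacity(128),
		evcache.WithPolicy(evcache.LRU),
		evcache.WithTTL(time.Minute),
	)

	// The callback only runs when the key is absent; the value is stored
	// with the default TTL configured above (FetchTTL allows a per-call TTL).
	v, err := c.Fetch("answer", func() (int, error) {
		return 42, nil
	})
	fmt.Println(v, err) // 42 <nil>
}
```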
-func (c *Cache[K, V]) Fetch(key K, ttl time.Duration, f func() (V, error)) (value V, err error) { - return c.TryFetch(key, func() (V, time.Duration, error) { - value, err := f() - return value, ttl, err - }) +// Len returns the number of keys in the cache. +func (c *Cache[K, V]) Len() int { + return c.backend.Len() } -// TryFetch is like Fetch but allows the TTL to be returned alongside the value from callback. -func (c *Cache[K, V]) TryFetch(key K, f func() (V, time.Duration, error)) (value V, err error) { - newElem := c.backend.Reserve(key) - - if elem, loaded := c.backend.LoadOrStore(newElem); loaded { - c.backend.Release(newElem) - return elem.Value.Value, nil - } - - defer func() { - if r := recover(); r != nil { - c.backend.Discard(newElem) +// Load an element from the cache. +func (c *Cache[K, V]) Load(key K) (value V, loaded bool) { + return c.backend.Load(key) +} - panic(r) - } - }() +// Evict a key and return its value. +func (c *Cache[K, V]) Evict(key K) (value V, ok bool) { + return c.backend.Evict(key) +} - value, ttl, err := f() - if err != nil { - c.backend.Discard(newElem) +// Store an element. +func (c *Cache[K, V]) Store(key K, value V) { + c.backend.Store(key, value) +} - var zero V - return zero, err - } +// StoreTTL stores an element with specified TTL. +func (c *Cache[K, V]) StoreTTL(key K, value V, ttl time.Duration) { + c.backend.StoreTTL(key, value, ttl) +} - c.backend.Initialize(newElem, value, ttl) +// Fetch loads or stores a value for key with the default TTL. +// If a value exists, f will not be called, otherwise f will be called to fetch the new value. +// It panics if f panics. Concurrent fetches for the same key will block and return a single result. +func (c *Cache[K, V]) Fetch(key K, f func() (V, error)) (value V, err error) { + return c.backend.Fetch(key, f) +} - return value, nil +// FetchTTL loads or stores a value for key with the specified TTL. +// If a value exists, f will not be called, otherwise f will be called to fetch the new value. +// It panics if f panics. Concurrent fetches for the same key will block and return a single result. +func (c *Cache[K, V]) FetchTTL(key K, f func() (V, time.Duration, error)) (value V, err error) { + return c.backend.FetchTTL(key, f) } diff --git a/cache_test.go b/cache_test.go index 71e5e77..8328588 100644 --- a/cache_test.go +++ b/cache_test.go @@ -1,238 +1,18 @@ package evcache_test import ( - "fmt" "runtime" "testing" - "time" "github.com/mgnsk/evcache/v3" - . 
"github.com/mgnsk/evcache/v3/internal/testing" ) -func TestLoadOrStoreNotExists(t *testing.T) { - c := evcache.New[string, int](0) - - _, loaded := c.LoadOrStore("key", 0, 1) - Equal(t, loaded, false) - Equal(t, c.Exists("key"), true) - Equal(t, c.Len(), 1) -} - -func TestLoadOrStoreExists(t *testing.T) { - c := evcache.New[string, int](0) - - c.LoadOrStore("key", 0, 1) - - v, loaded := c.LoadOrStore("key", 0, 2) - Equal(t, loaded, true) - Equal(t, v, 1) -} - -func TestFetchCallbackBlocks(t *testing.T) { - c := evcache.New[string, string](0) - - done := make(chan struct{}) - fetchStarted := make(chan struct{}) - defer close(done) - - go func() { - _, _ = c.Fetch("key", 0, func() (string, error) { - close(fetchStarted) - <-done - return "", nil - }) - }() - - <-fetchStarted - - t.Run("non-blocking Exists", func(t *testing.T) { - Equal(t, c.Exists("key"), false) - }) - - t.Run("non-blocking Evict", func(t *testing.T) { - _, ok := c.Evict("key") - Equal(t, ok, false) - }) - - t.Run("non-blocking Get", func(t *testing.T) { - _, ok := c.Get("key") - Equal(t, ok, false) - }) - - t.Run("autoexpiry for other keys works", func(t *testing.T) { - c.LoadOrStore("key1", time.Millisecond, "value1") - - EventuallyTrue(t, func() bool { - return !c.Exists("key1") - }) - }) - - t.Run("non-blocking Range", func(t *testing.T) { - c.LoadOrStore("key1", 0, "value1") - - n := 0 - c.Range(func(key, value string) bool { - if key == "key" { - t.Fatal("expected to skip key") - } - - v, ok := c.Evict(key) - Equal(t, ok, true) - Equal(t, v, value) - Equal(t, c.Len(), 0) - - n++ - return true - }) - - Equal(t, n, 1) - }) -} - -func TestFetchCallbackPanic(t *testing.T) { - c := evcache.New[string, string](0) - - func() { - defer func() { - _ = recover() - }() - - _, _ = c.Fetch("key", 0, func() (string, error) { - panic("failed") - }) - }() - - // Fetching again does not deadlock as the uninitialized value was cleaned up. 
- v, err := c.Fetch("key", 0, func() (string, error) { - return "new value", nil - }) - - Must(t, err) - Equal(t, v, "new value") -} - -func TestConcurrentFetch(t *testing.T) { - t.Run("returns error", func(t *testing.T) { - c := evcache.New[string, string](0) - - errCh := make(chan error) - fetchStarted := make(chan struct{}) - - go func() { - _, _ = c.Fetch("key", 0, func() (string, error) { - close(fetchStarted) - return "", <-errCh - }) - }() - - <-fetchStarted - errCh <- fmt.Errorf("error fetching value") - - v, err := c.Fetch("key", 0, func() (string, error) { - return "value", nil - }) - - Must(t, err) - Equal(t, v, "value") - }) - - t.Run("returns value", func(t *testing.T) { - c := evcache.New[string, string](0) - - valueCh := make(chan string) - fetchStarted := make(chan struct{}) - - go func() { - _, _ = c.Fetch("key", 0, func() (string, error) { - close(fetchStarted) - return <-valueCh, nil - }) - }() - - <-fetchStarted - valueCh <- "value" - - v, err := c.Fetch("key", 0, func() (string, error) { - return "value1", nil - }) - - Must(t, err) - Equal(t, v, "value") - }) -} - -func TestEvict(t *testing.T) { - c := evcache.New[string, string](0) - - c.LoadOrStore("key", 0, "value") - - v, ok := c.Evict("key") - - Equal(t, ok, true) - Equal(t, v, "value") - Equal(t, c.Exists("key"), false) -} - -func TestOverflow(t *testing.T) { - capacity := 100 - c := evcache.New[int, int](capacity) - - for i := 0; i < 2*capacity; i++ { - c.LoadOrStore(i, 0, 0) - } - - Equal(t, c.Len(), capacity) -} - -func TestExpire(t *testing.T) { - c := evcache.New[int, int](0) - - n := 10 - for i := 0; i < n; i++ { - // Store records in descending TTL order. - d := time.Duration(n-i) * time.Millisecond - _, _ = c.LoadOrStore(i, d, 0) - } - - EventuallyTrue(t, func() bool { - return c.Len() == 0 - }) -} - -func TestExpireEdgeCase(t *testing.T) { - c := evcache.New[int, *string](0) - - v1 := new(string) - - _, loaded := c.LoadOrStore(0, 10*time.Millisecond, v1) - Equal(t, loaded, false) - Equal(t, c.Len(), 1) - - // Wait until v1 expires. - EventuallyTrue(t, func() bool { - return c.Len() == 0 - }) - - // Assert that after v1 expires, v2 with a longer TTL than v1, can expire, - // specifically that backend's GC loop resets earliestExpireAt to zero, - // so that LoadOrStore schedules the GC loop. - v2 := new(string) - _, loaded = c.LoadOrStore(1, 100*time.Millisecond, v2) - Equal(t, loaded, false) - Equal(t, c.Len(), 1) - - EventuallyTrue(t, func() bool { - return c.Len() == 0 - }) -} - func TestCacheGoGC(t *testing.T) { capacity := 1_000_000 - c := evcache.New[int, byte](capacity) + c := evcache.New[int, byte](evcache.WithCapacity(capacity)) for i := range capacity { - c.LoadOrStore(i, 0, 0) + c.Store(i, 0) } var stats runtime.MemStats diff --git a/internal/backend/backend.go b/internal/backend/backend.go index 34f9b49..093079e 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -11,33 +11,43 @@ import ( "github.com/mgnsk/ringlist" ) +// Available cache eviction policies. +const ( + FIFO = "" + LFU = "lfu" + LRU = "lru" +) + // Backend implements cache backend. type Backend[K comparable, V any] struct { - Policy Policy - timer *time.Timer done chan struct{} + timer *time.Timer // timer until the next element expiry. 
xmap map[K]*ringlist.Element[Record[K, V]] // map of uninitialized and initialized elements list ringlist.List[Record[K, V]] // list of initialized elements - pool sync.Pool // pool of elements + lastGCAt int64 earliestExpireAt int64 cap int + policy string + defaultTTL time.Duration + debounce time.Duration lastLen int numDeleted uint64 gcStarted bool mu sync.Mutex } -// NewBackend creates a new cache backend. -func NewBackend[K comparable, V any](capacity int) *Backend[K, V] { +// Init initializes the cache. +func (b *Backend[K, V]) Init(capacity int, policy string, defaultTTL time.Duration, debounce time.Duration) { t := time.NewTimer(0) <-t.C - return &Backend[K, V]{ - timer: t, - done: make(chan struct{}), - xmap: make(map[K]*ringlist.Element[Record[K, V]], capacity), - cap: capacity, - } + b.timer = t + b.done = make(chan struct{}) + b.xmap = make(map[K]*ringlist.Element[Record[K, V]], capacity) + b.cap = capacity + b.policy = policy + b.defaultTTL = defaultTTL + b.debounce = debounce } // Close stops the backend cleanup loop @@ -54,168 +64,220 @@ func (b *Backend[K, V]) Len() int { return b.list.Len() } -// Reserve a new uninitialized element. -func (b *Backend[K, V]) Reserve(key K) *ringlist.Element[Record[K, V]] { - elem, ok := b.pool.Get().(*ringlist.Element[Record[K, V]]) - if !ok { - elem = ringlist.NewElement(Record[K, V]{}) - } - - elem.Value.Key = key - elem.Value.wg.Add(1) +// Load an initialized element. +func (b *Backend[K, V]) Load(key K) (value V, ok bool) { + b.mu.Lock() + defer b.mu.Unlock() - return elem -} + if elem, ok := b.xmap[key]; ok && elem.Value.state == stateInitialized { + b.hit(elem) + return elem.Value.Value, true + } -// Release a reserved uninitialized element. -func (b *Backend[K, V]) Release(elem *ringlist.Element[Record[K, V]]) { - elem.Value.Key = *new(K) - elem.Value.wg.Done() - b.pool.Put(elem) + var zero V + return zero, false } -// Discard a reserved uninitialized element. -func (b *Backend[K, V]) Discard(elem *ringlist.Element[Record[K, V]]) { +// Keys returns initialized cache keys in no particular order or consistency. +func (b *Backend[K, V]) Keys() []K { b.mu.Lock() - delete(b.xmap, elem.Value.Key) - elem.Value.wg.Done() - b.mu.Unlock() + defer b.mu.Unlock() + + keys := make([]K, 0, len(b.xmap)) + b.list.Do(func(elem *ringlist.Element[Record[K, V]]) bool { + keys = append(keys, elem.Value.Key) + return true + }) + + return keys } -// Initialize a previously stored uninitialized element. -func (b *Backend[K, V]) Initialize(elem *ringlist.Element[Record[K, V]], value V, ttl time.Duration) { +// Evict an element. +func (b *Backend[K, V]) Evict(key K) (V, bool) { b.mu.Lock() defer b.mu.Unlock() - defer elem.Value.wg.Done() - - elem.Value.Value = value - if ttl > 0 { - elem.Value.deadline = time.Now().Add(ttl).UnixNano() - } - - elem.Value.initialized = true - b.list.PushBack(elem) + var zero V - if n := b.overflow(); n > 0 { - b.delete(b.list.Front()) + if elem, ok := b.xmap[key]; ok && elem.Value.state == stateInitialized { + b.delete(elem) + return elem.Value.Value, true } - if elem.Value.deadline > 0 { - b.startGCOnce() - if b.earliestExpireAt == 0 || elem.Value.deadline < b.earliestExpireAt { - b.earliestExpireAt = elem.Value.deadline - b.timer.Reset(ttl) - } - } + return zero, false } -func (b *Backend[K, V]) hit(elem *ringlist.Element[Record[K, V]]) { - switch b.Policy { - case Default: - case LFU: - b.list.MoveAfter(elem, elem.Next()) - case LRU: - b.list.MoveAfter(elem, b.list.Back()) - } +// Store an element. 
+func (b *Backend[K, V]) Store(key K, value V) { + b.StoreTTL(key, value, b.defaultTTL) } -// Load an initialized element. -func (b *Backend[K, V]) Load(key K) (value *ringlist.Element[Record[K, V]], ok bool) { +// StoreTTL stores an element with specified TTL. +func (b *Backend[K, V]) StoreTTL(key K, value V, ttl time.Duration) { b.mu.Lock() defer b.mu.Unlock() - if elem, ok := b.xmap[key]; ok && elem.Value.initialized { - b.hit(elem) - return elem, true + if elem, ok := b.xmap[key]; ok { + switch elem.Value.state { + case stateInitialized: + b.delete(elem) + + default: + delete(b.xmap, key) + elem.Value.state = stateOverwritten + } } - return nil, false + // Note: unlike Fetch, Store never lets map readers + // see uninitialized elements. + + new := ringlist.NewElement(Record[K, V]{}) + deadline := b.prepareDeadline(ttl) + new.Value.Initialize(key, value, deadline) + b.push(new) + b.xmap[key] = new +} + +// Fetch loads or stores a value for key with the default TTL. +func (b *Backend[K, V]) Fetch(key K, f func() (V, error)) (value V, err error) { + return b.FetchTTL(key, func() (V, time.Duration, error) { + value, err := f() + return value, b.defaultTTL, err + }) } -// LoadOrStore loads or stores an element. -func (b *Backend[K, V]) LoadOrStore(new *ringlist.Element[Record[K, V]]) (old *ringlist.Element[Record[K, V]], loaded bool) { +// FetchTTL loads or stores a value for key with the specified TTL. +func (b *Backend[K, V]) FetchTTL(key K, f func() (V, time.Duration, error)) (value V, err error) { tryLoadStore: b.mu.Lock() - if elem, ok := b.xmap[new.Value.Key]; ok { - if elem.Value.initialized { + if elem, ok := b.xmap[key]; ok { + if elem.Value.state == stateInitialized { b.hit(elem) b.mu.Unlock() - return elem, true + return elem.Value.Value, nil } b.mu.Unlock() elem.Value.wg.Wait() + goto tryLoadStore } - b.xmap[new.Value.Key] = new - + new := ringlist.NewElement(Record[K, V]{}) + new.Value.wg.Add(1) + b.xmap[key] = new b.mu.Unlock() - return new, false -} + defer new.Value.wg.Done() + + defer func() { + if r := recover(); r != nil { + b.mu.Lock() + defer b.mu.Unlock() + + if new.Value.state == stateUninitialized { + delete(b.xmap, key) + } + + panic(r) + } + }() + + value, ttl, err := f() -// Range iterates over initialized cache elements in no particular order or consistency. -func (b *Backend[K, V]) Range(f func(r *ringlist.Element[Record[K, V]]) bool) { b.mu.Lock() - keys := make([]K, 0, len(b.xmap)) - for key := range b.xmap { - keys = append(keys, key) + defer b.mu.Unlock() + + if new.Value.state == stateOverwritten { + // Already deleted from map by Store(). + return value, err } - b.mu.Unlock() - for _, key := range keys { - b.mu.Lock() - elem, ok := b.xmap[key] - initialized := ok && elem.Value.initialized - b.mu.Unlock() - if initialized && !f(elem) { - return + if err != nil { + delete(b.xmap, key) + + return value, err + } + + deadline := b.prepareDeadline(ttl) + b.push(new) + new.Value.Initialize(key, value, deadline) + + return value, nil +} + +func (b *Backend[K, V]) prepareDeadline(ttl time.Duration) int64 { + var deadline int64 + + if ttl > 0 { + b.onceStartCleanupLoop() + + now := time.Now() + deadline = now.Add(ttl).UnixNano() + deadline = b.debounceDeadline(deadline) + + if b.earliestExpireAt == 0 || deadline < b.earliestExpireAt { + b.earliestExpireAt = deadline + after := time.Duration(deadline - now.UnixNano()) + b.timer.Reset(after) } } + + return deadline } -// Evict an element. 
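FetchTTL above implements single-flight semantics: the first caller for a key reserves an uninitialized record and runs the callback, while later callers wait on the record's WaitGroup and then return the initialized value. A hedged sketch of the observable behaviour through the public API (illustrative only, not part of the diff; assumes the evcache, sync and time imports):

```go
c := evcache.New[string, string]()

var wg sync.WaitGroup
for i := 0; i < 4; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Only one callback runs; the other goroutines block until it
		// finishes and then return the same value.
		v, _ := c.Fetch("config", func() (string, error) {
			time.Sleep(10 * time.Millisecond) // simulate a slow load
			return "loaded once", nil
		})
		_ = v
	}()
}
wg.Wait()
```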
-func (b *Backend[K, V]) Evict(key K) (V, bool) { - b.mu.Lock() - defer b.mu.Unlock() +func (b *Backend[K, V]) debounceDeadline(deadline int64) int64 { + if until := deadline - b.lastGCAt; until < b.debounce.Nanoseconds() { + deadline += b.debounce.Nanoseconds() - until + } - var zero V + return deadline +} - if elem, ok := b.xmap[key]; ok && elem.Value.initialized { - b.delete(elem) - return elem.Value.Value, true +func (b *Backend[K, V]) hit(elem *ringlist.Element[Record[K, V]]) { + switch b.policy { + case LFU: + b.list.MoveAfter(elem, elem.Next()) + case LRU: + b.list.MoveAfter(elem, b.list.Back()) + default: } +} - return zero, false +func (b *Backend[K, V]) push(elem *ringlist.Element[Record[K, V]]) { + b.list.PushBack(elem) + + if n := b.getOverflow(); n > 0 { + b.delete(b.list.Front()) + } } -// overflow returns the number of overflowed elements. -func (b *Backend[K, V]) overflow() int { +// getOverflow returns the number of overflowed elements. +func (b *Backend[K, V]) getOverflow() int { if b.cap > 0 && b.list.Len() > b.cap { return b.list.Len() - b.cap } return 0 } +// delete an initialized element. func (b *Backend[K, V]) delete(elem *ringlist.Element[Record[K, V]]) { delete(b.xmap, elem.Value.Key) b.list.Remove(elem) b.numDeleted++ if b.lastLen == 0 { - b.lastLen = b.list.Len() + b.lastLen = len(b.xmap) } if b.numDeleted > uint64(b.lastLen)/2 { b.numDeleted = 0 - b.lastLen = b.list.Len() + b.lastLen = len(b.xmap) b.xmap = maps.Clone(b.xmap) } } -func (b *Backend[K, V]) startGCOnce() { +func (b *Backend[K, V]) onceStartCleanupLoop() { if b.gcStarted { return } @@ -223,48 +285,54 @@ func (b *Backend[K, V]) startGCOnce() { b.gcStarted = true go func() { + defer b.timer.Stop() + for { select { case <-b.done: - b.timer.Stop() return + case now := <-b.timer.C: - b.RunGC(now.UnixNano()) + b.doCleanup(now.UnixNano()) } } }() } -func (b *Backend[K, V]) RunGC(now int64) { +func (b *Backend[K, V]) doCleanup(nowNano int64) { b.mu.Lock() defer b.mu.Unlock() var ( + expired []*ringlist.Element[Record[K, V]] earliest int64 - deleted []*ringlist.Element[Record[K, V]] ) b.list.Do(func(elem *ringlist.Element[Record[K, V]]) bool { deadline := elem.Value.deadline - if deadline > 0 && deadline < now { - deleted = append(deleted, elem) - return true - } - - if deadline > 0 && (earliest == 0 || deadline < earliest) { + if deadline > 0 && deadline < nowNano { + expired = append(expired, elem) + } else if deadline > 0 && (earliest == 0 || deadline < earliest) { earliest = deadline } return true }) - for _, elem := range deleted { + for _, elem := range expired { b.delete(elem) } - b.earliestExpireAt = earliest - if earliest > 0 { - b.timer.Reset(time.Duration(earliest - now)) + b.lastGCAt = nowNano + + switch earliest { + case 0: + b.earliestExpireAt = 0 + + default: + earliest = b.debounceDeadline(earliest) + b.earliestExpireAt = earliest + b.timer.Reset(time.Duration(earliest - nowNano)) } } diff --git a/internal/backend/backend_test.go b/internal/backend/backend_test.go index f6ccf06..a2ee705 100644 --- a/internal/backend/backend_test.go +++ b/internal/backend/backend_test.go @@ -1,72 +1,256 @@ package backend_test import ( + "fmt" "maps" "runtime" + "sync" "testing" + "time" "github.com/mgnsk/evcache/v3/internal/backend" . 
"github.com/mgnsk/evcache/v3/internal/testing" - "github.com/mgnsk/ringlist" ) -func TestReallocUninitializedRecords(t *testing.T) { - size := 1000 +func TestFetchCallbackBlocks(t *testing.T) { + var b backend.Backend[string, string] + b.Init(0, "", 0, 0) + t.Cleanup(b.Close) - t.Run("realloc", func(t *testing.T) { - b := backend.NewBackend[int, int](0) + wg := sync.WaitGroup{} + done := make(chan struct{}) + onceDone := sync.OnceFunc(func() { + close(done) + }) + fetchStarted := make(chan struct{}) - t.Log("filling the cache") - for i := 0; i < size; i++ { - elem, _ := b.LoadOrStore(b.Reserve(i)) - b.Initialize(elem, i, 0) - } + t.Cleanup(func() { + onceDone() + wg.Wait() + }) - t.Log("asserting number of initialized elements is correct") - assertCacheLen(t, b, size) + wg.Add(1) + go func() { + defer wg.Done() - var elem *ringlist.Element[backend.Record[int, int]] + _, _ = b.Fetch("key", func() (string, error) { + t.Log("Fetch started") + close(fetchStarted) - t.Log("storing a new uninitialized element") - elem = b.Reserve(size) - _, loaded := b.LoadOrStore(elem) - Equal(t, loaded, false) + <-done - t.Log("asserting number of initialized has not changed") - assertCacheLen(t, b, size) + return "", nil + }) + }() + + <-fetchStarted + + t.Run("assert cache empty", func(t *testing.T) { + Equal(t, b.Len(), 0) + }) - t.Log("evicting half of records") - for i := 0; i < size/2; i++ { - _, ok := b.Evict(i) - Equal(t, ok, true) + t.Run("non-blocking Evict", func(t *testing.T) { + _, ok := b.Evict("key") + Equal(t, ok, false) + }) + + t.Run("autoexpiry for other keys works", func(t *testing.T) { + b.FetchTTL("key1", func() (string, time.Duration, error) { + return "value1", time.Millisecond, nil + }) + + EventuallyTrue(t, func() bool { + return b.Len() == 0 + }) + }) + + t.Run("non-blocking Keys", func(t *testing.T) { + b.Fetch("key1", func() (string, error) { + return "value1", nil + }) + + keys := b.Keys() + Equal(t, len(keys), 1) + Equal(t, keys[0], "key1") + }) + + t.Run("Store discards the key", func(t *testing.T) { + b.Store("key", "value1") + + value, loaded := b.Load("key") + Equal(t, loaded, true) + Equal(t, value, "value1") + + t.Log("assert that overwritten value exists after Fetch returns") + + // TODO: test setup scope + onceDone() + wg.Wait() + + value, loaded = b.Load("key") + Equal(t, loaded, true) + Equal(t, value, "value1") + }) +} + +func TestFetchCallbackPanic(t *testing.T) { + var b backend.Backend[string, string] + b.Init(0, "", 0, 0) + t.Cleanup(b.Close) + + func() { + defer func() { + _ = recover() + }() + + _, _ = b.Fetch("key", func() (string, error) { + panic("failed") + }) + }() + + done := make(chan struct{}) + go func() { + defer close(done) + + v, err := b.Fetch("key", func() (string, error) { + return "new value", nil + }) + + Must(t, err) + Equal(t, v, "new value") + }() + + t.Log("assert that fetching again does not deadlock due to uninitialized value was cleaned up") + + EventuallyTrue(t, func() bool { + select { + case <-done: + return true + default: + return false } + }) +} + +func TestConcurrentFetch(t *testing.T) { + t.Run("returns error", func(t *testing.T) { + var b backend.Backend[string, string] + b.Init(0, "", 0, 0) + t.Cleanup(b.Close) + + errCh := make(chan error) + fetchStarted := make(chan struct{}) + + go func() { + _, _ = b.Fetch("key", func() (string, error) { + close(fetchStarted) + return "", <-errCh + }) + }() + + <-fetchStarted + errCh <- fmt.Errorf("error fetching value") + + v, err := b.Fetch("key", func() (string, error) { + return "value", 
nil + }) + + Must(t, err) + Equal(t, v, "value") + }) + + t.Run("returns value", func(t *testing.T) { + var b backend.Backend[string, string] + b.Init(0, "", 0, 0) + t.Cleanup(b.Close) + + valueCh := make(chan string) + fetchStarted := make(chan struct{}) - t.Log("asserting number of initialized elements is correct") - assertCacheLen(t, b, size/2) + go func() { + _, _ = b.Fetch("key", func() (string, error) { + close(fetchStarted) + return <-valueCh, nil + }) + }() - // Initialize the element. - b.Initialize(elem, 0, 0) + <-fetchStarted + valueCh <- "value" - t.Log("asserting number of initialized elements has changed") - assertCacheLen(t, b, size/2+1) + v, err := b.Fetch("key", func() (string, error) { + return "value1", nil + }) + + Must(t, err) + Equal(t, v, "value") }) } -func TestDeleteUninitializedElement(t *testing.T) { - b := backend.NewBackend[int, int](0) +func TestExpiryLoopDebounce(t *testing.T) { + debounce := 100 * time.Millisecond + n := 10 - t.Log("storing a new uninitialized element") - elem := b.Reserve(0) - _, loaded := b.LoadOrStore(elem) - Equal(t, loaded, false) - assertCacheLen(t, b, 0) + getHalfTimeLength := func(b *backend.Backend[int, int]) int { + itemTTL := debounce / time.Duration(n) - _, evicted := b.Evict(0) - Equal(t, evicted, false) - assertCacheLen(t, b, 0) + // Store elements with 10, 20, 30, ... ms TTL. + for i := 0; i < n; i++ { + b.StoreTTL(i, 0, time.Duration(i+1)*itemTTL) + } + + time.Sleep(debounce / 2) + + return b.Len() + } + + var debounceDisabledLen int + { + // TODO: t.Cleanup(b.Close) everywhere + var b backend.Backend[int, int] + b.Init(0, "", 0, 0) + t.Cleanup(b.Close) + + debounceDisabledLen = getHalfTimeLength(&b) + } + + var debounceEnabledLen int + { + var b backend.Backend[int, int] + b.Init(0, "", 0, 100*time.Millisecond) + t.Cleanup(b.Close) + + debounceEnabledLen = getHalfTimeLength(&b) + } - b.Discard(elem) - assertCacheLen(t, b, 0) + t.Log("assert that debounce disabled expires elements earlier than debounce enabled") + Equal(t, debounceDisabledLen < debounceEnabledLen, true) +} + +func TestEvict(t *testing.T) { + var b backend.Backend[string, string] + b.Init(0, "", 0, 0) + t.Cleanup(b.Close) + + b.Store("key", "value") + Equal(t, b.Len(), 1) + + v, ok := b.Evict("key") + Equal(t, ok, true) + Equal(t, v, "value") + Equal(t, b.Len(), 0) +} + +func TestOverflow(t *testing.T) { + capacity := 100 + + var b backend.Backend[int, int] + b.Init(capacity, "", 0, 0) + t.Cleanup(b.Close) + + for i := 0; i < 2*capacity; i++ { + b.Store(i, 0) + Equal(t, b.Len() <= capacity, true) + } } func TestOverflowEvictionOrder(t *testing.T) { @@ -75,12 +259,14 @@ func TestOverflowEvictionOrder(t *testing.T) { t.Run("insert order", func(t *testing.T) { t.Parallel() - b := backend.NewBackend[int, int](capacity) + var b backend.Backend[int, int] + b.Init(capacity, "", 0, 0) + t.Cleanup(b.Close) - fillCache(t, b, capacity) + fillCache(t, &b, capacity) // Overflow the cache and catch evicted keys. 
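The assertion at the end of TestExpiryLoopDebounce above follows from debounceDeadline: any deadline that falls inside the debounce window of the last cleanup pass is pushed to the end of that window, so expired elements are collected at most once per debounce interval. A worked example with the test's 100 ms debounce (illustrative numbers, not from the patch):

```go
// lastGCAt = t          (a cleanup pass has just run)
// debounce = 100ms
// StoreTTL(..., 30*time.Millisecond) => deadline = t + 30ms
//
// until := deadline - lastGCAt   // 30ms, less than the debounce
// deadline += debounce - until   // pushed out to t + 100ms
//
// With debouncing disabled the same element would expire at t + 30ms,
// which is why the debounced cache retains more items at the halfway
// point of the test.
```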
- keys := overflowAndCollectKeys(t, b, capacity) + keys := overflowAndCollectKeys(t, &b, capacity) Equal(t, len(keys), capacity) Equal(t, keys, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) @@ -89,16 +275,17 @@ func TestOverflowEvictionOrder(t *testing.T) { t.Run("LFU order", func(t *testing.T) { t.Parallel() - b := backend.NewBackend[int, int](capacity) - b.Policy = backend.LFU + var b backend.Backend[int, int] + b.Init(capacity, backend.LFU, 0, 0) + t.Cleanup(b.Close) - fillCache(t, b, capacity) + fillCache(t, &b, capacity) t.Log("creating LFU test usage") - createLFUTestUsage(t, b, capacity) + createLFUTestUsage(t, &b, capacity) // Overflow the cache and catch evicted keys. - keys := overflowAndCollectKeys(t, b, capacity) + keys := overflowAndCollectKeys(t, &b, capacity) Equal(t, len(keys), capacity) Equal(t, keys, []int{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}) @@ -107,22 +294,66 @@ func TestOverflowEvictionOrder(t *testing.T) { t.Run("LRU order", func(t *testing.T) { t.Parallel() - b := backend.NewBackend[int, int](capacity) - b.Policy = backend.LRU + var b backend.Backend[int, int] + b.Init(capacity, backend.LRU, 0, 0) + t.Cleanup(b.Close) - fillCache(t, b, capacity) + fillCache(t, &b, capacity) t.Log("creating LRU test usage") - createLRUTestUsage(t, b, capacity) + createLRUTestUsage(t, &b, capacity) // Overflow the cache and catch evicted keys. - keys := overflowAndCollectKeys(t, b, capacity) + keys := overflowAndCollectKeys(t, &b, capacity) Equal(t, len(keys), capacity) Equal(t, keys, []int{9, 8, 7, 6, 5, 4, 3, 2, 1, 0}) }) } +func TestExpire(t *testing.T) { + var b backend.Backend[int, int] + b.Init(0, "", 0, 0) + t.Cleanup(b.Close) + + n := 10 + for i := 0; i < n; i++ { + // Store records in descending TTL order. + b.StoreTTL(i, 0, time.Duration(n-1)*time.Millisecond) + } + + EventuallyTrue(t, func() bool { + return b.Len() == 0 + }) +} + +func TestExpireEdgeCase(t *testing.T) { + var b backend.Backend[int, *string] + b.Init(0, "", 0, 0) + t.Cleanup(b.Close) + + v1 := new(string) + + b.StoreTTL(0, v1, 10*time.Millisecond) + Equal(t, b.Len(), 1) + + // Wait until v1 expires. + EventuallyTrue(t, func() bool { + return b.Len() == 0 + }) + + // Assert that after v1 expires, v2 with a longer TTL than v1, can expire, + // specifically that backend's GC loop resets earliestExpireAt to zero, + // so that LoadOrStore schedules the GC loop. + v2 := new(string) + b.StoreTTL(1, v2, 100*time.Millisecond) + Equal(t, b.Len(), 1) + + EventuallyTrue(t, func() bool { + return b.Len() == 0 + }) +} + func TestMapShrink(t *testing.T) { n := 100000 @@ -181,30 +412,15 @@ func getMemStats() uint64 { return m.Alloc } -func getMapRangeCount[K comparable, V any](b *backend.Backend[K, V]) int { - n := 0 - - b.Range(func(*ringlist.Element[backend.Record[K, V]]) bool { - n++ - return true - }) - - return n -} - func assertCacheLen[K comparable, V any](t *testing.T, be *backend.Backend[K, V], n int) { t.Helper() Equal(t, be.Len(), n) - Equal(t, getMapRangeCount(be), n) } func fillCache[V any](t *testing.T, b *backend.Backend[int, V], capacity int) { for i := 0; i < capacity; i++ { - elem := b.Reserve(i) - _, loaded := b.LoadOrStore(elem) - Equal(t, loaded, false) - b.Initialize(elem, *new(V), 0) + b.Store(i, *new(V)) } assertCacheLen(t, b, capacity) @@ -247,27 +463,23 @@ func overflowAndCollectKeys(t *testing.T, b *backend.Backend[int, int], capacity // Collect all cache keys, then overflow the cache and observe which key disappears. 
t.Log("collecting current cache state") oldKeys := map[int]struct{}{} - b.Range(func(elem *ringlist.Element[backend.Record[int, int]]) bool { - oldKeys[elem.Value.Key] = struct{}{} - return true - }) + for _, key := range b.Keys() { + oldKeys[key] = struct{}{} + } Equal(t, len(oldKeys), capacity) t.Logf("store: %v", i) - elem := b.Reserve(i) - _, loaded := b.LoadOrStore(elem) - Equal(t, loaded, false) - b.Initialize(elem, 0, 0) + b.Store(i, 0) t.Logf("expected overflowed element was evicted") assertCacheLen(t, b, capacity) t.Log("collecting new cache state") newKeys := map[int]struct{}{} - b.Range(func(elem *ringlist.Element[backend.Record[int, int]]) bool { - newKeys[elem.Value.Key] = struct{}{} - return true - }) + for _, key := range b.Keys() { + newKeys[key] = struct{}{} + } + Equal(t, len(oldKeys), capacity) t.Log("determining the evicted key") var missingKeys []int diff --git a/internal/backend/benchmark_test.go b/internal/backend/benchmark_test.go index 87c7473..5ba12b7 100644 --- a/internal/backend/benchmark_test.go +++ b/internal/backend/benchmark_test.go @@ -10,7 +10,6 @@ import ( "testing" "github.com/mgnsk/evcache/v3/internal/backend" - . "github.com/mgnsk/evcache/v3/internal/testing" ) func BenchmarkSliceLoop(b *testing.B) { @@ -141,18 +140,17 @@ func BenchmarkBackendGC(b *testing.B) { } { b.Run(fmt.Sprint(n), newTimePerElementBench( func() (*backend.Backend[int, int], int) { - be := backend.NewBackend[int, int](n) + var be backend.Backend[int, int] + be.Init(n, "", 0, 0) for i := 0; i < n; i++ { - elem := be.Reserve(i) - _, loaded := be.LoadOrStore(elem) - Equal(b, loaded, false) - be.Initialize(elem, 0, 0) + be.Store(i, 0) } - return be, be.Len() + return &be, be.Len() }, func(be *backend.Backend[int, int]) { - be.RunGC(0) + keys := be.Keys() + runtime.KeepAlive(keys) }, )) } @@ -167,20 +165,18 @@ func BenchmarkBackendGCLFU(b *testing.B) { } { b.Run(fmt.Sprint(n), newTimePerElementBench( func() (*backend.Backend[int, int], int) { - be := backend.NewBackend[int, int](n) - be.Policy = backend.LFU + var be backend.Backend[int, int] + be.Init(n, backend.LFU, 0, 0) for i := 0; i < n; i++ { - elem := be.Reserve(i) - _, loaded := be.LoadOrStore(elem) - Equal(b, loaded, false) - be.Initialize(elem, 0, 0) + be.Store(i, 0) } - return be, be.Len() + return &be, be.Len() }, func(be *backend.Backend[int, int]) { - be.RunGC(0) + keys := be.Keys() + runtime.KeepAlive(keys) }, )) } @@ -195,20 +191,18 @@ func BenchmarkBackendGCLRU(b *testing.B) { } { b.Run(fmt.Sprint(n), newTimePerElementBench( func() (*backend.Backend[int, int], int) { - be := backend.NewBackend[int, int](n) - be.Policy = backend.LRU + var be backend.Backend[int, int] + be.Init(n, backend.LRU, 0, 0) for i := 0; i < n; i++ { - elem := be.Reserve(i) - _, loaded := be.LoadOrStore(elem) - Equal(b, loaded, false) - be.Initialize(elem, 0, 0) + be.Store(i, 0) } - return be, be.Len() + return &be, be.Len() }, func(be *backend.Backend[int, int]) { - be.RunGC(0) + keys := be.Keys() + runtime.KeepAlive(keys) }, )) } diff --git a/internal/backend/element.go b/internal/backend/element.go deleted file mode 100644 index ada5383..0000000 --- a/internal/backend/element.go +++ /dev/null @@ -1,14 +0,0 @@ -package backend - -import ( - "sync" -) - -// Record is a cache record. 
-type Record[K comparable, V any] struct {
-	Key         K
-	Value       V
-	deadline    int64
-	initialized bool
-	wg          sync.WaitGroup
-}
diff --git a/internal/backend/policy.go b/internal/backend/policy.go
deleted file mode 100644
index 4be4449..0000000
--- a/internal/backend/policy.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package backend
-
-// Policy is a cache eviction policy.
-type Policy int
-
-// Available cache eviction policies.
-const (
-	Default Policy = iota
-	LFU
-	LRU
-)
diff --git a/internal/backend/record.go b/internal/backend/record.go
new file mode 100644
index 0000000..3dae2dd
--- /dev/null
+++ b/internal/backend/record.go
@@ -0,0 +1,28 @@
+package backend
+
+import (
+	"sync"
+)
+
+const (
+	stateUninitialized = iota
+	stateInitialized
+	stateOverwritten
+)
+
+// Record is a cache record.
+type Record[K comparable, V any] struct {
+	Key      K
+	Value    V
+	deadline int64
+	state    int
+	wg       sync.WaitGroup
+}
+
+// Initialize the record with a value.
+func (r *Record[K, V]) Initialize(key K, value V, deadline int64) {
+	r.Key = key
+	r.Value = value
+	r.deadline = deadline
+	r.state = stateInitialized
+}
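The stateOverwritten transition added in record.go above is what lets Store win over an in-flight Fetch for the same key (see the "Store discards the key" subtest earlier in this patch). A hedged sketch of the resulting behaviour through the public API (illustrative only, not part of the diff):

```go
c := evcache.New[string, string]()

started := make(chan struct{})
release := make(chan struct{})

go c.Fetch("key", func() (string, error) {
	close(started)
	<-release // keep the fetch callback in flight
	return "fetched", nil
})

<-started
c.Store("key", "stored") // overwrites while the callback is still running
close(release)

v, _ := c.Load("key")
fmt.Println(v) // "stored"; the fetched value is never stored
```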