Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new experimental gc friendly flatten cache #712

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 265 additions & 6 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package rueidis

import (
"context"
"runtime"
"sync"
"time"
"unsafe"
)

// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
Expand Down Expand Up @@ -99,7 +101,7 @@ func (a *adapter) Update(key, cmd string, val RedisMessage) (sxat int64) {
val.setExpireAt(sxat)
}
a.store.Set(key+cmd, val)
flight.set(val, nil)
flight.setVal(val)
entries[cmd] = nil
}
a.mu.Unlock()
Expand All @@ -110,7 +112,7 @@ func (a *adapter) Cancel(key, cmd string, err error) {
a.mu.Lock()
entries := a.flights[key]
if flight, ok := entries[cmd].(*adapterEntry); ok {
flight.set(RedisMessage{}, err)
flight.setErr(err)
entries[cmd] = nil
}
a.mu.Unlock()
Expand Down Expand Up @@ -152,7 +154,7 @@ func (a *adapter) Close(err error) {
for _, entries := range flights {
for _, e := range entries {
if e != nil {
e.(*adapterEntry).set(RedisMessage{}, err)
e.(*adapterEntry).setErr(err)
}
}
}
Expand All @@ -165,16 +167,273 @@ type adapterEntry struct {
xat int64
}

func (a *adapterEntry) set(val RedisMessage, err error) {
a.err, a.val = err, val
func (a *adapterEntry) setVal(val RedisMessage) {
a.val = val
close(a.ch)
}

func (a *adapterEntry) setErr(err error) {
a.err = err
close(a.ch)
}

func (a *adapterEntry) Wait(ctx context.Context) (RedisMessage, error) {
ctxCh := ctx.Done()
if ctxCh == nil {
<-a.ch
return a.val, a.err
}
select {
case <-ctx.Done():
case <-ctxCh:
return RedisMessage{}, ctx.Err()
case <-a.ch:
return a.val, a.err
}
}

type flatentry struct {
ovfl *flatentry
next unsafe.Pointer
prev unsafe.Pointer
cmd string
key string
val []byte
ttl int64
size int64
mark int64
mu sync.RWMutex
}

func (f *flatentry) insert(e *flatentry) {
f.size += e.size
f.mu.Lock()
e.ovfl = f.ovfl
f.ovfl = e
f.mu.Unlock()
}

func (f *flatentry) find(cmd string, ts int64) ([]byte, bool) {
for next := f; next != nil; {
if ts >= next.ttl {
return nil, true
}
if cmd == next.cmd {
return next.val, false
}
next.mu.RLock()
ovfl := next.ovfl
next.mu.RUnlock()
next = ovfl
}
return nil, false
}

const lrBatchSize = 64
const flattEntrySize = unsafe.Sizeof(flatentry{})

type lrBatch struct {
m map[*flatentry]bool
}

func NewFlattenCache(limit int) CacheStore {
f := &flatten{
flights: make(map[string]*adapterEntry),
cache: make(map[string]*flatentry),
head: &flatentry{},
tail: &flatentry{},
size: 0,
limit: int64(limit),
}
f.head.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(f.head)
f.lrup = sync.Pool{New: func() any {
b := &lrBatch{m: make(map[*flatentry]bool, lrBatchSize)}
runtime.SetFinalizer(b, func(b *lrBatch) {
if len(b.m) >= 0 {
f.mu.Lock()
f.llTailBatch(b)
f.mu.Unlock()
}
})
return b
}}
return f
}

type flatten struct {
flights map[string]*adapterEntry
cache map[string]*flatentry
head *flatentry
tail *flatentry
lrup sync.Pool
mark int64
size int64
limit int64
mu sync.RWMutex
}

func (f *flatten) llAdd(e *flatentry) {
e.mark = f.mark
e.prev = f.tail.prev
e.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(e)
(*flatentry)(e.prev).next = unsafe.Pointer(e)
}

func (f *flatten) llDel(e *flatentry) {
(*flatentry)(e.prev).next = e.next
(*flatentry)(e.next).prev = e.prev
e.mark = 0
}

func (f *flatten) llTail(e *flatentry) {
f.llDel(e)
f.llAdd(e)
}

func (f *flatten) llTailBatch(b *lrBatch) {
for e, expired := range b.m {
if e.mark == f.mark {
if expired {
f.remove(e)
} else {
f.llTail(e)
}
}
}
clear(b.m)
}

func (f *flatten) remove(e *flatentry) {
f.size -= e.size
f.llDel(e)
delete(f.cache, e.key)
}

func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (RedisMessage, CacheEntry) {
f.mu.RLock()
e := f.cache[key]
f.mu.RUnlock()
ts := now.UnixMilli()
if v, expired := e.find(cmd, ts); v != nil || expired {
batch := f.lrup.Get().(*lrBatch)
batch.m[e] = expired
if len(batch.m) >= lrBatchSize {
f.mu.Lock()
f.llTailBatch(batch)
f.mu.Unlock()
}
f.lrup.Put(batch)
if v != nil {
var ret RedisMessage
_ = ret.CacheUnmarshalView(v)
return ret, nil
}
}
fk := key + cmd
f.mu.RLock()
af := f.flights[fk]
f.mu.RUnlock()
if af != nil {
return RedisMessage{}, af
}
f.mu.Lock()
e = f.cache[key]
v, expired := e.find(cmd, ts)
if v != nil {
f.llTail(e)
f.mu.Unlock()
var ret RedisMessage
_ = ret.CacheUnmarshalView(v)
return ret, nil
}
defer f.mu.Unlock()
if expired {
f.remove(e)
}
if af = f.flights[fk]; af != nil {
return RedisMessage{}, af
}
if f.flights != nil {
f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()}
}
return RedisMessage{}, nil
}

func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) {
fk := key + cmd
f.mu.RLock()
af := f.flights[fk]
f.mu.RUnlock()
if af != nil {
sxat = val.getExpireAt()
if af.xat < sxat || sxat == 0 {
sxat = af.xat
val.setExpireAt(sxat)
}
bs := val.CacheMarshal(nil)
fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize)}
f.mu.Lock()
if f.flights != nil {
delete(f.flights, fk)
f.size += fe.size
for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); {
e := (*flatentry)(ep)
f.remove(e)
ep = e.next
}
if e := f.cache[key]; e == nil {
fe.key = key
f.cache[key] = fe
f.llAdd(fe)
} else {
e.insert(fe)
}
}
f.mu.Unlock()
af.setVal(val)
}
return sxat
}

func (f *flatten) Cancel(key, cmd string, err error) {
fk := key + cmd
f.mu.Lock()
defer f.mu.Unlock()
if af := f.flights[fk]; af != nil {
delete(f.flights, fk)
af.setErr(err)
}
}

func (f *flatten) Delete(keys []RedisMessage) {
f.mu.Lock()
defer f.mu.Unlock()
if keys == nil {
f.cache = make(map[string]*flatentry, len(f.cache))
f.head.next = unsafe.Pointer(f.tail)
f.tail.prev = unsafe.Pointer(f.head)
f.mark++
f.size = 0
} else {
for _, k := range keys {
if e := f.cache[k.string]; e != nil {
f.remove(e)
}
}
}
}

func (f *flatten) Close(err error) {
f.mu.Lock()
flights := f.flights
f.flights = nil
f.cache = nil
f.tail = nil
f.head = nil
f.mark++
f.mu.Unlock()
for _, entry := range flights {
entry.setErr(err)
}
}
5 changes: 5 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func TestCacheStore(t *testing.T) {
return NewSimpleCacheAdapter(&simple{store: map[string]RedisMessage{}})
})
})
t.Run("FlattenCache", func(t *testing.T) {
test(t, func() CacheStore {
return NewFlattenCache(DefaultCacheBytes)
})
})
}

type simple struct {
Expand Down
Loading