Skip to content

Commit

Permalink
enhance: cache store prefetch check exist
Browse files Browse the repository at this point in the history
  • Loading branch information
sjp00556 committed Sep 3, 2024
1 parent 44eebbc commit 1d90b18
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,9 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
if size == 0 || size > store.conf.BlockSize {
return
}
if store.bcache.exist(key) {
return
}
p := NewOffPage(size)
defer p.Release()
_ = store.load(key, p, true, true)
Expand Down
31 changes: 31 additions & 0 deletions pkg/chunk/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,28 @@ func (cache *cacheStore) load(key string) (ReadCloser, error) {
return f, err
}

func (cache *cacheStore) exist(key string) bool {
cache.Lock()
if _, ok := cache.pages[key]; ok {
cache.Unlock()
return true
}
if cache.scanned {
k := cache.getCacheKey(key)
exist := cache.keys[k].atime > 0
cache.Unlock()
return exist
}
cache.Unlock()

var err error
err = cache.checkErr(func() error {
_, err = os.Stat(cache.cachePath(key))
return err
})
return err == nil
}

func (cache *cacheStore) cachePath(key string) string {
return filepath.Join(cache.dir, cacheDir, key)
}
Expand Down Expand Up @@ -1013,6 +1035,7 @@ type CacheManager interface {
cache(key string, p *Page, force, dropCache bool)
remove(key string, staging bool)
load(key string) (ReadCloser, error)
exist(key string) bool
uploaded(key string, size int)
stage(key string, data []byte, keepCache bool) (string, error)
removeStage(key string) error
Expand Down Expand Up @@ -1192,6 +1215,14 @@ func (m *cacheManager) load(key string) (ReadCloser, error) {
return r, err
}

func (m *cacheManager) exist(key string) bool {
store := m.getStore(key)
if store == nil {
return false
}
return store.exist(key)
}

func (m *cacheManager) remove(key string, staging bool) {
store := m.getStore(key)
if store != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/chunk/disk_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ func TestCacheManager(t *testing.T) {
defer p1.Release()
m.cache(k1, p1, true, false)

exist := m.exist(k1)
require.True(t, exist)

s1 := m.getStore(k1)
require.NotNil(t, s1)

Expand Down
16 changes: 16 additions & 0 deletions pkg/chunk/mem_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (c *memcache) delete(key string, p *Page) {
}

func (c *memcache) remove(key string, staging bool) {
if c.capacity == 0 {
return
}
c.Lock()
defer c.Unlock()
if item, ok := c.pages[key]; ok {
Expand All @@ -112,6 +115,9 @@ func (c *memcache) remove(key string, staging bool) {
}

func (c *memcache) load(key string) (ReadCloser, error) {
if c.capacity == 0 {
return nil, errors.New("not found")
}
c.Lock()
defer c.Unlock()
if item, ok := c.pages[key]; ok {
Expand All @@ -121,6 +127,16 @@ func (c *memcache) load(key string) (ReadCloser, error) {
return nil, errors.New("not found")
}

func (c *memcache) exist(key string) bool {
if c.capacity == 0 {
return false
}
c.Lock()
_, ok := c.pages[key]
c.Unlock()
return ok
}

// locked
func (c *memcache) cleanup() {
var cnt int
Expand Down

0 comments on commit 1d90b18

Please sign in to comment.