Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache expiry time #257

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func persistStoredFieldValues(fieldID int,
func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
storedIndexOffset uint64, dictLocs []uint64,
sectionsIndexOffset uint64) (*SegmentBase, error) {
sectionsIndexOffset uint64, config map[string]interface{}) (*SegmentBase, error) {
sb := &SegmentBase{
mem: mem,
memCRC: memCRC,
Expand All @@ -175,6 +175,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
dictLocs: dictLocs,
fieldFSTs: make(map[uint16]*vellum.FST),
vecIndexCache: newVectorIndexCache(),
config: config,
}
sb.updateSize()

Expand Down
10 changes: 5 additions & 5 deletions build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,28 @@ func buildTestSegment() (*SegmentBase, uint64, error) {
doc,
}

seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode)
seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil)
return seg.(*SegmentBase), size, err
}

func buildTestSegmentMulti() (*SegmentBase, uint64, error) {
results := buildTestAnalysisResultsMulti()

seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode)
seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil)
return seg.(*SegmentBase), size, err
}

func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, uint64, error) {
results := buildTestAnalysisResultsMulti()

seg, size, err := zapPlugin.newWithChunkMode(results, chunkFactor)
seg, size, err := zapPlugin.newWithChunkMode(results, chunkFactor, nil)
return seg.(*SegmentBase), size, err
}

func buildTestSegmentMultiWithDifferentFields(includeDocA, includeDocB bool) (*SegmentBase, uint64, error) {
results := buildTestAnalysisResultsMultiWithDifferentFields(includeDocA, includeDocB)

seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode)
seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil)
return seg.(*SegmentBase), size, err
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) (
doc,
}

sb, _, err := zapPlugin.newWithChunkMode(results, chunkFactor)
sb, _, err := zapPlugin.newWithChunkMode(results, chunkFactor, nil)

return sb.(*SegmentBase), fields, err
}
2 changes: 1 addition & 1 deletion dict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func buildTestSegmentForDict() (*SegmentBase, uint64, error) {
doc,
}

seg, size, err := zapPlugin.newWithChunkMode(results, 1024)
seg, size, err := zapPlugin.newWithChunkMode(results, 1024, nil)
return seg.(*SegmentBase), size, err
}

Expand Down
42 changes: 33 additions & 9 deletions faiss_vector_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type vectorIndexCache struct {
closeCh chan struct{}
m sync.RWMutex
cache map[uint16]*cacheEntry

// expiry time for a cache entry during idle period
cacheExpiryTime time.Duration
lastAccessTime time.Time
}

func (vc *vectorIndexCache) Clear() {
Expand All @@ -52,12 +56,14 @@ func (vc *vectorIndexCache) Clear() {
vc.m.Unlock()
}

func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) {
func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte,
except *roaring.Bitmap, cacheExpiryTime time.Duration) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
vecIDsToExclude []int64, err error) {
var found bool
index, vecDocIDMap, vecIDsToExclude, found = vc.loadFromCache(fieldID, except)
if !found {
index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except)
index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except, cacheExpiryTime)
}
return index, vecDocIDMap, vecIDsToExclude, err
}
Expand All @@ -78,8 +84,10 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, except *roaring.Bitmap
return index, vecDocIDMap, vecIDsToExclude, true
}

func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) {
func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte,
except *roaring.Bitmap, cacheExpiryTime time.Duration) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
vecIDsToExclude []int64, err error) {
vc.m.Lock()
defer vc.m.Unlock()

Expand Down Expand Up @@ -123,15 +131,16 @@ func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *r
return nil, nil, nil, err
}

vc.insertLOCKED(fieldID, index, vecDocIDMap)
vc.insertLOCKED(fieldID, index, vecDocIDMap, cacheExpiryTime)
return index, vecDocIDMap, vecIDsToExclude, nil
}

func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32) {
func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, index *faiss.IndexImpl,
vecDocIDMap map[int64]uint32, cacheExpiryTime time.Duration) {
// the first time we've hit the cache, try to spawn a monitoring routine
// which will reconcile the moving averages for all the fields being hit
if len(vc.cache) == 0 {
vc.cacheExpiryTime = cacheExpiryTime
go vc.monitor()
}

Expand Down Expand Up @@ -174,16 +183,31 @@ func (vc *vectorIndexCache) cleanup() bool {
entry.tracker.add(sample)

refCount := atomic.LoadInt64(&entry.refs)

// by default we don't consider the expiry time; cacheExpiryTime is set
// iff it's something that's been configured by the user.
cacheEntryExpired := vc.cacheExpiryTime == 0 ||
(vc.cacheExpiryTime > 0 && time.Since(vc.lastAccessTime) > vc.cacheExpiryTime)

// the comparison threshold as of now is (1 - a). mathematically it
// means that there is only 1 query per second on average as per history.
// and in the current second, there were no queries performed against
// this index.
if entry.tracker.avg <= (1-entry.tracker.alpha) && refCount <= 0 {
if entry.tracker.avg <= (1-entry.tracker.alpha) && refCount <= 0 && cacheEntryExpired {
atomic.StoreUint64(&entry.tracker.sample, 0)
delete(vc.cache, fieldIDPlus1)
entry.close()
continue
}

// indicates that there are some live queries in the system so update the
// last access time which would be used to decide whether we cleanup the
// cache or not.
// considering the last access time which is delimited by the periodicity
// of the cache.
if refCount > 0 {
vc.lastAccessTime = time.Now()
}
atomic.StoreUint64(&entry.tracker.sample, 0)
}

Expand Down
10 changes: 9 additions & 1 deletion faiss_vector_posting.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"math"
"reflect"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/RoaringBitmap/roaring/roaring64"
Expand Down Expand Up @@ -372,8 +373,15 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap
pos += n
}

var cacheExpiryTime time.Duration
if v, ok := sb.config["cacheExpiryTime"]; ok {
if v, err := parseToTimeDuration(v); err == nil {
cacheExpiryTime = v
}
}

vecIndex, vecDocIDMap, vectorIDsToExclude, err =
sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except)
sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except, cacheExpiryTime)

if vecIndex != nil {
vecIndexSize = vecIndex.Size()
Expand Down
4 changes: 2 additions & 2 deletions merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int)

_ = os.RemoveAll("/tmp/" + fname)

emptySegment, _, err := zapPlugin.newWithChunkMode([]index.Document{}, 1024)
emptySegment, _, err := zapPlugin.newWithChunkMode([]index.Document{}, 1024, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -644,7 +644,7 @@ func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, uint64, error)
doc2,
}

seg, size, err := zapPlugin.newWithChunkMode(results, 1024)
seg, size, err := zapPlugin.newWithChunkMode(results, 1024, nil)
return seg.(*SegmentBase), size, err
}

Expand Down
11 changes: 8 additions & 3 deletions new.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,16 @@ var ValidateDocFields = func(field index.Field) error {
// New creates an in-memory zap-encoded SegmentBase from a set of Documents
func (z *ZapPlugin) New(results []index.Document) (
segment.Segment, uint64, error) {
return z.newWithChunkMode(results, DefaultChunkMode)
return z.newWithChunkMode(results, DefaultChunkMode, nil)
}

// NewEx creates an in-memory zap-encoded SegmentBase from a set of Documents,
// like New, but additionally accepts a config map. It uses DefaultChunkMode
// and forwards config to newWithChunkMode, which hands it to InitSegmentBase.
// A nil config is what New itself passes.
func (z *ZapPlugin) NewEx(results []index.Document, config map[string]interface{}) (
segment.Segment, uint64, error) {
return z.newWithChunkMode(results, DefaultChunkMode, config)
}

func (*ZapPlugin) newWithChunkMode(results []index.Document,
chunkMode uint32) (segment.Segment, uint64, error) {
chunkMode uint32, config map[string]interface{}) (segment.Segment, uint64, error) {
s := interimPool.Get().(*interim)

var br bytes.Buffer
Expand All @@ -73,7 +78,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,

sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
s.FieldsMap, s.FieldsInv, uint64(len(results)),
storedIndexOffset, dictOffsets, sectionsIndexOffset)
storedIndexOffset, dictOffsets, sectionsIndexOffset, config)

// get the bytes written before the interim's reset() call
// write it to the newly formed segment base.
Expand Down
15 changes: 14 additions & 1 deletion segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,18 @@ func init() {
reflectStaticSizeSegmentBase = int(unsafe.Sizeof(sb))
}

// OpenEx returns a zap impl of a segment, like Open, and additionally attaches
// the given config to the segment so that its values can be consulted during
// the segment's lifetime. Open is equivalent to calling OpenEx with a nil
// config.
func (z *ZapPlugin) OpenEx(path string, config map[string]interface{}) (segment.Segment, error) {
return z.open(path, config)
}

// Open returns a zap impl of a segment
func (*ZapPlugin) Open(path string) (segment.Segment, error) {
func (z *ZapPlugin) Open(path string) (segment.Segment, error) {
return z.open(path, nil)
}

func (*ZapPlugin) open(path string, config map[string]interface{}) (segment.Segment, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
Expand All @@ -57,6 +67,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) {
fieldFSTs: make(map[uint16]*vellum.FST),
vecIndexCache: newVectorIndexCache(),
fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
config: config,
},
f: f,
mm: mm,
Expand Down Expand Up @@ -108,6 +119,8 @@ type SegmentBase struct {
bytesRead uint64
bytesWritten uint64

config map[string]interface{} // config for the segment

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST

Expand Down
29 changes: 29 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2023 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zap

import (
"fmt"
"time"
)

// parseToTimeDuration converts a loosely typed config value into a
// time.Duration.
//
// Accepted inputs:
//   - string: parsed with time.ParseDuration (for example "30s" or "1m30s");
//     a malformed string yields the error from time.ParseDuration.
//   - time.Duration: returned unchanged, so callers that build the config map
//     in Go code can store a duration directly instead of formatting it.
//
// Any other type (including nil) yields a zero duration and a non-nil error
// that names the offending type.
func parseToTimeDuration(i interface{}) (time.Duration, error) {
	switch v := i.(type) {
	case string:
		return time.ParseDuration(v)

	case time.Duration:
		return v, nil

	default:
		return 0, fmt.Errorf("expects a duration string or time.Duration, got %T", i)
	}
}
Loading