diff --git a/faiss_vector_cache.go b/faiss_vector_cache.go index 893da2d5..121f5cde 100644 --- a/faiss_vector_cache.go +++ b/faiss_vector_cache.go @@ -38,6 +38,10 @@ type vectorIndexCache struct { closeCh chan struct{} m sync.RWMutex cache map[uint16]*cacheEntry + + // expiry time for a cache entry during idle period + cacheExpiryTime time.Duration + lastAccessTime time.Time } func (vc *vectorIndexCache) Clear() { @@ -52,12 +56,14 @@ func (vc *vectorIndexCache) Clear() { vc.m.Unlock() } -func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, except *roaring.Bitmap) ( - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) { +func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, + except *roaring.Bitmap, cacheExpiryTime time.Duration) ( + index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, + vecIDsToExclude []int64, err error) { var found bool index, vecDocIDMap, vecIDsToExclude, found = vc.loadFromCache(fieldID, except) if !found { - index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except) + index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except, cacheExpiryTime) } return index, vecDocIDMap, vecIDsToExclude, err } @@ -78,8 +84,10 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, except *roaring.Bitmap return index, vecDocIDMap, vecIDsToExclude, true } -func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *roaring.Bitmap) ( - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) { +func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, + except *roaring.Bitmap, cacheExpiryTime time.Duration) ( + index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, + vecIDsToExclude []int64, err error) { vc.m.Lock() defer vc.m.Unlock() @@ -123,15 +131,16 @@ func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *r return nil, nil, nil, err } - vc.insertLOCKED(fieldID, index, vecDocIDMap) + vc.insertLOCKED(fieldID, index, vecDocIDMap, cacheExpiryTime) return index, vecDocIDMap, vecIDsToExclude, nil } -func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32) { +func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, index *faiss.IndexImpl, + vecDocIDMap map[int64]uint32, cacheExpiryTime time.Duration) { // the first time we've hit the cache, try to spawn a monitoring routine // which will reconcile the moving averages for all the fields being hit if len(vc.cache) == 0 { + vc.cacheExpiryTime = cacheExpiryTime go vc.monitor() } @@ -174,16 +183,31 @@ func (vc *vectorIndexCache) cleanup() bool { entry.tracker.add(sample) refCount := atomic.LoadInt64(&entry.refs) + + // true indicates by default we don't consider expiry time, and the + // cacheExpiryTime is set iff its something that's been configured by user. + cacheEntryExpired := true || + vc.cacheExpiryTime > 0 && time.Since(vc.lastAccessTime) > vc.cacheExpiryTime + // the comparison threshold as of now is (1 - a). mathematically it // means that there is only 1 query per second on average as per history. // and in the current second, there were no queries performed against // this index. - if entry.tracker.avg <= (1-entry.tracker.alpha) && refCount <= 0 { + if entry.tracker.avg <= (1-entry.tracker.alpha) && refCount <= 0 && cacheEntryExpired { atomic.StoreUint64(&entry.tracker.sample, 0) delete(vc.cache, fieldIDPlus1) entry.close() continue } + + // indicates that there are some live queries in the system so update the + // last access time which would be used to decide whether we cleanup the + // cache or not. + // considering the last access time which is delimited by the periodicity + // of the cache. + if refCount > 0 { + vc.lastAccessTime = time.Now() + } atomic.StoreUint64(&entry.tracker.sample, 0) } diff --git a/faiss_vector_posting.go b/faiss_vector_posting.go index e4275d76..191f321c 100644 --- a/faiss_vector_posting.go +++ b/faiss_vector_posting.go @@ -22,6 +22,7 @@ import ( "encoding/json" "math" "reflect" + "time" "github.com/RoaringBitmap/roaring" "github.com/RoaringBitmap/roaring/roaring64" @@ -372,8 +373,15 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap pos += n } + var cacheExpiryTime time.Duration + if v, ok := sb.config["cacheExpiryTime"]; ok { + if v, err := parseToTimeDuration(v); err == nil { + cacheExpiryTime = v + } + } + vecIndex, vecDocIDMap, vectorIDsToExclude, err = - sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except) + sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except, cacheExpiryTime) if vecIndex != nil { vecIndexSize = vecIndex.Size() diff --git a/util.go b/util.go new file mode 100644 index 00000000..75e016ee --- /dev/null +++ b/util.go @@ -0,0 +1,29 @@ +// Copyright (c) 2023 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package zap + +import ( + "fmt" + "time" +) + +func parseToTimeDuration(i interface{}) (time.Duration, error) { + switch v := i.(type) { + case string: + return time.ParseDuration(v) + + default: + return 0, fmt.Errorf("expects a duration string") + } +}