Skip to content

Commit

Permalink
adding user configurable cacheExpiry time duration
Browse files Browse the repository at this point in the history
  • Loading branch information
Thejas-bhat committed Aug 29, 2024
1 parent 8677bf7 commit 152437b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 10 deletions.
42 changes: 33 additions & 9 deletions faiss_vector_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type vectorIndexCache struct {
closeCh chan struct{}
m sync.RWMutex
cache map[uint16]*cacheEntry

// expiry time for a cache entry during idle period
cacheExpiryTime time.Duration
lastAccessTime time.Time
}

func (vc *vectorIndexCache) Clear() {
Expand All @@ -52,12 +56,14 @@ func (vc *vectorIndexCache) Clear() {
vc.m.Unlock()
}

func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) {
func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte,
except *roaring.Bitmap, cacheExpiryTime time.Duration) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
vecIDsToExclude []int64, err error) {
var found bool
index, vecDocIDMap, vecIDsToExclude, found = vc.loadFromCache(fieldID, except)
if !found {
index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except)
index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except, cacheExpiryTime)
}
return index, vecDocIDMap, vecIDsToExclude, err
}
Expand All @@ -78,8 +84,10 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, except *roaring.Bitmap
return index, vecDocIDMap, vecIDsToExclude, true
}

func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *roaring.Bitmap) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) {
func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte,
except *roaring.Bitmap, cacheExpiryTime time.Duration) (
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
vecIDsToExclude []int64, err error) {
vc.m.Lock()
defer vc.m.Unlock()

Expand Down Expand Up @@ -123,15 +131,16 @@ func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *r
return nil, nil, nil, err
}

vc.insertLOCKED(fieldID, index, vecDocIDMap)
vc.insertLOCKED(fieldID, index, vecDocIDMap, cacheExpiryTime)
return index, vecDocIDMap, vecIDsToExclude, nil
}

func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16,
index *faiss.IndexImpl, vecDocIDMap map[int64]uint32) {
func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, index *faiss.IndexImpl,
vecDocIDMap map[int64]uint32, cacheExpiryTime time.Duration) {
// the first time we've hit the cache, try to spawn a monitoring routine
// which will reconcile the moving averages for all the fields being hit
if len(vc.cache) == 0 {
vc.cacheExpiryTime = cacheExpiryTime
go vc.monitor()
}

Expand Down Expand Up @@ -174,16 +183,31 @@ func (vc *vectorIndexCache) cleanup() bool {
entry.tracker.add(sample)

refCount := atomic.LoadInt64(&entry.refs)

// by default we don't consider expiry time, and the
// cacheExpiryTime is set iff its something that's been configured by user.
cacheEntryExpired := vc.cacheExpiryTime == 0 ||
(vc.cacheExpiryTime > 0 && time.Since(vc.lastAccessTime) > vc.cacheExpiryTime)

// the comparison threshold as of now is (1 - a). mathematically it
// means that there is only 1 query per second on average as per history.
// and in the current second, there were no queries performed against
// this index.
if entry.tracker.avg <= (1-entry.tracker.alpha) && refCount <= 0 {
if entry.tracker.avg <= (1-entry.tracker.alpha) && refCount <= 0 && cacheEntryExpired {
atomic.StoreUint64(&entry.tracker.sample, 0)
delete(vc.cache, fieldIDPlus1)
entry.close()
continue
}

// indicates that there are some live queries in the system so update the
// last access time which would be used to decide whether we cleanup the
// cache or not.
// considering the last access time which is delimited by the periodicity
// of the cache.
if refCount > 0 {
vc.lastAccessTime = time.Now()
}
atomic.StoreUint64(&entry.tracker.sample, 0)
}

Expand Down
10 changes: 9 additions & 1 deletion faiss_vector_posting.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"math"
"reflect"
"time"

"github.com/RoaringBitmap/roaring"
"github.com/RoaringBitmap/roaring/roaring64"
Expand Down Expand Up @@ -372,8 +373,15 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap
pos += n
}

var cacheExpiryTime time.Duration
if v, ok := sb.config["cacheExpiryTime"]; ok {
if v, err := parseToTimeDuration(v); err == nil {
cacheExpiryTime = v
}
}

vecIndex, vecDocIDMap, vectorIDsToExclude, err =
sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except)
sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except, cacheExpiryTime)

if vecIndex != nil {
vecIndexSize = vecIndex.Size()
Expand Down
29 changes: 29 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2023 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zap

import (
"fmt"
"time"
)

func parseToTimeDuration(i interface{}) (time.Duration, error) {
switch v := i.(type) {
case string:
return time.ParseDuration(v)

default:
return 0, fmt.Errorf("expects a duration string")
}
}

0 comments on commit 152437b

Please sign in to comment.