diff --git a/build.go b/build.go index 99635739..8e19df52 100644 --- a/build.go +++ b/build.go @@ -160,7 +160,7 @@ func persistStoredFieldValues(fieldID int, func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64, storedIndexOffset uint64, dictLocs []uint64, - sectionsIndexOffset uint64) (*SegmentBase, error) { + sectionsIndexOffset uint64, config map[string]interface{}) (*SegmentBase, error) { sb := &SegmentBase{ mem: mem, memCRC: memCRC, @@ -175,6 +175,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, dictLocs: dictLocs, fieldFSTs: make(map[uint16]*vellum.FST), vecIndexCache: newVectorIndexCache(), + config: config, } sb.updateSize() diff --git a/build_test.go b/build_test.go index 605350a3..d148cdab 100644 --- a/build_test.go +++ b/build_test.go @@ -48,28 +48,28 @@ func buildTestSegment() (*SegmentBase, uint64, error) { doc, } - seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode) + seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil) return seg.(*SegmentBase), size, err } func buildTestSegmentMulti() (*SegmentBase, uint64, error) { results := buildTestAnalysisResultsMulti() - seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode) + seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil) return seg.(*SegmentBase), size, err } func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, uint64, error) { results := buildTestAnalysisResultsMulti() - seg, size, err := zapPlugin.newWithChunkMode(results, chunkFactor) + seg, size, err := zapPlugin.newWithChunkMode(results, chunkFactor, nil) return seg.(*SegmentBase), size, err } func buildTestSegmentMultiWithDifferentFields(includeDocA, includeDocB bool) (*SegmentBase, uint64, error) { results := buildTestAnalysisResultsMultiWithDifferentFields(includeDocA, includeDocB) - seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode) + seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil) return seg.(*SegmentBase), size, err } @@ -152,7 +152,7 @@ func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) ( doc, } - sb, _, err := zapPlugin.newWithChunkMode(results, chunkFactor) + sb, _, err := zapPlugin.newWithChunkMode(results, chunkFactor, nil) return sb.(*SegmentBase), fields, err } diff --git a/dict_test.go b/dict_test.go index cc1c4bf6..7a987b82 100644 --- a/dict_test.go +++ b/dict_test.go @@ -34,7 +34,7 @@ func buildTestSegmentForDict() (*SegmentBase, uint64, error) { doc, } - seg, size, err := zapPlugin.newWithChunkMode(results, 1024) + seg, size, err := zapPlugin.newWithChunkMode(results, 1024, nil) return seg.(*SegmentBase), size, err } diff --git a/faiss_vector_cache.go b/faiss_vector_cache.go index 893da2d5..51a258b0 100644 --- a/faiss_vector_cache.go +++ b/faiss_vector_cache.go @@ -38,6 +38,10 @@ type vectorIndexCache struct { closeCh chan struct{} m sync.RWMutex cache map[uint16]*cacheEntry + + // expiry time for a cache entry during idle period + cacheExpiryTime time.Duration + lastAccessTime time.Time } func (vc *vectorIndexCache) Clear() { @@ -52,12 +56,14 @@ func (vc *vectorIndexCache) Clear() { vc.m.Unlock() } -func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, except *roaring.Bitmap) ( - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) { +func (vc *vectorIndexCache) loadOrCreate(fieldID uint16, mem []byte, + except *roaring.Bitmap, cacheExpiryTime time.Duration) ( + index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, + vecIDsToExclude []int64, err error) { var found bool index, vecDocIDMap, vecIDsToExclude, found = vc.loadFromCache(fieldID, except) if !found { - index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except) + index, vecDocIDMap, vecIDsToExclude, err = vc.createAndCache(fieldID, mem, except, cacheExpiryTime) } return index, vecDocIDMap, vecIDsToExclude, err } @@ -78,8 +84,10 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, except *roaring.Bitmap return index, vecDocIDMap, vecIDsToExclude, true } -func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *roaring.Bitmap) ( - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, vecIDsToExclude []int64, err error) { +func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, + except *roaring.Bitmap, cacheExpiryTime time.Duration) ( + index *faiss.IndexImpl, vecDocIDMap map[int64]uint32, + vecIDsToExclude []int64, err error) { vc.m.Lock() defer vc.m.Unlock() @@ -123,15 +131,16 @@ func (vc *vectorIndexCache) createAndCache(fieldID uint16, mem []byte, except *r return nil, nil, nil, err } - vc.insertLOCKED(fieldID, index, vecDocIDMap) + vc.insertLOCKED(fieldID, index, vecDocIDMap, cacheExpiryTime) return index, vecDocIDMap, vecIDsToExclude, nil } -func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, - index *faiss.IndexImpl, vecDocIDMap map[int64]uint32) { +func (vc *vectorIndexCache) insertLOCKED(fieldIDPlus1 uint16, index *faiss.IndexImpl, + vecDocIDMap map[int64]uint32, cacheExpiryTime time.Duration) { // the first time we've hit the cache, try to spawn a monitoring routine // which will reconcile the moving averages for all the fields being hit if len(vc.cache) == 0 { + vc.cacheExpiryTime = cacheExpiryTime go vc.monitor() } @@ -174,16 +183,31 @@ func (vc *vectorIndexCache) cleanup() bool { entry.tracker.add(sample) refCount := atomic.LoadInt64(&entry.refs) + + // by default we don't consider expiry time, and the + // cacheExpiryTime is set iff its something that's been configured by user. + cacheEntryExpired := vc.cacheExpiryTime == 0 || + (vc.cacheExpiryTime > 0 && time.Since(vc.lastAccessTime) > vc.cacheExpiryTime) + // the comparison threshold as of now is (1 - a). mathematically it // means that there is only 1 query per second on average as per history. // and in the current second, there were no queries performed against // this index. - if entry.tracker.avg <= (1-entry.tracker.alpha) && refCount <= 0 { + if entry.tracker.avg <= (1-entry.tracker.alpha) && refCount <= 0 && cacheEntryExpired { atomic.StoreUint64(&entry.tracker.sample, 0) delete(vc.cache, fieldIDPlus1) entry.close() continue } + + // indicates that there are some live queries in the system so update the + // last access time which would be used to decide whether we cleanup the + // cache or not. + // considering the last access time which is delimited by the periodicity + // of the cache. + if refCount > 0 { + vc.lastAccessTime = time.Now() + } atomic.StoreUint64(&entry.tracker.sample, 0) } diff --git a/faiss_vector_posting.go b/faiss_vector_posting.go index e4275d76..191f321c 100644 --- a/faiss_vector_posting.go +++ b/faiss_vector_posting.go @@ -22,6 +22,7 @@ import ( "encoding/json" "math" "reflect" + "time" "github.com/RoaringBitmap/roaring" "github.com/RoaringBitmap/roaring/roaring64" @@ -372,8 +373,15 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap pos += n } + var cacheExpiryTime time.Duration + if v, ok := sb.config["cacheExpiryTime"]; ok { + if v, err := parseToTimeDuration(v); err == nil { + cacheExpiryTime = v + } + } + vecIndex, vecDocIDMap, vectorIDsToExclude, err = - sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except) + sb.vecIndexCache.loadOrCreate(fieldIDPlus1, sb.mem[pos:], except, cacheExpiryTime) if vecIndex != nil { vecIndexSize = vecIndex.Size() diff --git a/merge_test.go b/merge_test.go index 3aea005f..f8420389 100644 --- a/merge_test.go +++ b/merge_test.go @@ -146,7 +146,7 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) _ = os.RemoveAll("/tmp/" + fname) - emptySegment, _, err := zapPlugin.newWithChunkMode([]index.Document{}, 1024) + emptySegment, _, err := zapPlugin.newWithChunkMode([]index.Document{}, 1024, nil) if err != nil { t.Fatal(err) } @@ -644,7 +644,7 @@ func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, uint64, error) doc2, } - seg, size, err := zapPlugin.newWithChunkMode(results, 1024) + seg, size, err := zapPlugin.newWithChunkMode(results, 1024, nil) return seg.(*SegmentBase), size, err } diff --git a/new.go b/new.go index 94079eaf..af97c952 100644 --- a/new.go +++ b/new.go @@ -42,11 +42,16 @@ var ValidateDocFields = func(field index.Field) error { // New creates an in-memory zap-encoded SegmentBase from a set of Documents func (z *ZapPlugin) New(results []index.Document) ( segment.Segment, uint64, error) { - return z.newWithChunkMode(results, DefaultChunkMode) + return z.newWithChunkMode(results, DefaultChunkMode, nil) +} + +func (z *ZapPlugin) NewEx(results []index.Document, config map[string]interface{}) ( + segment.Segment, uint64, error) { + return z.newWithChunkMode(results, DefaultChunkMode, config) } func (*ZapPlugin) newWithChunkMode(results []index.Document, - chunkMode uint32) (segment.Segment, uint64, error) { + chunkMode uint32, config map[string]interface{}) (segment.Segment, uint64, error) { s := interimPool.Get().(*interim) var br bytes.Buffer @@ -73,7 +78,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document, sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode, s.FieldsMap, s.FieldsInv, uint64(len(results)), - storedIndexOffset, dictOffsets, sectionsIndexOffset) + storedIndexOffset, dictOffsets, sectionsIndexOffset, config) // get the bytes written before the interim's reset() call // write it to the newly formed segment base. diff --git a/segment.go b/segment.go index 8dce0856..33c9f6a1 100644 --- a/segment.go +++ b/segment.go @@ -38,8 +38,18 @@ func init() { reflectStaticSizeSegmentBase = int(unsafe.Sizeof(sb)) } +// Open returns a zap impl of a segment along with keeping some config values in +// mind during the segment's lifetime. +func (z *ZapPlugin) OpenEx(path string, config map[string]interface{}) (segment.Segment, error) { + return z.open(path, config) +} + // Open returns a zap impl of a segment -func (*ZapPlugin) Open(path string) (segment.Segment, error) { +func (z *ZapPlugin) Open(path string) (segment.Segment, error) { + return z.open(path, nil) +} + +func (*ZapPlugin) open(path string, config map[string]interface{}) (segment.Segment, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -57,6 +67,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { fieldFSTs: make(map[uint16]*vellum.FST), vecIndexCache: newVectorIndexCache(), fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), + config: config, }, f: f, mm: mm, @@ -108,6 +119,8 @@ type SegmentBase struct { bytesRead uint64 bytesWritten uint64 + config map[string]interface{} // config for the segment + m sync.Mutex fieldFSTs map[uint16]*vellum.FST diff --git a/util.go b/util.go new file mode 100644 index 00000000..75e016ee --- /dev/null +++ b/util.go @@ -0,0 +1,29 @@ +// Copyright (c) 2023 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package zap + +import ( + "fmt" + "time" +) + +func parseToTimeDuration(i interface{}) (time.Duration, error) { + switch v := i.(type) { + case string: + return time.ParseDuration(v) + + default: + return 0, fmt.Errorf("expects a duration string") + } +}