From 0479ad69a5b26d3046482114703354e20feffb48 Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Thu, 29 Aug 2024 11:59:53 +0530 Subject: [PATCH 1/4] introduce having config at zap layer --- build.go | 3 ++- new.go | 11 ++++++++--- segment.go | 15 ++++++++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/build.go b/build.go index 99635739..8e19df52 100644 --- a/build.go +++ b/build.go @@ -160,7 +160,7 @@ func persistStoredFieldValues(fieldID int, func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64, storedIndexOffset uint64, dictLocs []uint64, - sectionsIndexOffset uint64) (*SegmentBase, error) { + sectionsIndexOffset uint64, config map[string]interface{}) (*SegmentBase, error) { sb := &SegmentBase{ mem: mem, memCRC: memCRC, @@ -175,6 +175,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, dictLocs: dictLocs, fieldFSTs: make(map[uint16]*vellum.FST), vecIndexCache: newVectorIndexCache(), + config: config, } sb.updateSize() diff --git a/new.go b/new.go index 94079eaf..af97c952 100644 --- a/new.go +++ b/new.go @@ -42,11 +42,16 @@ var ValidateDocFields = func(field index.Field) error { // New creates an in-memory zap-encoded SegmentBase from a set of Documents func (z *ZapPlugin) New(results []index.Document) ( segment.Segment, uint64, error) { - return z.newWithChunkMode(results, DefaultChunkMode) + return z.newWithChunkMode(results, DefaultChunkMode, nil) +} + +func (z *ZapPlugin) NewEx(results []index.Document, config map[string]interface{}) ( + segment.Segment, uint64, error) { + return z.newWithChunkMode(results, DefaultChunkMode, config) } func (*ZapPlugin) newWithChunkMode(results []index.Document, - chunkMode uint32) (segment.Segment, uint64, error) { + chunkMode uint32, config map[string]interface{}) (segment.Segment, uint64, error) { s := interimPool.Get().(*interim) var br bytes.Buffer @@ -73,7 +78,7 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document, sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode, s.FieldsMap, s.FieldsInv, uint64(len(results)), - storedIndexOffset, dictOffsets, sectionsIndexOffset) + storedIndexOffset, dictOffsets, sectionsIndexOffset, config) // get the bytes written before the interim's reset() call // write it to the newly formed segment base. diff --git a/segment.go b/segment.go index 8dce0856..33c9f6a1 100644 --- a/segment.go +++ b/segment.go @@ -38,8 +38,18 @@ func init() { reflectStaticSizeSegmentBase = int(unsafe.Sizeof(sb)) } +// Open returns a zap impl of a segment along with keeping some config values in +// mind during the segment's lifetime. +func (z *ZapPlugin) OpenEx(path string, config map[string]interface{}) (segment.Segment, error) { + return z.open(path, config) +} + // Open returns a zap impl of a segment -func (*ZapPlugin) Open(path string) (segment.Segment, error) { +func (z *ZapPlugin) Open(path string) (segment.Segment, error) { + return z.open(path, nil) +} + +func (*ZapPlugin) open(path string, config map[string]interface{}) (segment.Segment, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -57,6 +67,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) { fieldFSTs: make(map[uint16]*vellum.FST), vecIndexCache: newVectorIndexCache(), fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)), + config: config, }, f: f, mm: mm, @@ -108,6 +119,8 @@ type SegmentBase struct { bytesRead uint64 bytesWritten uint64 + config map[string]interface{} // config for the segment + m sync.Mutex fieldFSTs map[uint16]*vellum.FST From 8677bf73ac4d2d60f86d090735298a7aa8558d8d Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Thu, 29 Aug 2024 19:17:31 +0530 Subject: [PATCH 2/4] unit test fixes --- build_test.go | 10 +++++----- dict_test.go | 2 +- merge_test.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/build_test.go b/build_test.go index 605350a3..d148cdab 100644 --- a/build_test.go +++ b/build_test.go @@ -48,28 +48,28 @@ func buildTestSegment() (*SegmentBase, uint64, error) { doc, } - seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode) + seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil) return seg.(*SegmentBase), size, err } func buildTestSegmentMulti() (*SegmentBase, uint64, error) { results := buildTestAnalysisResultsMulti() - seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode) + seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil) return seg.(*SegmentBase), size, err } func buildTestSegmentMultiWithChunkFactor(chunkFactor uint32) (*SegmentBase, uint64, error) { results := buildTestAnalysisResultsMulti() - seg, size, err := zapPlugin.newWithChunkMode(results, chunkFactor) + seg, size, err := zapPlugin.newWithChunkMode(results, chunkFactor, nil) return seg.(*SegmentBase), size, err } func buildTestSegmentMultiWithDifferentFields(includeDocA, includeDocB bool) (*SegmentBase, uint64, error) { results := buildTestAnalysisResultsMultiWithDifferentFields(includeDocA, includeDocB) - seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode) + seg, size, err := zapPlugin.newWithChunkMode(results, DefaultChunkMode, nil) return seg.(*SegmentBase), size, err } @@ -152,7 +152,7 @@ func buildTestSegmentWithDefaultFieldMapping(chunkFactor uint32) ( doc, } - sb, _, err := zapPlugin.newWithChunkMode(results, chunkFactor) + sb, _, err := zapPlugin.newWithChunkMode(results, chunkFactor, nil) return sb.(*SegmentBase), fields, err } diff --git a/dict_test.go b/dict_test.go index cc1c4bf6..7a987b82 100644 --- a/dict_test.go +++ b/dict_test.go @@ -34,7 +34,7 @@ func buildTestSegmentForDict() (*SegmentBase, uint64, error) { doc, } - seg, size, err := zapPlugin.newWithChunkMode(results, 1024) + seg, size, err := zapPlugin.newWithChunkMode(results, 1024, nil) return seg.(*SegmentBase), size, err } diff --git a/merge_test.go b/merge_test.go index 3aea005f..f8420389 100644 --- a/merge_test.go +++ b/merge_test.go @@ -146,7 +146,7 @@ func testMergeWithEmptySegments(t *testing.T, before bool, numEmptySegments int) _ = os.RemoveAll("/tmp/" + fname) - emptySegment, _, err := zapPlugin.newWithChunkMode([]index.Document{}, 1024) + emptySegment, _, err := zapPlugin.newWithChunkMode([]index.Document{}, 1024, nil) if err != nil { t.Fatal(err) } @@ -644,7 +644,7 @@ func buildTestSegmentMultiHelper(docIds []string) (*SegmentBase, uint64, error) doc2, } - seg, size, err := zapPlugin.newWithChunkMode(results, 1024) + seg, size, err := zapPlugin.newWithChunkMode(results, 1024, nil) return seg.(*SegmentBase), size, err } From 3315b0f350a4ce92cd8861dd1e0b656d3a3ce5b6 Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Fri, 6 Sep 2024 11:54:57 +0530 Subject: [PATCH 3/4] passing config for Merge() API --- merge.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/merge.go b/merge.go index 490e9da0..5e61f6b2 100644 --- a/merge.go +++ b/merge.go @@ -35,9 +35,21 @@ const docDropped = math.MaxUint64 // sentinel docNum to represent a deleted doc // Merge takes a slice of segments and bit masks describing which // documents may be dropped, and creates a new segment containing the // remaining data. This new segment is built at the specified path. -func (*ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path string, +func (z *ZapPlugin) Merge(segments []seg.Segment, drops []*roaring.Bitmap, path string, closeCh chan struct{}, s seg.StatsReporter) ( [][]uint64, uint64, error) { + return z.merge(segments, drops, path, closeCh, s, nil) +} + +func (z *ZapPlugin) MergeEx(segments []seg.Segment, drops []*roaring.Bitmap, path string, + closeCh chan struct{}, s seg.StatsReporter, config map[string]interface{}) ( + [][]uint64, uint64, error) { + return z.merge(segments, drops, path, closeCh, s, config) +} + +func (*ZapPlugin) merge(segments []seg.Segment, drops []*roaring.Bitmap, path string, + closeCh chan struct{}, s seg.StatsReporter, config map[string]interface{}) ( + [][]uint64, uint64, error) { segmentBases := make([]*SegmentBase, len(segments)) for segmenti, segment := range segments { switch segmentx := segment.(type) { From 4fa0a42546bd276d074228f0d7506af3b325ff8d Mon Sep 17 00:00:00 2001 From: Thejas-bhat Date: Thu, 12 Sep 2024 19:13:29 +0530 Subject: [PATCH 4/4] small correction to comment --- segment.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/segment.go b/segment.go index 33c9f6a1..e40f07aa 100644 --- a/segment.go +++ b/segment.go @@ -38,8 +38,8 @@ func init() { reflectStaticSizeSegmentBase = int(unsafe.Sizeof(sb)) } -// Open returns a zap impl of a segment along with keeping some config values in -// mind during the segment's lifetime. +// OpenEx returns a zap impl of a segment which tracks some config values during +// the its lifetime. func (z *ZapPlugin) OpenEx(path string, config map[string]interface{}) (segment.Segment, error) { return z.open(path, config) }