MB-57888: Updated Merge Process to Support Index Update #280

Open
wants to merge 6 commits into base: master
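At a glance: each SegmentBase now carries a map of field name to *index.UpdateFieldInfo (flags Deleted, Index, Store, DocValues), and the merge consults it so that data belonging to removed or updated fields is dropped instead of copied into the new segment. Below is a minimal sketch of how a caller might populate that map before a merge; the helper name and the field names are illustrative, not part of this change.

package zap

import (
    index "github.com/blevesearch/bleve_index_api"
)

// markFieldUpdates is a hypothetical helper: it records which per-field data
// should be dropped the next time sb participates in a merge.
func markFieldUpdates(sb *SegmentBase) {
    sb.PutUpdatedFields(map[string]*index.UpdateFieldInfo{
        // "price" was removed from the mapping entirely: drop the field from
        // fieldsInv and from every section of the merged segment.
        "price": {Deleted: true},
        // "desc" keeps its stored value, but its inverted index and doc
        // values are dropped during the merge.
        "desc": {Index: true, DocValues: true},
    })
}
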
2 changes: 2 additions & 0 deletions build.go
@@ -21,6 +21,7 @@ import (
"math"
"os"

index "github.com/blevesearch/bleve_index_api"
"github.com/blevesearch/vellum"
)

@@ -169,6 +170,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
sectionsIndexOffset: sectionsIndexOffset,
fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
docValueOffset: 0, // docValueOffsets identified automatically by the section
updatedFields: make(map[string]*index.UpdateFieldInfo),
fieldFSTs: make(map[uint16]*vellum.FST),
vecIndexCache: newVectorIndexCache(),
synIndexCache: newSynonymIndexCache(),
59 changes: 52 additions & 7 deletions merge.go
@@ -24,6 +24,7 @@ import (
"sort"

"github.com/RoaringBitmap/roaring/v2"
index "github.com/blevesearch/bleve_index_api"
seg "github.com/blevesearch/scorch_segment_api/v2"
"github.com/golang/snappy"
)
@@ -109,6 +110,20 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
return newDocNums, uint64(cr.Count()), nil
}

// filterFields removes fields that have been completely deleted from fieldsInv.
func filterFields(fieldsInv []string, fieldInfo map[string]*index.UpdateFieldInfo) []string {
rv := make([]string, 0)
for _, field := range fieldsInv {
if val, ok := fieldInfo[field]; ok {
if val.Deleted {
continue
}
}
rv = append(rv, field)
}
return rv
}

func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
@@ -117,6 +132,8 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,

var fieldsSame bool
fieldsSame, fieldsInv = mergeFields(segments)
updatedFields := mergeUpdatedFields(segments)
fieldsInv = filterFields(fieldsInv, updatedFields)
fieldsMap = mapFields(fieldsInv)

numDocs = computeNewDocCount(segments, drops)
@@ -130,15 +147,16 @@ func mergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
// offsets in the fields section index of the file (the final merged file).
mergeOpaque := map[int]resetable{}
args := map[string]interface{}{
"chunkMode": chunkMode,
"fieldsSame": fieldsSame,
"fieldsMap": fieldsMap,
"numDocs": numDocs,
"chunkMode": chunkMode,
"fieldsSame": fieldsSame,
"fieldsMap": fieldsMap,
"numDocs": numDocs,
"updatedFields": updatedFields,
}

if numDocs > 0 {
storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh)
fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh, updatedFields)
if err != nil {
return nil, 0, 0, nil, nil, 0, err
}
@@ -358,7 +376,7 @@ type varintEncoder func(uint64) (int, error)

func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
fieldsMap map[string]uint16, fieldsInv []string, fieldsSame bool, newSegDocCount uint64,
w *CountHashWriter, closeCh chan struct{}) (uint64, [][]uint64, error) {
w *CountHashWriter, closeCh chan struct{}, updatedFields map[string]*index.UpdateFieldInfo) (uint64, [][]uint64, error) {
var rv [][]uint64 // The remapped or newDocNums for each segment.

var newDocNum uint64
@@ -397,7 +415,8 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,
// optimize when the field mapping is the same across all
// segments and there are no deletions, via byte-copying
// of stored docs bytes directly to the writer
if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) {
// cannot copy directly if fields might have been deleted
if fieldsSame && (dropsI == nil || dropsI.GetCardinality() == 0) && len(updatedFields) == 0 {
err := segment.copyStoredDocs(newDocNum, docNumOffsets, w)
if err != nil {
return 0, nil, err
@@ -471,6 +490,12 @@ func mergeStoredAndRemap(segments []*SegmentBase, drops []*roaring.Bitmap,

// now walk the non-"_id" fields in order
for fieldID := 1; fieldID < len(fieldsInv); fieldID++ {
if val, ok := updatedFields[fieldsInv[fieldID]]; ok {
// early exit when stored value is supposed to be deleted
if val.Store {
continue
}
}
storedFieldValues := vals[fieldID]

stf := typs[fieldID]
@@ -606,6 +631,26 @@ func mergeFields(segments []*SegmentBase) (bool, []string) {
return fieldsSame, rv
}

// mergeUpdatedFields combines the UpdateFieldInfo from all segments being merged, OR-ing the per-field flags.
func mergeUpdatedFields(segments []*SegmentBase) map[string]*index.UpdateFieldInfo {
fieldInfo := make(map[string]*index.UpdateFieldInfo)

for _, segment := range segments {
for field, info := range segment.updatedFields {
if _, ok := fieldInfo[field]; !ok {
fieldInfo[field] = info
} else {
fieldInfo[field].Deleted = fieldInfo[field].Deleted || info.Deleted
fieldInfo[field].Index = fieldInfo[field].Index || info.Index
fieldInfo[field].Store = fieldInfo[field].Store || info.Store
fieldInfo[field].DocValues = fieldInfo[field].DocValues || info.DocValues
}
}
}
return fieldInfo
}

func isClosed(closeCh chan struct{}) bool {
select {
case <-closeCh:
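To make the helpers above concrete, here is an illustrative, in-package sketch of filterFields in action (the field names are made up; mergeUpdatedFields would produce such a map by OR-ing flags across the source segments): a field marked Deleted in any segment no longer appears in the merged segment's field list.

package zap

import index "github.com/blevesearch/bleve_index_api"

// exampleFilterFields shows the effect of filterFields when one source
// segment has marked "color" as fully deleted.
func exampleFilterFields() []string {
    updated := map[string]*index.UpdateFieldInfo{
        "color": {Deleted: true},
    }
    fieldsInv := []string{"_id", "name", "color"}
    // returns []string{"_id", "name"}
    return filterFields(fieldsInv, updated)
}
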
17 changes: 14 additions & 3 deletions section_faiss_vector_index.go
@@ -105,6 +105,10 @@ func (v *faissVectorIndexSection) Merge(opaque map[int]resetable, segments []*Se
if _, ok := sb.fieldsMap[fieldName]; !ok {
continue
}
// early exit if index data is supposed to be deleted
if info, ok := vo.updatedFields[fieldName]; ok && info.Index {
continue
}

// check if the section address is a valid one for "fieldName" in the
// segment sb. the local fieldID (fetched by the fieldsMap of the sb)
@@ -686,9 +690,10 @@ func (v *faissVectorIndexSection) getvectorIndexOpaque(opaque map[int]resetable)

func (v *faissVectorIndexSection) InitOpaque(args map[string]interface{}) resetable {
rv := &vectorIndexOpaque{
fieldAddrs: make(map[uint16]int),
vecIDMap: make(map[int64]*vecInfo),
vecFieldMap: make(map[uint16]*indexContent),
fieldAddrs: make(map[uint16]int),
vecIDMap: make(map[int64]*vecInfo),
vecFieldMap: make(map[uint16]*indexContent),
updatedFields: make(map[string]*index.UpdateFieldInfo),
}
for k, v := range args {
rv.Set(k, v)
@@ -727,6 +732,8 @@ type vectorIndexOpaque struct {
// index to be built.
vecFieldMap map[uint16]*indexContent

updatedFields map[string]*index.UpdateFieldInfo

tmp0 []byte
}

@@ -773,4 +780,8 @@ func (v *vectorIndexOpaque) Reset() (err error) {
}

func (v *vectorIndexOpaque) Set(key string, val interface{}) {
switch key {
case "updatedFields":
v.updatedFields = val.(map[string]*index.UpdateFieldInfo)
}
}
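The sections receive updatedFields the same way they receive the other merge arguments: mergeToWriter puts it in the args map, and each section's InitOpaque forwards every entry to the opaque's Set. An illustrative, in-package sketch of that hand-off for the vector section (the wrapper function is hypothetical, not part of the change):

package zap

import index "github.com/blevesearch/bleve_index_api"

// exampleVectorOpaque mirrors how mergeToWriter hands "updatedFields" to a
// section: InitOpaque forwards each args entry to the opaque's Set method.
func exampleVectorOpaque(updatedFields map[string]*index.UpdateFieldInfo) *vectorIndexOpaque {
    section := &faissVectorIndexSection{}
    opaque := section.InitOpaque(map[string]interface{}{
        "updatedFields": updatedFields,
    })
    return opaque.(*vectorIndexOpaque)
}
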
24 changes: 19 additions & 5 deletions section_inverted_text_index.go
@@ -82,7 +82,8 @@ func (i *invertedTextIndexSection) AddrForField(opaque map[int]resetable, fieldI
func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
fieldsInv []string, fieldsMap map[string]uint16, fieldsSame bool,
newDocNumsIn [][]uint64, newSegDocCount uint64, chunkMode uint32,
w *CountHashWriter, closeCh chan struct{}) (map[int]int, uint64, error) {
updatedFields map[string]*index.UpdateFieldInfo, w *CountHashWriter,
closeCh chan struct{}) (map[int]int, uint64, error) {
var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
var bufLoc []uint64

@@ -125,6 +126,10 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
if isClosed(closeCh) {
return nil, 0, seg.ErrClosed
}
// early exit if index data is supposed to be deleted
if info, ok := updatedFields[fieldName]; ok && info.Index {
continue
}

dict, err2 := segment.dictionary(fieldName)
if err2 != nil {
@@ -244,7 +249,8 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.

postItr = postings.iterator(true, true, true, postItr)

if fieldsSame {
// can only safely copy data if no field data has been deleted
if fieldsSame && len(updatedFields) == 0 {
// can optimize by copying freq/norm/loc bytes directly
lastDocNum, lastFreq, lastNorm, err = mergeTermFreqNormLocsByCopying(
term, postItr, newDocNums[itrI], newRoaring,
@@ -317,7 +323,10 @@ func mergeAndPersistInvertedSection(segments []*SegmentBase, dropsIn []*roaring.
if isClosed(closeCh) {
return nil, 0, seg.ErrClosed
}

// early exit if docvalues data is supposed to be deleted
if info, ok := updatedFields[fieldName]; ok && info.DocValues {
continue
}
fieldIDPlus1 := uint16(segment.fieldsMap[fieldName])
if dvIter, exists := segment.fieldDvReaders[SectionInvertedTextIndex][fieldIDPlus1-1]; exists &&
dvIter != nil {
@@ -398,7 +407,7 @@ func (i *invertedTextIndexSection) Merge(opaque map[int]resetable, segments []*S
w *CountHashWriter, closeCh chan struct{}) error {
io := i.getInvertedIndexOpaque(opaque)
fieldAddrs, _, err := mergeAndPersistInvertedSection(segments, drops, fieldsInv,
io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, w, closeCh)
io.FieldsMap, io.fieldsSame, newDocNumsIn, io.numDocs, io.chunkMode, io.updatedFields, w, closeCh)
if err != nil {
return err
}
@@ -905,7 +914,8 @@ func (i *invertedIndexOpaque) getOrDefineField(fieldName string) int {

func (i *invertedTextIndexSection) InitOpaque(args map[string]interface{}) resetable {
rv := &invertedIndexOpaque{
fieldAddrs: map[int]int{},
fieldAddrs: map[int]int{},
updatedFields: make(map[string]*index.UpdateFieldInfo),
}
for k, v := range args {
rv.Set(k, v)
@@ -969,6 +979,8 @@ type invertedIndexOpaque struct {

fieldAddrs map[int]int

updatedFields map[string]*index.UpdateFieldInfo

fieldsSame bool
numDocs uint64
}
@@ -1035,5 +1047,7 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) {
i.FieldsMap = val.(map[string]uint16)
case "numDocs":
i.numDocs = val.(uint64)
case "updatedFields":
i.updatedFields = val.(map[string]*index.UpdateFieldInfo)
}
}
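Taken together, the checks added to mergeAndPersistInvertedSection act as a per-field gate: an Index flag suppresses the dictionary/postings merge for that field, a DocValues flag suppresses its doc-values merge, and the byte-copy fast path is disabled whenever any field has pending updates. A hypothetical stand-alone helper expressing the same gate (illustrative only, not part of the change):

package zap

import index "github.com/blevesearch/bleve_index_api"

// invertedMergeGate reports which parts of a field's inverted-section data
// should be skipped during a merge, mirroring the checks above.
func invertedMergeGate(updated map[string]*index.UpdateFieldInfo, field string) (skipPostings, skipDocValues bool) {
    info, ok := updated[field]
    if !ok {
        return false, false // field untouched: merge everything
    }
    return info.Index, info.DocValues
}
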
15 changes: 12 additions & 3 deletions section_synonym_index.go
@@ -99,6 +99,8 @@ type synonymIndexOpaque struct {

// A map linking thesaurus IDs to their corresponding thesaurus' file offsets
thesaurusAddrs map[int]int

updatedFields map[string]*index.UpdateFieldInfo
}

// Set the fieldsMap, results, and updatedFields in the synonym index opaque before the section processes a synonym field.
@@ -108,6 +110,8 @@ func (so *synonymIndexOpaque) Set(key string, value interface{}) {
so.results = value.([]index.Document)
case "fieldsMap":
so.FieldsMap = value.(map[string]uint16)
case "updatedFields":
so.updatedFields = value.(map[string]*index.UpdateFieldInfo)
}
}

@@ -399,6 +403,7 @@ func (s *synonymIndexSection) getSynonymIndexOpaque(opaque map[int]resetable) *s
func (s *synonymIndexSection) InitOpaque(args map[string]interface{}) resetable {
rv := &synonymIndexOpaque{
thesaurusAddrs: map[int]int{},
updatedFields: make(map[string]*index.UpdateFieldInfo),
}
for k, v := range args {
rv.Set(k, v)
@@ -452,7 +457,7 @@ func (s *synonymIndexSection) Merge(opaque map[int]resetable, segments []*Segmen
drops []*roaring.Bitmap, fieldsInv []string, newDocNumsIn [][]uint64,
w *CountHashWriter, closeCh chan struct{}) error {
so := s.getSynonymIndexOpaque(opaque)
thesaurusAddrs, fieldIDtoThesaurusID, err := mergeAndPersistSynonymSection(segments, drops, fieldsInv, newDocNumsIn, w, closeCh)
thesaurusAddrs, fieldIDtoThesaurusID, err := mergeAndPersistSynonymSection(segments, drops, fieldsInv, newDocNumsIn, so.updatedFields, w, closeCh)
if err != nil {
return err
}
@@ -553,7 +558,8 @@ func writeSynTermMap(synTermMap map[uint32]string, w *CountHashWriter, bufMaxVar
}

func mergeAndPersistSynonymSection(segments []*SegmentBase, dropsIn []*roaring.Bitmap,
fieldsInv []string, newDocNumsIn [][]uint64, w *CountHashWriter,
fieldsInv []string, newDocNumsIn [][]uint64,
updatedFields map[string]*index.UpdateFieldInfo, w *CountHashWriter,
closeCh chan struct{}) (map[int]int, map[uint16]int, error) {

var bufMaxVarintLen64 []byte = make([]byte, binary.MaxVarintLen64)
@@ -600,7 +606,10 @@ func mergeAndPersistSynonymSection(segments []*SegmentBase, dropsIn []*roaring.B
if isClosed(closeCh) {
return nil, nil, seg.ErrClosed
}

// early exit if index data is supposed to be deleted
if info, ok := updatedFields[fieldName]; ok && info.Index {
continue
}
thes, err2 := segment.thesaurus(fieldName)
if err2 != nil {
return nil, nil, err2
13 changes: 13 additions & 0 deletions segment.go
@@ -25,6 +25,7 @@ import (
"unsafe"

"github.com/RoaringBitmap/roaring/v2"
index "github.com/blevesearch/bleve_index_api"
mmap "github.com/blevesearch/mmap-go"
segment "github.com/blevesearch/scorch_segment_api/v2"
"github.com/blevesearch/vellum"
@@ -109,6 +110,8 @@ type SegmentBase struct {
fieldDvNames []string // field names cached in fieldDvReaders
size uint64

updatedFields map[string]*index.UpdateFieldInfo

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST

@@ -952,3 +955,13 @@ func (s *SegmentBase) loadDvReaders() error {

return nil
}

// GetUpdatedFields returns the UpdateFieldInfo map stored in the segment base.
func (s *SegmentBase) GetUpdatedFields() map[string]*index.UpdateFieldInfo {
return s.updatedFields
}

// PutUpdatedFields stores the given UpdateFieldInfo map in the segment base.
func (s *SegmentBase) PutUpdatedFields(updatedFields map[string]*index.UpdateFieldInfo) {
s.updatedFields = updatedFields
}
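A short usage sketch for the new accessors (purely illustrative; the helper is hypothetical): carrying the recorded field updates from one segment base onto another so the next merge honors both.

package zap

import index "github.com/blevesearch/bleve_index_api"

// carryUpdatedFields copies the field-update info recorded on src onto dst,
// keeping any entries dst already has.
func carryUpdatedFields(src, dst *SegmentBase) {
    combined := make(map[string]*index.UpdateFieldInfo)
    for field, info := range src.GetUpdatedFields() {
        combined[field] = info
    }
    for field, info := range dst.GetUpdatedFields() {
        combined[field] = info
    }
    dst.PutUpdatedFields(combined)
}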