diff --git a/index/scorch/merge.go b/index/scorch/merge.go index b74504ca1..663c258cd 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -339,8 +339,8 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1) prevBytesReadTotal := cumulateBytesRead(segmentsToMerge) - newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path, - cw.cancelCh, s) + newDocNums, _, err := s.segPlugin.MergeEx(segmentsToMerge, docsToDrop, path, + cw.cancelCh, s, s.segmentConfig) atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1) fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime)) @@ -358,7 +358,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, return fmt.Errorf("merging failed: %v", err) } - seg, err = s.segPlugin.Open(path) + seg, err = s.segPlugin.OpenEx(path, s.segmentConfig) if err != nil { s.unmarkIneligibleForRemoval(filename) atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1) @@ -469,7 +469,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot, path := s.path + string(os.PathSeparator) + filename newDocNums, _, err := - s.segPlugin.Merge(sbs, sbsDrops, path, s.closeCh, s) + s.segPlugin.MergeEx(sbs, sbsDrops, path, s.closeCh, s, s.segmentConfig) atomic.AddUint64(&s.stats.TotMemMergeZapEnd, 1) @@ -484,7 +484,7 @@ func (s *Scorch) mergeSegmentBases(snapshot *IndexSnapshot, return nil, 0, err } - seg, err := s.segPlugin.Open(path) + seg, err := s.segPlugin.OpenEx(path, s.segmentConfig) if err != nil { atomic.AddUint64(&s.stats.TotMemMergeErr, 1) return nil, 0, err diff --git a/index/scorch/persister.go b/index/scorch/persister.go index d59f733df..6e272e1ad 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -662,7 +662,7 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) { } }() for segmentID, path := range newSegmentPaths { - newSegments[segmentID], err = s.segPlugin.Open(path) + newSegments[segmentID], err = s.segPlugin.OpenEx(path, s.segmentConfig) if err != nil { return fmt.Errorf("error opening new segment at %s, %v", path, err) } @@ -872,7 +872,7 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro return nil, fmt.Errorf("segment path missing") } segmentPath := s.path + string(os.PathSeparator) + string(pathBytes) - segment, err := s.segPlugin.Open(segmentPath) + segment, err := s.segPlugin.OpenEx(segmentPath, s.segmentConfig) if err != nil { return nil, fmt.Errorf("error opening bolt segment: %v", err) } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 429d1daa9..5901ac34e 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -44,6 +44,7 @@ type Scorch struct { readOnly bool version uint8 config map[string]interface{} + segmentConfig map[string]interface{} analysisQueue *index.AnalysisQueue path string @@ -121,6 +122,7 @@ func NewScorch(storeName string, forceMergeRequestCh: make(chan *mergerCtrl, 1), segPlugin: defaultSegmentPlugin, copyScheduled: map[string]int{}, + segmentConfig: make(map[string]interface{}), } forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config) @@ -441,7 +443,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { stats := newFieldStats() if len(analysisResults) > 0 { - newSegment, bufBytes, err = s.segPlugin.New(analysisResults) + newSegment, bufBytes, err = s.segPlugin.NewEx(analysisResults, s.segmentConfig) if err != nil { return err } diff --git a/index/scorch/segment_plugin.go b/index/scorch/segment_plugin.go index b3b9ba01f..cf12cf928 100644 --- a/index/scorch/segment_plugin.go +++ b/index/scorch/segment_plugin.go @@ -46,10 +46,14 @@ type SegmentPlugin interface { // New takes a set of Documents and turns them into a new Segment New(results []index.Document) (segment.Segment, uint64, error) + NewEx(results []index.Document, config map[string]interface{}) (segment.Segment, uint64, error) + // Open attempts to open the file at the specified path and // return the corresponding Segment Open(path string) (segment.Segment, error) + OpenEx(path string, config map[string]interface{}) (segment.Segment, error) + // Merge takes a set of Segments, and creates a new segment on disk at // the specified path. // Drops is a set of bitmaps (one for each segment) indicating which @@ -67,6 +71,10 @@ type SegmentPlugin interface { Merge(segments []segment.Segment, drops []*roaring.Bitmap, path string, closeCh chan struct{}, s segment.StatsReporter) ( [][]uint64, uint64, error) + + MergeEx(segments []segment.Segment, drops []*roaring.Bitmap, path string, + closeCh chan struct{}, s segment.StatsReporter, config map[string]interface{}) ( + [][]uint64, uint64, error) } var supportedSegmentPlugins map[string]map[uint32]SegmentPlugin