Skip to content

Commit

Permalink
Add Vector merge policy for indices with vectors
Browse files Browse the repository at this point in the history
Add a new VectorMergePolicy for indices with vector fields.

This new merge policy is a  copy of Lucene's `TieredMergePolicy` with some modifications to
speed up merging of vector indexes:

- Instead of merging segments of equal size, a merge is composed of
the current largest segment with smallest segments
-Instead of choosing a merge with the smallest skew (giving preference
 to merges of segments of same size), preference is given to a merge
 of biggest segment with smallest segments.

These modifications were done to take advantage of optimization during merging
that keeps the largest graph from merging segments and appends vectors
from other merging segments to it.
  • Loading branch information
mayya-sharipova committed Dec 21, 2023
1 parent 5d8e297 commit 9d5555b
Show file tree
Hide file tree
Showing 6 changed files with 1,131 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1383,8 +1383,8 @@ public long getGcDeletesInMillis() {
/**
* Returns the merge policy that should be used for this index.
*/
public MergePolicy getMergePolicy(boolean isTimeBasedIndex) {
return mergePolicyConfig.getMergePolicy(isTimeBasedIndex);
public MergePolicy getMergePolicy(boolean isTimeBasedIndex, boolean hasVectorField) {
return mergePolicyConfig.getMergePolicy(isTimeBasedIndex, hasVectorField);
}

public <T> T getValue(Setting<T> setting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public final class MergePolicyConfig {
* turn, this creates segments that have non-overlapping @timestamp ranges if data gets ingested in order.
*/
private final LogByteSizeMergePolicy timeBasedMergePolicy = new LogByteSizeMergePolicy();

// A merge policy that is optimized for merging vector data
private final VectorMergePolicy vectorMergePolicy = new VectorMergePolicy();
private final Logger logger;
private final boolean mergesEnabled;
private volatile Type mergePolicyType;
Expand Down Expand Up @@ -160,7 +163,7 @@ public final class MergePolicyConfig {
public enum Type {
UNSET {
@Override
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex) {
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex, boolean hasVectorField) {
if (isTimeBasedIndex) {
// With time-based data, it's important that the merge policy only merges adjacent segments, so that segments end up
// with non-overlapping time ranges if data gets indexed in order. This makes queries more efficient, as range filters
Expand All @@ -180,24 +183,27 @@ MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex) {

return config.timeBasedMergePolicy;
} else {
if (hasVectorField) {
return config.vectorMergePolicy;
}
return config.tieredMergePolicy;
}
}
},
TIERED {
@Override
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex) {
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex, boolean hasVectorField) {
return config.tieredMergePolicy;
}
},
TIME_BASED {
@Override
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex) {
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex, boolean hasVectorField) {
return config.timeBasedMergePolicy;
}
};

abstract MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeSeries);
abstract MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeSeries, boolean hasVectorField);
}

public static final Setting<Type> INDEX_MERGE_POLICY_TYPE_SETTING = Setting.enumSetting(
Expand Down Expand Up @@ -321,6 +327,7 @@ void setMergePolicyType(Type type) {
void setSegmentsPerTier(double segmentsPerTier) {
tieredMergePolicy.setSegmentsPerTier(segmentsPerTier);
// LogByteSizeMergePolicy ignores this parameter, it always tries to have between 1 and merge_factor - 1 segments per tier.
vectorMergePolicy.setSegmentsPerTier(segmentsPerTier);
}

void setMergeFactor(int mergeFactor) {
Expand All @@ -332,35 +339,42 @@ void setMaxMergedSegment(ByteSizeValue maxMergedSegment) {
// We use 0 as a placeholder for "unset".
if (maxMergedSegment.getBytes() == 0) {
tieredMergePolicy.setMaxMergedSegmentMB(defaultMaxMergedSegment.getMbFrac());
vectorMergePolicy.setMaxMergedSegmentMB(defaultMaxMergedSegment.getMbFrac());
timeBasedMergePolicy.setMaxMergeMB(defaultMaxTimeBasedMergedSegment.getMbFrac());
} else {
tieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
vectorMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
timeBasedMergePolicy.setMaxMergeMB(maxMergedSegment.getMbFrac());
}
}

void setMaxMergesAtOnce(int maxMergeAtOnce) {
tieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
vectorMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
// LogByteSizeMergePolicy ignores this parameter, it always merges merge_factor segments at once.
}

void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) {
tieredMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac());
vectorMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac());
timeBasedMergePolicy.setMinMergeMB(floorSegementSetting.getMbFrac());
}

void setExpungeDeletesAllowed(Double value) {
tieredMergePolicy.setForceMergeDeletesPctAllowed(value);
vectorMergePolicy.setForceMergeDeletesPctAllowed(value);
// LogByteSizeMergePolicy doesn't have a similar configuration option
}

void setCompoundFormatThreshold(CompoundFileThreshold compoundFileThreshold) {
compoundFileThreshold.configure(tieredMergePolicy);
compoundFileThreshold.configure(vectorMergePolicy);
compoundFileThreshold.configure(timeBasedMergePolicy);
}

void setDeletesPctAllowed(Double deletesPctAllowed) {
tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed);
vectorMergePolicy.setDeletesPctAllowed(deletesPctAllowed);
// LogByteSizeMergePolicy doesn't have a similar configuration option
}

Expand All @@ -384,11 +398,11 @@ private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerT
}

@SuppressForbidden(reason = "we always use an appropriate merge scheduler alongside this policy so NoMergePolic#INSTANCE is ok")
MergePolicy getMergePolicy(boolean isTimeBasedIndex) {
MergePolicy getMergePolicy(boolean isTimeBasedIndex, boolean hasVectorField) {
if (mergesEnabled == false) {
return NoMergePolicy.INSTANCE;
}
return mergePolicyType.getMergePolicy(this, isTimeBasedIndex);
return mergePolicyType.getMergePolicy(this, isTimeBasedIndex, hasVectorField);
}

private static CompoundFileThreshold parseCompoundFormat(String noCFSRatio) {
Expand Down
Loading

0 comments on commit 9d5555b

Please sign in to comment.