Enforce removeSegment flow with _enableDeletedKeysCompactionConsistency
tibrewalpratik17 committed Aug 30, 2024
1 parent d3eaea6 commit b2e3a63
Showing 3 changed files with 201 additions and 148 deletions.
@@ -544,9 +544,7 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
   protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
-    if (_partialUpsertHandler != null) {
-      recordInfoIterator = resolveComparisonTies(recordInfoIterator, _hashFunction);
-    }
+    recordInfoIterator = resolveComparisonTies(recordInfoIterator, _hashFunction);
     doAddOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, oldSegment,
         validDocIdsForOldSegment);
   }
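
The hunk above drops the partial-upsert-only guard, so comparison ties are now resolved for every segment added or replaced through this manager, not just for partial-upsert tables. As a rough, hypothetical illustration of what tie resolution means here (the actual resolveComparisonTies implementation is not part of this diff, and only needs to break exact ties, whereas this sketch simply collapses each key to its latest record), the code below keeps one deterministic winner per primary key, letting the later record win when comparison values are equal. Rec and resolveTies are made-up stand-ins, not Pinot APIs.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

final class TieResolutionSketch {

  // Hypothetical stand-in for Pinot's RecordInfo: primary-key hash, docId, comparison value.
  record Rec(Object primaryKeyHash, int docId, long comparisonValue) { }

  // Keep a single winner per primary key; on a tie (equal comparison value) the later
  // record wins, so every replica picks the same surviving doc for the segment.
  static Iterator<Rec> resolveTies(Iterator<Rec> records) {
    Map<Object, Rec> winners = new HashMap<>();
    while (records.hasNext()) {
      Rec rec = records.next();
      winners.merge(rec.primaryKeyHash(), rec,
          (prev, cur) -> cur.comparisonValue() >= prev.comparisonValue() ? cur : prev);
    }
    return winners.values().iterator();
  }
}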
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -28,13 +29,15 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
 import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
@@ -46,7 +49,8 @@
 
 /**
  * Implementation of {@link PartitionUpsertMetadataManager} that is backed by a {@link ConcurrentHashMap} and ensures
- * consistent deletions. This should be used when the table is configured with 'enableConsistentDeletes' set to true.
+ * consistent deletions. This should be used when the table is configured with 'enableDeletedKeysCompactionConsistency'
+ * set to true.
  *
  * Consistent deletion ensures that when deletedKeysTTL is enabled with UpsertCompaction, the key metadata is
  * removed from the HashMap only after all other records in the old segments are compacted. This guarantees
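
The class-level Javadoc above states the invariant this manager enforces: a primary key's metadata may be dropped only once no old segment still references it. Below is a minimal, self-contained sketch of that bookkeeping idea, assuming a simple per-key counter of distinct segments (matching the distinctSegmentCount notion mentioned later in this diff). The class and method names are hypothetical, not Pinot's actual implementation, and in the real code a segment-level lock guards these updates, as the replaceSegment hunk further down shows.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class DistinctSegmentCountSketch {
  // Number of distinct segments that still contain each primary key (keyed by PK hash).
  private final ConcurrentHashMap<Object, AtomicInteger> _distinctSegmentCount = new ConcurrentHashMap<>();

  // Called for each unique primary key when a segment containing it is added.
  void onSegmentAdded(Object primaryKeyHash) {
    _distinctSegmentCount.computeIfAbsent(primaryKeyHash, k -> new AtomicInteger()).incrementAndGet();
  }

  // Called for each unique primary key when a segment containing it is removed or compacted.
  // Only when the count reaches zero is it safe to forget the key entirely.
  // Simplified: the check-then-remove below relies on an external segment lock to avoid races.
  void onSegmentRemoved(Object primaryKeyHash) {
    AtomicInteger count = _distinctSegmentCount.get(primaryKeyHash);
    if (count != null && count.decrementAndGet() <= 0) {
      _distinctSegmentCount.remove(primaryKeyHash);
    }
  }
}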
@@ -209,6 +213,49 @@ protected void doRemoveSegment(IndexSegment segment) {
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  @Override
+  public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator,
+      IndexSegment oldSegment) {
+    String segmentName = segment.getSegmentName();
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      MutableRoaringBitmap validDocIdsForOldSegment =
+          oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (recordInfoIterator != null) {
+        Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+            "Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
+            _tableNameWithType);
+        if (validDocIds == null) {
+          validDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        if (queryableDocIds == null && _deleteRecordColumn != null) {
+          queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+            oldSegment, validDocIdsForOldSegment);
+      }
+      if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty() && _partialUpsertHandler != null) {
+        int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+        // For partial-upsert tables, because we do not restore the original record location when removing the primary
+        // keys not replaced, this can potentially cause inconsistency between replicas. This can happen when a
+        // consuming segment is replaced by a committed segment that was consumed on a different server with
+        // different records (some stream consumers cannot guarantee consuming the messages in the same order).
+        _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
+            + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
+        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+            numKeysNotReplaced);
+      }
+      // We always remove the old segment when enableDeletedKeysCompactionConsistency = true.
+      // This accounts for the removal of primary keys in the to-be-removed segment and
+      // reduces their distinctSegmentCount by 1.
+      doRemoveSegment(oldSegment);
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
   @Override
   protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
     // We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
