Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce removeSegment flow with _enableDeletedKeysCompactionConsistency #13914

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,7 @@ public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutable
protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
@Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
if (_partialUpsertHandler != null) {
tibrewalpratik17 marked this conversation as resolved.
Show resolved Hide resolved
recordInfoIterator = resolveComparisonTies(recordInfoIterator, _hashFunction);
}
recordInfoIterator = resolveComparisonTies(recordInfoIterator, _hashFunction);
doAddOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, oldSegment,
validDocIdsForOldSegment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.upsert;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -28,13 +29,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
Expand All @@ -46,7 +49,8 @@

/**
* Implementation of {@link PartitionUpsertMetadataManager} that is backed by a {@link ConcurrentHashMap} and ensures
* consistent deletions. This should be used when the table is configured with 'enableConsistentDeletes' set to true.
* consistent deletions. This should be used when the table is configured with 'enableDeletedKeysCompactionConsistency'
* set to true.
*
* Consistent deletion ensures that when deletedKeysTTL is enabled with UpsertCompaction, the key metadata is
* removed from the HashMap only after all other records in the old segments are compacted. This guarantees
Expand Down Expand Up @@ -209,6 +213,49 @@ protected void doRemoveSegment(IndexSegment segment) {
System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
}

@Override
public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator,
IndexSegment oldSegment) {
String segmentName = segment.getSegmentName();
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
segmentLock.lock();
try {
MutableRoaringBitmap validDocIdsForOldSegment =
oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
if (recordInfoIterator != null) {
Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
"Got unsupported segment implementation: {} for segment: {}, table: {}", segment.getClass(), segmentName,
_tableNameWithType);
if (validDocIds == null) {
validDocIds = new ThreadSafeMutableRoaringBitmap();
}
if (queryableDocIds == null && _deleteRecordColumn != null) {
queryableDocIds = new ThreadSafeMutableRoaringBitmap();
}
addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
oldSegment, validDocIdsForOldSegment);
}
if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty() && _partialUpsertHandler != null) {
int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
// For partial-upsert table, because we do not restore the original record location when removing the primary
// keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
// consuming segment is replaced by a committed segment that is consumed from a different server with
// different records (some stream consumer cannot guarantee consuming the messages in the same order).
_logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
+ "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
numKeysNotReplaced);
}
// we want to always remove a segment in case of enableDeletedKeysCompactionConsistency = true
// this is to account for the removal of primary-key in the to-be-removed segment and reduce
// distinctSegmentCount by 1
doRemoveSegment(oldSegment);
} finally {
segmentLock.unlock();
}
}

@Override
protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> primaryKeyIterator) {
// We need to decrease the distinctSegmentCount for each unique primary key in this deleting segment by 1
Expand Down
Loading
Loading