Skip to content

Commit

Permalink
add option for ignoreCrcMismatch for upsert-compaction task
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 committed Dec 17, 2024
1 parent 0e6b9a9 commit a77c982
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ public static class UpsertCompactionTask {
*/
public static final String SNAPSHOT = "snapshot";

/**
* key representing if upsert compaction task executor should ignore crc mismatch or not during task execution
*/
public static final String IGNORE_CRC_MISMATCH_KEY = "ignoreCrcMismatch";

/**
* default value for the key IGNORE_CRC_MISMATCH_KEY: false
*/
public static final boolean DEFAULT_IGNORE_CRC_MISMATCH = false;

/**
* number of segments to query in one batch to fetch valid doc id metadata, by default 500
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
Expand Down Expand Up @@ -58,11 +59,13 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
TableConfig tableConfig = getTableConfig(tableNameWithType);

String validDocIdsTypeStr =
configs.getOrDefault(MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.name());
configs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.name());
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
String originalSegmentCrcFromTaskGenerator = configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY);
String crcFromDeepStorageSegment = segmentMetadata.getCrc();
if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
boolean ignoreCrcMismatch = Boolean.parseBoolean(configs.getOrDefault(UpsertCompactionTask.IGNORE_CRC_MISMATCH_KEY,
String.valueOf(UpsertCompactionTask.DEFAULT_IGNORE_CRC_MISMATCH)));
if (!ignoreCrcMismatch && !originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment)) {
String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc "
+ "from ZK: %s, crc from deepstore: %s", segmentName, originalSegmentCrcFromTaskGenerator,
crcFromDeepStorageSegment);
Expand Down Expand Up @@ -145,7 +148,7 @@ private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir,
protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
Collections.singletonMap(UpsertCompactionTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX,
String.valueOf(System.currentTimeMillis())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc()));
configs.put(UpsertCompactionTask.VALID_DOC_IDS_TYPE, validDocIdsType.toString());
configs.put(UpsertCompactionTask.IGNORE_CRC_MISMATCH_KEY,
taskConfigs.getOrDefault(UpsertCompactionTask.IGNORE_CRC_MISMATCH_KEY,
String.valueOf(UpsertCompactionTask.DEFAULT_IGNORE_CRC_MISMATCH)));
pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs));
numTasks++;
}
Expand Down

0 comments on commit a77c982

Please sign in to comment.