diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java index 26d231651de2..4c594e8bee80 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java @@ -63,7 +63,7 @@ public static Integer getRealtimeSegmentPartitionId(String segmentName, SegmentZ } @Nullable - private static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) { + public static Integer getPartitionIdFromRealtimeSegmentName(String segmentName) { // A fast path to get partition id if the segmentName is in a known format like LLC. LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); if (llcSegmentName != null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 48349099b472..08b0eca90907 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -65,6 +65,7 @@ private MinionConstants() { */ public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks"; public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments"; + public static final long DEFAULT_TABLE_MAX_NUM_TASKS = 1; /** * Job configs @@ -223,4 +224,59 @@ public static class UpsertCompactionTask { */ public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest"; } + + public static class UpsertCompactMergeTask { + public static final String TASK_TYPE = "UpsertCompactMergeTask"; + + /** + * The time period to wait before picking segments for this task + * e.g. 
if set to "2d", no task will be scheduled for a time window younger than 2 days + */ + public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod"; + + /** + * number of segments to query in one batch to fetch valid doc id metadata, by default 500 + */ + public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest"; + + /** + * prefix for the new segment name that is created, + * {@link org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} will add __ as delimiter + * so not adding _ as a suffix here. + */ + public static final String MERGED_SEGMENT_NAME_PREFIX = "compacted"; + + /** + * maximum number of records to process in a single task, sum of all docs in to-be-merged segments + */ + public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask"; + + /** + * default maximum number of records to process in a single task, same as the value in {@link MergeRollupTask} + */ + public static final long DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000; + + /** + * maximum number of records in the output segment + */ + public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment"; + + /** + * default maximum number of records in output segment, same as the value in + * {@link org.apache.pinot.core.segment.processing.framework.SegmentConfig} + */ + public static final long DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000; + + /** + * maximum number of segments to process in a single task + */ + public static final String MAX_NUM_SEGMENTS_PER_TASK_KEY = "maxNumSegmentsPerTask"; + + /** + * default maximum number of segments to process in a single task + */ + public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10; + + public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments"; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java index 56009608ee77..3b22ce84a1d0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java @@ -23,10 +23,12 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig; import org.apache.pinot.core.segment.processing.timehandler.TimeHandler; import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig; import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.TimestampIndexUtils; @@ -47,12 +49,15 @@ public class SegmentProcessorConfig { private final Map> _aggregationFunctionParameters; private final SegmentConfig _segmentConfig; private final Consumer _progressObserver; + private final SegmentNameGenerator _segmentNameGenerator; + private final Long _customCreationTime; private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig, List partitionerConfigs, MergeType mergeType, Map aggregationTypes, Map> aggregationFunctionParameters, SegmentConfig segmentConfig, - Consumer progressObserver) { + Consumer progressObserver, @Nullable SegmentNameGenerator segmentNameGenerator, + @Nullable Long customCreationTime) { TimestampIndexUtils.applyTimestampIndex(tableConfig, schema); _tableConfig = tableConfig; _schema = schema; @@ -65,6 +70,8 @@ private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandl _progressObserver = (progressObserver != null) ? 
progressObserver : p -> { // Do nothing. }; + _segmentNameGenerator = segmentNameGenerator; + _customCreationTime = customCreationTime; } /** @@ -127,11 +134,20 @@ public Consumer getProgressObserver() { return _progressObserver; } + public SegmentNameGenerator getSegmentNameGenerator() { + return _segmentNameGenerator; + } + + public long getCustomCreationTime() { + return _customCreationTime != null ? _customCreationTime : System.currentTimeMillis(); + } + @Override public String toString() { return "SegmentProcessorConfig{" + "_tableConfig=" + _tableConfig + ", _schema=" + _schema + ", _timeHandlerConfig=" + _timeHandlerConfig + ", _partitionerConfigs=" + _partitionerConfigs + ", _mergeType=" + _mergeType - + ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig + '}'; + + ", _aggregationTypes=" + _aggregationTypes + ", _segmentConfig=" + _segmentConfig + + ", _segmentNameGenerator=" + _segmentNameGenerator + ", _customCreationTime=" + _customCreationTime + '}'; } /** @@ -147,6 +163,8 @@ public static class Builder { private Map> _aggregationFunctionParameters; private SegmentConfig _segmentConfig; private Consumer _progressObserver; + private SegmentNameGenerator _segmentNameGenerator; + private Long _customCreationTime; public Builder setTableConfig(TableConfig tableConfig) { _tableConfig = tableConfig; @@ -193,6 +211,16 @@ public Builder setProgressObserver(Consumer progressObserver) { return this; } + public Builder setSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) { + _segmentNameGenerator = segmentNameGenerator; + return this; + } + + public Builder setCustomCreationTime(Long customCreationTime) { + _customCreationTime = customCreationTime; + return this; + } + public SegmentProcessorConfig build() { Preconditions.checkState(_tableConfig != null, "Must provide table config in SegmentProcessorConfig"); Preconditions.checkState(_schema != null, "Must provide schema in SegmentProcessorConfig"); @@ -216,7 +244,8 @@ 
public SegmentProcessorConfig build() { _segmentConfig = new SegmentConfig.Builder().build(); } return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType, - _aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver); + _aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver, + _segmentNameGenerator, _customCreationTime); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index 378d79fbabfa..a6d3f740429b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -280,8 +280,11 @@ private List generateSegment(Map partitionT SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema); generatorConfig.setOutDir(_segmentsOutputDir.getPath()); Consumer observer = _segmentProcessorConfig.getProgressObserver(); + generatorConfig.setCreationTime(String.valueOf(_segmentProcessorConfig.getCustomCreationTime())); - if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) { + if (_segmentProcessorConfig.getSegmentNameGenerator() != null) { + generatorConfig.setSegmentNameGenerator(_segmentProcessorConfig.getSegmentNameGenerator()); + } else if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) { generatorConfig.setSegmentNameGenerator( SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix, segmentNamePostfix, fixedSegmentName, false)); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java new file mode 100644 index 000000000000..0f326f331467 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; +import org.apache.pinot.common.restlet.resources.ValidDocIdsType; +import org.apache.pinot.common.utils.SegmentUtils; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.core.segment.processing.framework.DefaultSegmentNumRowProvider; +import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig; +import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework; +import org.apache.pinot.minion.MinionConf; +import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor; +import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils; +import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; +import org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader; +import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.roaringbitmap.RoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Minion task that compacts and merges multiple segments of an upsert table and uploads it back as one single + * segment. This helps in keeping the segment count in check and also prevents a lot of small segments created over + * time. 
+ */ +public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversionExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class); + + public UpsertCompactMergeTaskExecutor(MinionConf minionConf) { + super(minionConf); + } + + @Override + protected List convert(PinotTaskConfig pinotTaskConfig, List segmentDirs, + File workingDir) + throws Exception { + int numInputSegments = segmentDirs.size(); + _eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + numInputSegments); + String taskType = pinotTaskConfig.getTaskType(); + Map configs = pinotTaskConfig.getConfigs(); + LOGGER.info("Starting task: {} with configs: {}", taskType, configs); + long startMillis = System.currentTimeMillis(); + + String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); + TableConfig tableConfig = getTableConfig(tableNameWithType); + Schema schema = getSchema(tableNameWithType); + + SegmentProcessorConfig.Builder segmentProcessorConfigBuilder = + new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema); + + // Progress observer + segmentProcessorConfigBuilder.setProgressObserver(p -> _eventObserver.notifyProgress(_pinotTaskConfig, p)); + + // get list of segment metadata + List segmentMetadataList = segmentDirs.stream().map(x -> { + try { + return new SegmentMetadataImpl(x); + } catch (Exception e) { + throw new RuntimeException(String.format("Error fetching segment-metadata for segmentDir: %s", x), e); + } + }).collect(Collectors.toList()); + + // validate if partitionID is same for all small segments. Get partition id value for new segment. + int partitionID = getCommonPartitionIDForSegments(segmentMetadataList); + + // get the max creation time of the small segments. This will be the index creation time for the new segment. 
+ Optional maxCreationTimeOfMergingSegments = + segmentMetadataList.stream().map(SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max); + if (maxCreationTimeOfMergingSegments.isEmpty()) { + String message = "No valid creation time found for the new merged segment. This might be due to " + + "missing creation time for merging segments"; + LOGGER.error(message); + throw new RuntimeException(message); + } + + // validate if crc of deepstore copies is same as that in ZK of segments + List originalSegmentCrcFromTaskGenerator = + List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(",")); + validateCRCForInputSegments(segmentMetadataList, originalSegmentCrcFromTaskGenerator); + + // Fetch validDocID snapshot from server and get record-reader for compacted reader. + List recordReaders = segmentMetadataList.stream().map(x -> { + RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, x.getName(), + ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc()); + if (validDocIds == null) { + // no valid crc match found or no validDocIds obtained from all servers + // error out the task instead of silently failing so that we can track it via task-error metrics + String message = String.format("No validDocIds found from all servers. They either failed to download " + + "or did not match crc from segment copy obtained from deepstore / servers. 
" + "Expected crc: %s", x.getCrc()); + LOGGER.error(message); + throw new IllegalStateException(message); + } + return new CompactedPinotSegmentRecordReader(x.getIndexDir(), validDocIds); + }).collect(Collectors.toList()); + + // create new UploadedRealtimeSegment + segmentProcessorConfigBuilder.setCustomCreationTime(maxCreationTimeOfMergingSegments.get()); + segmentProcessorConfigBuilder.setSegmentNameGenerator( + new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionID, + System.currentTimeMillis(), MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENT_NAME_PREFIX, null)); + SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build(); + List outputSegmentDirs; + try { + _eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments"); + outputSegmentDirs = new SegmentProcessorFramework(segmentProcessorConfig, workingDir, + SegmentProcessorFramework.convertRecordReadersToRecordReaderFileConfig(recordReaders), + Collections.emptyList(), new DefaultSegmentNumRowProvider(Integer.parseInt( + configs.get(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY)))).process(); + } finally { + for (RecordReader recordReader : recordReaders) { + recordReader.close(); + } + } + + long endMillis = System.currentTimeMillis(); + LOGGER.info("Finished task: {} with configs: {}. 
Total time: {}ms", taskType, configs, (endMillis - startMillis)); + + List results = new ArrayList<>(); + for (File outputSegmentDir : outputSegmentDirs) { + String outputSegmentName = outputSegmentDir.getName(); + results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName) + .setTableNameWithType(tableNameWithType).build()); + } + return results; + } + + @Override + protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, + SegmentConversionResult segmentConversionResult) { + Map updateMap = new TreeMap<>(); + updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX, + String.valueOf(System.currentTimeMillis())); + updateMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE + + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX, + pinotTaskConfig.getConfigs().get(MinionConstants.SEGMENT_NAME_KEY)); + return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updateMap); + } + + int getCommonPartitionIDForSegments(List segmentMetadataList) { + List segmentNames = + segmentMetadataList.stream().map(SegmentMetadataImpl::getName).collect(Collectors.toList()); + Set partitionIDSet = segmentNames.stream().map(x -> { + Integer segmentPartitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(x); + if (segmentPartitionId == null) { + throw new IllegalStateException(String.format("Partition id not found for %s", x)); + } + return segmentPartitionId; + }).collect(Collectors.toSet()); + if (partitionIDSet.size() > 1) { + throw new IllegalStateException( + "Found segments with different partition ids during task execution: " + partitionIDSet); + } + return partitionIDSet.iterator().next(); + } + + void validateCRCForInputSegments(List segmentMetadataList, List expectedCRCList) { + for (int i = 0; i < segmentMetadataList.size(); i++) { + SegmentMetadataImpl 
segmentMetadata = segmentMetadataList.get(i); + if (!Objects.equals(segmentMetadata.getCrc(), expectedCRCList.get(i))) { + String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc " + + "from ZK: %s, crc from deepstore: %s", segmentMetadata.getName(), expectedCRCList.get(i), + segmentMetadata.getCrc()); + LOGGER.error(message); + throw new IllegalStateException(message); + } + } + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java new file mode 100644 index 000000000000..b93684dae70a --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorFactory.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge; + +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.minion.MinionConf; +import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager; +import org.apache.pinot.minion.executor.PinotTaskExecutor; +import org.apache.pinot.minion.executor.PinotTaskExecutorFactory; +import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory; + + +@TaskExecutorFactory +public class UpsertCompactMergeTaskExecutorFactory implements PinotTaskExecutorFactory { + + private MinionConf _minionConf; + + @Override + public void init(MinionTaskZkMetadataManager zkMetadataManager) { + } + + @Override + public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) { + _minionConf = minionConf; + } + + @Override + public String getTaskType() { + return MinionConstants.UpsertCompactMergeTask.TASK_TYPE; + } + + @Override + public PinotTaskExecutor create() { + return new UpsertCompactMergeTaskExecutor(_minionConf); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java new file mode 100644 index 000000000000..ae3a4aa0d847 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; +import org.apache.pinot.common.restlet.resources.ValidDocIdsType; +import org.apache.pinot.common.utils.SegmentUtils; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; +import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils; +import org.apache.pinot.controller.util.ServerSegmentMetadataReader; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.annotations.minion.TaskGenerator; +import 
org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@TaskGenerator +public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskGenerator.class); + private static final String DEFAULT_BUFFER_PERIOD = "2d"; + private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500; + + public static class SegmentMergerMetadata { + private final SegmentZKMetadata _segmentZKMetadata; + private final long _validDocIds; + private final long _invalidDocIds; + + SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long validDocIds, long invalidDocIds) { + _segmentZKMetadata = segmentZKMetadata; + _validDocIds = validDocIds; + _invalidDocIds = invalidDocIds; + } + + public SegmentZKMetadata getSegmentZKMetadata() { + return _segmentZKMetadata; + } + + public long getValidDocIds() { + return _validDocIds; + } + + public long getInvalidDocIds() { + return _invalidDocIds; + } + } + + public static class SegmentSelectionResult { + + private final Map>> _segmentsForCompactMergeByPartition; + + private final List _segmentsForDeletion; + + SegmentSelectionResult(Map>> segmentsForCompactMergeByPartition, + List segmentsForDeletion) { + _segmentsForCompactMergeByPartition = segmentsForCompactMergeByPartition; + _segmentsForDeletion = segmentsForDeletion; + } + + public Map>> getSegmentsForCompactMergeByPartition() { + return _segmentsForCompactMergeByPartition; + } + + public List getSegmentsForDeletion() { + return _segmentsForDeletion; + } + } + + @Override + public String getTaskType() { + return MinionConstants.UpsertCompactMergeTask.TASK_TYPE; + } + + @Override + public List generateTasks(List tableConfigs) { + String taskType = 
MinionConstants.UpsertCompactMergeTask.TASK_TYPE; + List pinotTaskConfigs = new ArrayList<>(); + for (TableConfig tableConfig : tableConfigs) { + + String tableNameWithType = tableConfig.getTableName(); + LOGGER.info("Start generating task configs for table: {}", tableNameWithType); + + if (tableConfig.getTaskConfig() == null) { + LOGGER.warn("Task config is null for table: {}", tableNameWithType); + continue; + } + + // Only schedule 1 task of this type, per table + Map incompleteTasks = + TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType, _clusterInfoAccessor); + if (!incompleteTasks.isEmpty()) { + LOGGER.warn("Found incomplete tasks: {} for same table: {} and task type: {}. Skipping task generation.", + incompleteTasks.keySet(), tableNameWithType, taskType); + continue; + } + + Map taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType); + List allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); + + // Get completed segments and filter out the segments based on the buffer time configuration + List candidateSegments = + getCandidateSegments(taskConfigs, allSegments, System.currentTimeMillis()); + + if (candidateSegments.isEmpty()) { + LOGGER.info("No segments were eligible for compactMerge task for table: {}", tableNameWithType); + continue; + } + + // get server to segment mappings + PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); + Map> serverToSegments = pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); + BiMap serverToEndpoints; + try { + serverToEndpoints = pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); + } catch (InvalidConfigException e) { + throw new RuntimeException(e); + } + + ServerSegmentMetadataReader serverSegmentMetadataReader = + new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(), + _clusterInfoAccessor.getConnectionManager()); + + // Number of segments to 
query per server request. If a table has a lot of segments, then we might send a + // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. + int numSegmentsBatchPerServerRequest = Integer.parseInt( + taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST, + String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST))); + + Map> validDocIdsMetadataList = + serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, + serverToEndpoints, null, 60_000, ValidDocIdsType.SNAPSHOT.toString(), numSegmentsBatchPerServerRequest); + + Map candidateSegmentsMap = + candidateSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); + + Set alreadyMergedSegments = getAlreadyMergedSegments(allSegments); + + SegmentSelectionResult segmentSelectionResult = + processValidDocIdsMetadata(taskConfigs, candidateSegmentsMap, validDocIdsMetadataList, alreadyMergedSegments); + + if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) { + pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(), + "0d"); + LOGGER.info( + "Deleted segments containing only invalid records for table: {}, number of segments to be deleted: {}", + tableNameWithType, segmentSelectionResult.getSegmentsForDeletion()); + } + + int numTasks = 0; + int maxTasks = Integer.parseInt(taskConfigs.getOrDefault(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, + String.valueOf(MinionConstants.DEFAULT_TABLE_MAX_NUM_TASKS))); + for (Map.Entry>> entry + : segmentSelectionResult.getSegmentsForCompactMergeByPartition().entrySet()) { + if (numTasks == maxTasks) { + break; + } + List> groups = entry.getValue(); + // no valid groups found in the partition to merge + if (groups.isEmpty()) { + continue; + } + // there are no groups with more than 1 segment to merge + // TODO this can be later removed if 
  /**
   * Processes the validDocIds metadata fetched from servers and selects, per partition, ordered groups of segments
   * that should be merged together, plus the set of segments containing no valid docs which can simply be deleted.
   *
   * <p>Selection steps:
   * <ol>
   *   <li>Skip segments that are not candidates, whose ZK CRC does not match the server-reported CRC (possibly
   *       being reloaded), or that were already produced by a previous merge.</li>
   *   <li>Segments whose docs are all invalid are collected for deletion.</li>
   *   <li>Remaining segments are bucketed by realtime partition id and sorted by creation time (oldest first).</li>
   *   <li>Within each partition, consecutive segments are greedily packed into groups bounded by the configured max
   *       valid docs per output segment, max records per task and max segments per task.</li>
   *   <li>Single-segment groups are dropped; groups are ordered by total invalid docs (desc), ties broken by group
   *       size (desc), so the most fruitful merges are scheduled first.</li>
   * </ol>
   *
   * @param taskConfigs table-level task configs (thresholds are read with defaults from MinionConstants)
   * @param candidateSegmentsMap segment name to ZK metadata for segments eligible for this task
   * @param validDocIdsMetadataInfoMap segment name to validDocIds metadata reported by each hosting server
   * @param alreadyMergedSegments names of segments already merged into an existing segment (pending deletion)
   * @return groups of segments to compact-merge keyed by partition id, plus segments to delete outright
   */
  @VisibleForTesting
  public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
      Map<String, SegmentZKMetadata> candidateSegmentsMap,
      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap, Set<String> alreadyMergedSegments) {
    Map<Integer, List<SegmentMergerMetadata>> segmentsEligibleForCompactMerge = new HashMap<>();
    Set<String> segmentsForDeletion = new HashSet<>();
    for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
      // check if segment is part of completed segments
      if (!candidateSegmentsMap.containsKey(segmentName)) {
        LOGGER.debug("Segment {} is not found in the candidate segments list, skipping it for {}", segmentName,
            MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
        continue;
      }
      SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
      for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) {
        long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
        long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();

        // Skip segments if the crc from zk metadata and server does not match. They may be getting reloaded.
        // NOTE: 'continue' (not 'break') so another server's copy with a matching CRC can still be considered.
        if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
          LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", segmentName,
              segment.getCrc(), validDocIdsMetadata.getSegmentCrc());
          continue;
        }

        // segments eligible for deletion with no valid records
        long totalDocs = validDocIdsMetadata.getTotalDocs();
        if (totalInvalidDocs == totalDocs) {
          segmentsForDeletion.add(segmentName);
        } else if (alreadyMergedSegments.contains(segmentName)) {
          LOGGER.debug("Segment {} already merged. Skipping it for {}", segmentName,
              MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
          break;
        } else {
          Integer partitionID = SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
          if (partitionID == null) {
            LOGGER.warn("Partition ID not found for segment: {}, skipping it for {}", segmentName,
                MinionConstants.UpsertCompactMergeTask.TASK_TYPE);
            continue;
          }
          segmentsEligibleForCompactMerge.computeIfAbsent(partitionID, k -> new ArrayList<>())
              .add(new SegmentMergerMetadata(segment, totalValidDocs, totalInvalidDocs));
        }
        // only the first metadata entry with a matching CRC is used; the remaining server copies are redundant
        break;
      }
    }

    // within each partition, process older segments first so merges preserve ingestion order
    segmentsEligibleForCompactMerge.forEach((partitionID, segmentList) -> segmentList.sort(
        Comparator.comparingLong(o -> o.getSegmentZKMetadata().getCreationTime())));

    // Map to store the result: each key (partition) will have a list of groups
    Map<Integer, List<List<SegmentMergerMetadata>>> groupedSegments = new HashMap<>();

    // Iterate over each partition and process its segments list
    for (Map.Entry<Integer, List<SegmentMergerMetadata>> entry : segmentsEligibleForCompactMerge.entrySet()) {
      int partitionID = entry.getKey();
      List<SegmentMergerMetadata> segments = entry.getValue();
      // task config thresholds
      // TODO add output segment size as one of the thresholds
      long validDocsThreshold = Long.parseLong(
          taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
              String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT)));
      long maxRecordsPerTask = Long.parseLong(
          taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
              String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_RECORDS_PER_TASK)));
      long maxNumSegments = Long.parseLong(
          taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
              String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));

      // List to store groups for the current partition
      List<List<SegmentMergerMetadata>> groups = new ArrayList<>();
      List<SegmentMergerMetadata> currentGroup = new ArrayList<>();

      // variables to maintain current group sum
      long currentValidDocsSum = 0;
      long currentTotalDocsSum = 0;

      for (SegmentMergerMetadata segment : segments) {
        long validDocs = segment.getValidDocIds();
        long invalidDocs = segment.getInvalidDocIds();

        // Check if adding this segment would keep the validDocs sum within the threshold
        if (currentValidDocsSum + validDocs <= validDocsThreshold && currentGroup.size() < maxNumSegments
            && currentTotalDocsSum + validDocs + invalidDocs < maxRecordsPerTask) {
          // Add the segment to the current group
          currentGroup.add(segment);
          currentValidDocsSum += validDocs;
          currentTotalDocsSum += validDocs + invalidDocs;
        } else {
          // Finalize the current group and start a new one
          if (!currentGroup.isEmpty()) {
            groups.add(new ArrayList<>(currentGroup)); // Add the finalized group
          }

          // Reset current group, sums and start with the new segment
          currentGroup = new ArrayList<>();
          currentGroup.add(segment);
          currentValidDocsSum = validDocs;
          currentTotalDocsSum = validDocs + invalidDocs;
        }
      }
      // Add the last group
      if (!currentGroup.isEmpty()) {
        groups.add(new ArrayList<>(currentGroup)); // Add a copy of the current group
      }

      // Sort groups by total invalidDocs in descending order; if invalidDocs counts are the same, prefer the group
      // with the higher number of small segments in it
      // remove the groups having only 1 segment in them
      // TODO this check can be later removed if we want single-segment compaction from this task itself
      List<List<SegmentMergerMetadata>> compactMergeGroups =
          groups.stream().filter(x -> x.size() > 1).sorted((group1, group2) -> {
            long invalidDocsSum1 = group1.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
            long invalidDocsSum2 = group2.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
            if (invalidDocsSum2 < invalidDocsSum1) {
              return -1;
            } else if (invalidDocsSum2 == invalidDocsSum1) {
              return Long.compare(group2.size(), group1.size());
            } else {
              return 1;
            }
          }).collect(Collectors.toList());

      if (!compactMergeGroups.isEmpty()) {
        groupedSegments.put(partitionID, compactMergeGroups);
      }
    }
    return new SegmentSelectionResult(groupedSegments, new ArrayList<>(segmentsForDeletion));
  }
This also avoids any race condition with deepstore upload + // retry task and this task + if (StringUtils.isBlank(segment.getDownloadUrl())) { + LOGGER.warn("Skipping segment {} for task as download url is empty", segment.getSegmentName()); + continue; + } + // initial segments selection based on status and age + if (segment.getStatus().isCompleted() && (segment.getEndTimeMs() <= (currentTimeInMillis - bufferMs))) { + candidateSegments.add(segment); + } + } + return candidateSegments; + } + + @VisibleForTesting + protected static Set getAlreadyMergedSegments(List allSegments) { + Set alreadyMergedSegments = new HashSet<>(); + for (SegmentZKMetadata segment : allSegments) { + // check if the segment has custom map having list of segments which merged to form this. we will later + // filter out the merged segments as they will be deleted + if (segment.getCustomMap() != null && !segment.getCustomMap().isEmpty() && !StringUtils.isBlank( + segment.getCustomMap().get(MinionConstants.UpsertCompactMergeTask.TASK_TYPE + + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX))) { + alreadyMergedSegments.addAll(List.of(StringUtils.split(segment.getCustomMap().get( + MinionConstants.UpsertCompactMergeTask.TASK_TYPE + + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX), ","))); + } + } + return alreadyMergedSegments; + } + + @Override + public void validateTaskConfigs(TableConfig tableConfig, Map taskConfigs) { + // check table is realtime + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + String.format("%s only supports realtime tables!", MinionConstants.UpsertCompactMergeTask.TASK_TYPE)); + // check upsert enabled + Preconditions.checkState(tableConfig.isUpsertEnabled(), + String.format("Upsert must be enabled for %s", MinionConstants.UpsertCompactMergeTask.TASK_TYPE)); + // check no malformed period + if (taskConfigs.containsKey(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY)) { + 
TimeUtils.convertPeriodToMillis(taskConfigs.get(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY)); + } + // check enableSnapshot = true + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + Preconditions.checkNotNull(upsertConfig, + String.format("UpsertConfig must be provided for %s", MinionConstants.UpsertCompactMergeTask.TASK_TYPE)); + Preconditions.checkState(upsertConfig.isEnableSnapshot(), + String.format("'enableSnapshot' from UpsertConfig must be enabled for %s", + MinionConstants.UpsertCompactMergeTask.TASK_TYPE)); + } + + @VisibleForTesting + protected String getDownloadUrl(List segmentMergerMetadataList) { + return StringUtils.join(segmentMergerMetadataList.stream().map(x -> x.getSegmentZKMetadata().getDownloadUrl()) + .collect(Collectors.toList()), ","); + } + + @VisibleForTesting + protected String getSegmentCrcList(List segmentMergerMetadataList) { + return StringUtils.join( + segmentMergerMetadataList.stream().map(x -> String.valueOf(x.getSegmentZKMetadata().getCrc())) + .collect(Collectors.toList()), ","); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java new file mode 100644 index 000000000000..1a717e88db95 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskProgressObserverFactory.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge; + +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory; +import org.apache.pinot.spi.annotations.minion.EventObserverFactory; + + +@EventObserverFactory +public class UpsertCompactMergeTaskProgressObserverFactory extends BaseMinionProgressObserverFactory { + + @Override + public String getTaskType() { + return MinionConstants.UpsertCompactMergeTask.TASK_TYPE; + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java new file mode 100644 index 000000000000..9c5093d1ab0d --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutorTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import java.util.Arrays;
import java.util.List;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


/**
 * Unit tests for {@code UpsertCompactMergeTaskExecutor} helper methods.
 */
public class UpsertCompactMergeTaskExecutorTest {
  private UpsertCompactMergeTaskExecutor _taskExecutor;

  @BeforeClass
  public void setUp() {
    _taskExecutor = new UpsertCompactMergeTaskExecutor(null);
  }

  /** Returns a mocked segment metadata that reports the given CRC. */
  private static SegmentMetadataImpl mockSegmentWithCrc(String crc) {
    SegmentMetadataImpl segmentMetadata = Mockito.mock(SegmentMetadataImpl.class);
    Mockito.when(segmentMetadata.getCrc()).thenReturn(crc);
    return segmentMetadata;
  }

  /** Returns a mocked segment metadata that reports the given segment name. */
  private static SegmentMetadataImpl mockSegmentWithName(String segmentName) {
    SegmentMetadataImpl segmentMetadata = Mockito.mock(SegmentMetadataImpl.class);
    Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
    return segmentMetadata;
  }

  @Test
  public void testValidateCRCForInputSegments() {
    // matching CRCs must pass validation without throwing
    List<SegmentMetadataImpl> segmentMetadataList =
        Arrays.asList(mockSegmentWithCrc("1000"), mockSegmentWithCrc("2000"));
    _taskExecutor.validateCRCForInputSegments(segmentMetadataList, Arrays.asList("1000", "2000"));
  }

  @Test(expectedExceptions = IllegalStateException.class)
  public void testValidateCRCForInputSegmentsWithMismatchedCRC() {
    // second segment reports 3000 while 2000 is expected -> must throw
    List<SegmentMetadataImpl> segmentMetadataList =
        Arrays.asList(mockSegmentWithCrc("1000"), mockSegmentWithCrc("3000"));
    _taskExecutor.validateCRCForInputSegments(segmentMetadataList, Arrays.asList("1000", "2000"));
  }

  @Test
  public void testGetCommonPartitionIDForSegments() {
    // all LLC segment names belong to partition 0
    List<SegmentMetadataImpl> segmentMetadataList = Arrays.asList(mockSegmentWithName("testTable__0__0__0"),
        mockSegmentWithName("testTable__0__1__0"), mockSegmentWithName("testTable__0__2__0"));
    Assert.assertEquals(_taskExecutor.getCommonPartitionIDForSegments(segmentMetadataList), 0);
  }

  @Test(expectedExceptions = IllegalStateException.class)
  public void testGetCommonPartitionIDForSegmentsWithDifferentPartitionIDs() {
    // partitions 0 and 1 mixed in one task -> must throw
    List<SegmentMetadataImpl> segmentMetadataList =
        Arrays.asList(mockSegmentWithName("testTable__0__0__0"), mockSegmentWithName("testTable__1__0__0"));
    _taskExecutor.getCommonPartitionIDForSegments(segmentMetadataList);
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


/**
 * Unit tests for {@code UpsertCompactMergeTaskGenerator}.
 */
public class UpsertCompactMergeTaskGeneratorTest {

  private static final String RAW_TABLE_NAME = "testTable";
  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
  private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
  private UpsertCompactMergeTaskGenerator _taskGenerator;
  private TableConfig _tableConfig;
  private SegmentZKMetadata _completedSegment;
  private SegmentZKMetadata _completedSegment2;
  private Map<String, SegmentZKMetadata> _completedSegmentsMap;

  @BeforeClass
  public void setUp() {
    _taskGenerator = new UpsertCompactMergeTaskGenerator();

    Map<String, Map<String, String>> tableTaskConfigs = new HashMap<>();
    tableTaskConfigs.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, new HashMap<>());
    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
    upsertConfig.setEnableSnapshot(true);
    _tableConfig =
        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
            .setUpsertConfig(upsertConfig).setTaskConfig(new TableTaskConfig(tableTaskConfigs)).build();

    _completedSegment = buildCompletedSegment("testTable__0", "12d", "11d", 100L, 1000, "fs://testTable__0");
    _completedSegment2 = buildCompletedSegment("testTable__1", "10d", "9d", 10L, 2000, "fs://testTable__1");

    _completedSegmentsMap = new HashMap<>();
    _completedSegmentsMap.put(_completedSegment.getSegmentName(), _completedSegment);
    _completedSegmentsMap.put(_completedSegment2.getSegmentName(), _completedSegment2);
  }

  /**
   * Builds a DONE segment whose start/end times are the given periods back from now.
   */
  private static SegmentZKMetadata buildCompletedSegment(String segmentName, String startPeriodBack,
      String endPeriodBack, long totalDocs, long crc, String downloadUrl) {
    SegmentZKMetadata segment = new SegmentZKMetadata(segmentName);
    segment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
    segment.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis(startPeriodBack));
    segment.setEndTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis(endPeriodBack));
    segment.setTimeUnit(TimeUnit.MILLISECONDS);
    segment.setTotalDocs(totalDocs);
    segment.setCrc(crc);
    segment.setDownloadUrl(downloadUrl);
    return segment;
  }

  @Test
  public void testUpsertCompactMergeTaskConfig() {
    Map<String, String> taskConfig = ImmutableMap.of("bufferTimePeriod", "5d");
    TableTaskConfig tableTaskConfig =
        new TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, taskConfig));

    // OFFLINE tables are rejected
    TableConfig offlineTableConfig =
        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
    Assert.assertThrows(IllegalStateException.class,
        () -> _taskGenerator.validateTaskConfigs(offlineTableConfig, taskConfig));

    // REALTIME tables without upsert are rejected
    TableConfig nonUpsertRealtimeTableConfig =
        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
    Assert.assertThrows(IllegalStateException.class,
        () -> _taskGenerator.validateTaskConfigs(nonUpsertRealtimeTableConfig, taskConfig));

    // upsert tables without validDocIds snapshots are rejected
    TableConfig disabledSnapshotTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).setTaskConfig(tableTaskConfig).build();
    Assert.assertThrows(IllegalStateException.class,
        () -> _taskGenerator.validateTaskConfigs(disabledSnapshotTableConfig, taskConfig));

    // a realtime upsert table with snapshots enabled passes validation
    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
    upsertConfig.setEnableSnapshot(true);
    TableConfig validTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
        .setUpsertConfig(upsertConfig).setTaskConfig(tableTaskConfig).build();
    _taskGenerator.validateTaskConfigs(validTableConfig, taskConfig);

    // a malformed buffer time period is rejected
    Map<String, String> invalidBufferConfig = ImmutableMap.of("bufferTimePeriod", "5hd");
    Assert.assertThrows(IllegalArgumentException.class,
        () -> _taskGenerator.validateTaskConfigs(validTableConfig, invalidBufferConfig));
  }

  @Test
  public void testGetAlreadyMergedSegments() {
    SegmentZKMetadata mergedSegment = new SegmentZKMetadata("testTable__merged");
    mergedSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
    Map<String, String> customMap = new HashMap<>();
    customMap.put(MinionConstants.UpsertCompactMergeTask.TASK_TYPE
        + MinionConstants.UpsertCompactMergeTask.MERGED_SEGMENTS_ZK_SUFFIX, "testTable__0,testTable__1");
    mergedSegment.setCustomMap(customMap);

    // the merged segment's custom map marks both parents as already merged
    Set<String> alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(
        Arrays.asList(_completedSegment, _completedSegment2, mergedSegment));
    Assert.assertEquals(alreadyMergedSegments.size(), 2);
    Assert.assertTrue(alreadyMergedSegments.contains("testTable__0"));
    Assert.assertTrue(alreadyMergedSegments.contains("testTable__1"));

    // no merging happened till now
    alreadyMergedSegments =
        UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(Arrays.asList(_completedSegment, _completedSegment2));
    Assert.assertTrue(alreadyMergedSegments.isEmpty());

    // no segment present, empty list
    alreadyMergedSegments = UpsertCompactMergeTaskGenerator.getAlreadyMergedSegments(Collections.emptyList());
    Assert.assertTrue(alreadyMergedSegments.isEmpty());
  }

  @Test
  public void testGetCandidateSegments() {
    Map<String, String> taskConfigs = new HashMap<>();
    taskConfigs.put(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY, "5d");

    // completed segments outside the buffer period with download urls are eligible
    List<SegmentZKMetadata> candidateSegments = UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
        new ArrayList<>(_completedSegmentsMap.values()), System.currentTimeMillis());
    Assert.assertEquals(candidateSegments.size(), 2);
    Assert.assertTrue(candidateSegments.contains(_completedSegment));
    Assert.assertTrue(candidateSegments.contains(_completedSegment2));

    // a segment with an empty download url is skipped
    SegmentZKMetadata segmentWithNoDownloadUrl = buildCompletedSegment("testTable__2", "10d", "9d", 100L, 1000, "");
    candidateSegments = UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
        List.of(segmentWithNoDownloadUrl), System.currentTimeMillis());
    Assert.assertEquals(candidateSegments.size(), 0);

    // a segment still within the buffer period is skipped
    SegmentZKMetadata segmentWithinBufferPeriod =
        buildCompletedSegment("testTable__3", "1d", "0d", 100L, 1000, "fs://testTable__3");
    candidateSegments = UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
        List.of(segmentWithinBufferPeriod), System.currentTimeMillis());
    Assert.assertEquals(candidateSegments.size(), 0);

    // an in-progress (consuming) segment is skipped
    SegmentZKMetadata incompleteSegment = new SegmentZKMetadata("testTable__4");
    incompleteSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
    incompleteSegment.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d"));
    incompleteSegment.setTimeUnit(TimeUnit.MILLISECONDS);
    incompleteSegment.setTotalDocs(100L);
    incompleteSegment.setCrc(1000);
    candidateSegments = UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs,
        List.of(incompleteSegment), System.currentTimeMillis());
    Assert.assertEquals(candidateSegments.size(), 0);
  }

  @Test
  public void testGetDownloadUrl() {
    // empty list yields an empty string
    Assert.assertEquals(_taskGenerator.getDownloadUrl(Collections.emptyList()), "");

    // single segment
    List<UpsertCompactMergeTaskGenerator.SegmentMergerMetadata> segmentMergerMetadataList =
        List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10));
    Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList), "fs://testTable__0");

    // multiple segments are comma-joined in order
    segmentMergerMetadataList =
        Arrays.asList(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10),
            new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20));
    Assert.assertEquals(_taskGenerator.getDownloadUrl(segmentMergerMetadataList),
        "fs://testTable__0,fs://testTable__1");
  }

  @Test
  public void testGetSegmentCrcList() {
    // empty list yields an empty string
    Assert.assertEquals(_taskGenerator.getSegmentCrcList(Collections.emptyList()), "");

    // single segment
    List<UpsertCompactMergeTaskGenerator.SegmentMergerMetadata> segmentMergerMetadataList =
        List.of(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10));
    Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000");

    // multiple segments are comma-joined in order
    segmentMergerMetadataList =
        Arrays.asList(new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment, 100, 10),
            new UpsertCompactMergeTaskGenerator.SegmentMergerMetadata(_completedSegment2, 200, 20));
    Assert.assertEquals(_taskGenerator.getSegmentCrcList(segmentMergerMetadataList), "1000,2000");
  }
}