diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 569816115530..bf67eadfff4c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1761,6 +1761,7 @@ public int numSortedRunStopTrigger() {
         return Math.max(numSortedRunCompactionTrigger(), stopTrigger);
     }
 
+    // if num-levels is not set, it defaults to num-sorted-run.compaction-trigger + 1 (6 by default)
     public int numLevels() {
         // By default, this ensures that the compaction does not fall to level 0, but at least to
         // level 1
diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
index 9b0fdd26ad03..b7372b0706f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java
@@ -25,17 +25,23 @@ import java.util.List;
 
 /** A files unit for compaction. */
+
+// one unit of compaction work
 public interface CompactUnit {
 
+    // the level the result is written to; usually the picked runs are compacted down to this level
    int outputLevel();
 
+    // the files to be compacted
    List<DataFileMeta> files();
 
    static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {
        List<DataFileMeta> files = new ArrayList<>();
        for (LevelSortedRun run : runs) {
+            // collect the files of every run
            files.addAll(run.run().files());
        }
+        // return a CompactUnit containing all collected files
        return fromFiles(outputLevel, files);
    }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
index 350b693dbf8b..4fff40962887 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/Levels.java
@@ -144,7 +144,9 @@ public List<DataFileMeta> allFiles() {
 
     public List<LevelSortedRun> levelSortedRuns() {
         List<LevelSortedRun> runs = new ArrayList<>();
+        // level 0: one SortedRun per file
         level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file))));
+        // other levels: one SortedRun per level
         for (int i = 0; i < levels.size(); i++) {
             SortedRun run = levels.get(i);
             if (run.nonEmpty()) {
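To make the sorted-run model annotated above concrete, here is a minimal, self-contained Java sketch of what `levelSortedRuns()` produces. The `FileMeta`, `SortedRun` and `LevelSortedRun` records below are toy stand-ins for Paimon's `DataFileMeta`, `SortedRun` and `LevelSortedRun`, not the real classes:

```java
import java.util.ArrayList;
import java.util.List;

public class LevelsSketch {
    record FileMeta(String name) {}
    record SortedRun(List<FileMeta> files) {}
    record LevelSortedRun(int level, SortedRun run) {}

    static List<LevelSortedRun> levelSortedRuns(
            List<FileMeta> level0, List<List<FileMeta>> higherLevels) {
        List<LevelSortedRun> runs = new ArrayList<>();
        // level 0: each file is its own sorted run (L0 files may overlap in key range)
        for (FileMeta f : level0) {
            runs.add(new LevelSortedRun(0, new SortedRun(List.of(f))));
        }
        // levels >= 1: at most one run per level; the files inside it do not overlap
        for (int i = 0; i < higherLevels.size(); i++) {
            List<FileMeta> files = higherLevels.get(i);
            if (!files.isEmpty()) {
                runs.add(new LevelSortedRun(i + 1, new SortedRun(files)));
            }
        }
        return runs;
    }

    public static void main(String[] args) {
        List<LevelSortedRun> runs =
                levelSortedRuns(
                        List.of(new FileMeta("f0"), new FileMeta("f1")), // two L0 files -> two runs
                        List.of(List.of(new FileMeta("f2"), new FileMeta("f3")))); // one L1 run
        runs.forEach(r -> System.out.println("level " + r.level() + " -> " + r.run().files()));
    }
}
```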
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index de0a28c33417..ee3f6ecf67d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -214,14 +214,18 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
             waitForLatestCompaction = true;
         }
 
+        // decide whether a changelog writer is needed (with changelog-producer = input, the input
+        // records are written straight to the changelog directory, so a new rolling changelog
+        // file has to be created)
         final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
                 (changelogProducer == ChangelogProducer.INPUT && !isInsertOnly)
                         ? writerFactory.createRollingChangelogFileWriter(0)
                         : null;
+
+        // the buffered records are first rolled into level 0
         final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
                 writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
 
         try {
+            // flush the write buffer into level 0
             writeBuffer.forEach(
                     keyComparator,
                     mergeFunction,
@@ -249,12 +253,14 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
                 newFilesChangelog.addAll(changelogMetas);
             }
 
+            // add the new files to level 0
             for (DataFileMeta dataMeta : dataMetas) {
                 newFiles.add(dataMeta);
                 compactManager.addNewFile(dataMeta);
             }
         }
-
+        // waitForLatestCompaction = true: wait for the previous compaction to finish before starting the next one;
+        // waitForLatestCompaction = false: no need to wait, just trigger the next compaction directly
         trySyncLatestCompaction(waitForLatestCompaction);
         compactManager.triggerCompaction(forcedFullCompaction);
     }
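The flush path annotated above reduces to one branch: with changelog-producer = input (and a batch that is not insert-only), every record flushed to the new level-0 data file is also appended to a changelog file. A simplified sketch, with `String` records and a toy `RollingWriter` standing in for Paimon's `KeyValue` and `RollingFileWriter`:

```java
import java.util.List;

public class FlushSketch {
    interface RollingWriter {
        void write(String kv);
    }

    static void flush(
            List<String> buffer,
            boolean producerIsInput,
            boolean insertOnly,
            RollingWriter dataWriter,
            RollingWriter changelogWriter) {
        boolean writeChangelog = producerIsInput && !insertOnly;
        for (String kv : buffer) {
            dataWriter.write(kv); // always rolled into a new level-0 data file
            if (writeChangelog) {
                changelogWriter.write(kv); // "input" changelog: the raw input, no compaction needed
            }
        }
    }

    public static void main(String[] args) {
        flush(
                List.of("+I k1", "+I k2"),
                true,
                false,
                kv -> System.out.println("data:      " + kv),
                kv -> System.out.println("changelog: " + kv));
    }
}
```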
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index 90471d1caf49..037855d592b8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -25,6 +25,8 @@ import java.util.Optional;
 
 /** Compact strategy to decide which files to select for compaction. */
+
+// the compact strategy decides which files are picked for compaction
 public interface CompactStrategy {
 
     /**
@@ -36,9 +38,13 @@ public interface CompactStrategy {
      *   <li>compaction is sequential from small level to large level.
      * </ul>
      */
+
+    // compaction proceeds from smaller levels to larger levels
    Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
 
     /** Pick a compaction unit consisting of all existing files. */
+
+    // pick a compaction unit consisting of all existing files
    static Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) {
        int maxLevel = numLevels - 1;
        if (runs.isEmpty() || (runs.size() == 1 && runs.get(0).level() == maxLevel)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
index 80365b0d4f00..9c5e91af9091 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
@@ -30,6 +30,8 @@ import java.util.stream.Collectors;
 
 /** Algorithm to partition several data files into the minimum number of {@link SortedRun}s. */
+
+// partitions several data files into the minimum number of SortedRuns
 public class IntervalPartition {
 
     private final List<DataFileMeta> files;
@@ -37,6 +39,7 @@ public class IntervalPartition {
 
     public IntervalPartition(List<DataFileMeta> inputFiles, Comparator<InternalRow> keyComparator) {
         this.files = new ArrayList<>(inputFiles);
+        // sort the files by minKey first
         this.files.sort(
                 (o1, o2) -> {
                     int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());
@@ -70,8 +73,9 @@ public List<List<SortedRun>> partition() {
         BinaryRow bound = null;
 
         for (DataFileMeta meta : files) {
+            // check whether this file overlaps the current section
             if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
-                // larger than current right bound, conclude current section and create a new one
+                // minKey is larger than the current right bound, so conclude the current section
+                // and start a new one
                 result.add(partition(section));
                 section.clear();
                 bound = null;
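The greedy idea behind `IntervalPartition.partition()` is worth spelling out: keep a min-heap of runs ordered by the max key of their last file; a file whose min key lies beyond the smallest such bound extends that run, otherwise it must open a new run, which yields the minimum number of runs. A self-contained sketch of this idea, with integer ranges standing in for `BinaryRow` key bounds (an illustration, not the real implementation):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class IntervalPartitionSketch {
    record Range(int min, int max) {}

    static List<List<Range>> partition(List<Range> sortedFiles) {
        // each heap entry is one run, ordered by the max key of its last file
        PriorityQueue<List<Range>> runs =
                new PriorityQueue<>(
                        Comparator.comparingInt((List<Range> r) -> r.get(r.size() - 1).max()));
        for (Range f : sortedFiles) {
            List<Range> best = runs.peek();
            if (best != null && best.get(best.size() - 1).max() < f.min()) {
                runs.poll(); // no overlap: extend the run that ends earliest
                best.add(f);
                runs.add(best);
            } else {
                List<Range> run = new ArrayList<>();
                run.add(f); // overlaps every existing run: a new run is unavoidable
                runs.add(run);
            }
        }
        return new ArrayList<>(runs);
    }

    public static void main(String[] args) {
        // [1,3] and [2,5] overlap and need two runs; [4,6] can extend the run ending at 3
        System.out.println(partition(List.of(new Range(1, 3), new Range(2, 5), new Range(4, 6))));
    }
}
```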
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 28853a12381f..cb34fd8cbbe9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -100,6 +100,7 @@ public boolean shouldWaitForPreparingCheckpoint() {
         return levels.numberOfSortedRuns() > (long) numSortedRunStopTrigger + 1;
     }
 
+    // new files are always added to level 0
     @Override
     public void addNewFile(DataFileMeta file) {
         levels.addLevel0File(file);
@@ -114,6 +115,7 @@ public List<DataFileMeta> allFiles() {
     @Override
     public void triggerCompaction(boolean fullCompaction) {
         Optional<CompactUnit> optionalUnit;
+        // at level 0 every file is its own sorted run; from level 1 down, each level holds one
+        // sorted run. A sorted run maps to one or more files (a file is rolled out once it
+        // reaches a certain size), so a level's sorted run can be seen as one file globally
+        // sorted by primary key.
        List<LevelSortedRun> runs = levels.levelSortedRuns();
         if (fullCompaction) {
             Preconditions.checkState(
                     taskFuture == null,
                     "A compaction task is still running while the user forces a new compaction.");
             if (LOG.isDebugEnabled()) {
                 LOG.debug(
                         "Trigger forced full compaction. Picking from the following runs\n{}",
                         runs);
             }
+            // compact everything down to the max level
             optionalUnit = CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs);
         } else {
             if (taskFuture != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index a1b64072d817..bd74412c629a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -73,6 +73,7 @@ public MergeTreeCompactTask(
         this.upgradeFilesNum = 0;
     }
 
+    // run the actual compaction
     @Override
     protected CompactResult doCompact() throws Exception {
         List<List<SortedRun>> candidate = new ArrayList<>();
@@ -81,6 +82,8 @@ protected CompactResult doCompact() throws Exception {
         // Checking the order and compacting adjacent and contiguous files
         // Note: can't skip an intermediate file to compact, this will destroy the overall
         // orderliness
+
+        // sections are processed in order; only adjacent, contiguous files are compacted together
         for (List<SortedRun> section : partitioned) {
             if (section.size() > 1) {
                 candidate.add(section);
@@ -142,6 +145,7 @@ private void rewrite(List<List<SortedRun>> candidate, CompactResult toUpdate) th
         if (section.size() == 0) {
             return;
         } else if (section.size() == 1) {
+            // a single non-overlapping run: upgrade its files to the output level instead of rewriting them
             for (DataFileMeta file : section.get(0).files()) {
                 upgrade(file, toUpdate);
             }
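A heavily simplified sketch of the selection step in `doCompact()` above: sections containing more than one sorted run overlap and must be rewritten together, while a single-run section can be "upgraded", i.e. its files move to the output level as a pure metadata change. The real method also handles small sections and delegates the rewrite to a rewriter; `Section` and the `String` run names here are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class CompactTaskSketch {
    record Section(List<String> runs) {}

    static void plan(List<Section> partitioned) {
        List<Section> candidate = new ArrayList<>();
        for (Section section : partitioned) {
            if (section.runs().size() > 1) {
                candidate.add(section); // overlapping runs: must be merged by a rewrite
            } else {
                // a single run overlaps nothing: a pure metadata move to the output level
                System.out.println("upgrade only: " + section.runs());
            }
        }
        System.out.println("rewrite together: " + candidate);
    }

    public static void main(String[] args) {
        plan(List.of(new Section(List.of("run-a", "run-b")), new Section(List.of("run-c"))));
    }
}
```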
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index c31aec682b64..6754f5cce63f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -39,6 +39,8 @@
  *
  * <p>See RocksDb Universal-Compaction:
  * https://github.com/facebook/rocksdb/wiki/Universal-Compaction.
  */
+
+// lower write amplification, traded against read amplification and space amplification
 public class UniversalCompaction implements CompactStrategy {
 
     private static final Logger LOG = LoggerFactory.getLogger(UniversalCompaction.class);
@@ -83,6 +85,7 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
         CompactUnit unit = pickForSizeAmp(maxLevel, runs);
         if (unit != null) {
             if (LOG.isDebugEnabled()) {
+                // compaction triggered by size amplification
                 LOG.debug("Universal compaction due to size amplification");
             }
             return Optional.of(unit);
@@ -116,6 +119,7 @@ CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
             return null;
         }
 
+        // total size of all candidate runs except the oldest one
         long candidateSize =
                 runs.subList(0, runs.size() - 1).stream()
                         .map(LevelSortedRun::run)
@@ -125,6 +129,7 @@ CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
         long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();
 
         // size amplification = percentage of additional size
+        // i.e. pick a full compaction when candidateSize / earliestRunSize > maxSizeAmp / 100
         if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
             updateLastOptimizedCompaction();
             return CompactUnit.fromLevelRuns(maxLevel, runs);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index d061e181618b..7adaa1246aac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -195,6 +195,7 @@ protected MergeTreeWriter createWriter(
         KeyValueFileWriterFactory writerFactory =
                 writerFactoryBuilder.build(partition, bucket, options);
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
+        // declares how many levels there are in total
         Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
         UniversalCompaction universalCompaction =
                 new UniversalCompaction(
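A worked example of the size-amplification trigger above. `candidateSize` is the total size of all runs except the oldest, and a full compaction to the max level is picked when `candidateSize / earliestRunSize` exceeds `maxSizeAmp` percent (200 is, to my understanding, the default of `compaction.max-size-amplification-percent`; the run sizes below are made up):

```java
public class SizeAmpSketch {
    static boolean pickForSizeAmp(long[] runSizes, long maxSizeAmp) {
        long candidateSize = 0;
        for (int i = 0; i < runSizes.length - 1; i++) {
            candidateSize += runSizes[i]; // every run except the oldest (last) one
        }
        long earliestRunSize = runSizes[runSizes.length - 1];
        // same test as above: candidateSize / earliestRunSize > maxSizeAmp / 100
        return candidateSize * 100 > maxSizeAmp * earliestRunSize;
    }

    public static void main(String[] args) {
        // 18 / 100 = 18% amplification, below 200%: no full compaction
        System.out.println(pickForSizeAmp(new long[] {8, 10, 100}, 200));
        // 250 / 100 = 250% amplification, above 200%: compact everything to the max level
        System.out.println(pickForSizeAmp(new long[] {150, 100, 100}, 200));
    }
}
```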
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index a47662683c87..70f64e4ae3e9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -118,19 +118,24 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
                 new OneInputTransformation<>(
                         keyInput,
                         "LOCAL SAMPLE",
-                        new LocalSampleOperator<>(localSampleSize), // sort a batch of records and keep the localSampleSize records with the largest (randomly generated) weights
+                        new LocalSampleOperator<>(
+                                localSampleSize), // sort a batch of records and keep the localSampleSize records with the largest (randomly generated) weights
                         new TupleTypeInfo<>(
                                 BasicTypeInfo.DOUBLE_TYPE_INFO,
                                 keyTypeInformation,
                                 BasicTypeInfo.INT_TYPE_INFO),
                         keyInput.getParallelism());
 
-        // 2. Collect all the samples and gather them into a sorted key range. (aggregate the samples of every partition into one global sample set)
+        // 2. Collect all the samples and gather them into a sorted key range.
+        // (aggregate the samples of every partition into one global sample set)
         OneInputTransformation<Tuple3<Double, T, Integer>, List<T>> sampleAndHistogram =
                 new OneInputTransformation<>(
                         localSample,
                         "GLOBAL SAMPLE",
-                        new GlobalSampleOperator<>(globalSampleSize, keyComparator, rangeNum), // rangeNum = sinkParallelism * 10
+                        new GlobalSampleOperator<>(
+                                globalSampleSize,
+                                keyComparator,
+                                rangeNum), // rangeNum = sinkParallelism * 10
                         new ListTypeInfo<>(keyTypeInformation),
                         1);
@@ -566,8 +571,9 @@ public int get() {
 
     /**
      * (find the corresponding bucket range boundary values)
+     *
      * @param sampledData the sampled data, sorted by z-index
-     * @param rangesNum the number of ranges to split into
+     * @param rangesNum   the number of ranges to split into
      * @return
      * @param <T>
      */
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index e483e3c19f74..bbf01b33d8f5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -102,6 +102,7 @@ private StoreSinkWrite.Provider createWriteProvider(
         if (coreOptions.writeOnly()) {
             waitCompaction = false;
         } else {
+            // whether prepareCommit has to wait for compaction to finish
             waitCompaction = coreOptions.prepareCommitWaitCompaction();
             int deltaCommits = -1;
             if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -115,6 +116,8 @@ private StoreSinkWrite.Provider createWriteProvider(
                                         / checkpointConfig.getCheckpointInterval());
             }
 
+            // if changelog-producer is set to full-compaction, a full compaction is performed
+            // once the number of delta commits reaches deltaCommits
             if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
                 int finalDeltaCommits = Math.max(deltaCommits, 1);
                 return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
@@ -134,6 +137,7 @@ private StoreSinkWrite.Provider createWriteProvider(
             }
         }
 
+        // covers deletion vectors and the lookup changelog producer
         if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
             return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
                 assertNoSinkMaterializer.run();
@@ -149,7 +153,7 @@ private StoreSinkWrite.Provider createWriteProvider(
                             metricGroup);
             };
         }
-
+        // the normal writer
         return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
             assertNoSinkMaterializer.run();
             return new StoreSinkWriteImpl(
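The boundary-finding helper documented just above boils down to: sort the global sample and take evenly spaced elements as the upper bounds of the ranges. A self-contained sketch with integer keys standing in for the z-index keys; the method name `boundaries` and the exact spacing rule are illustrative assumptions, not the real RangeShuffle code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RangeBoundarySketch {
    static List<Integer> boundaries(List<Integer> sampledData, int rangesNum) {
        List<Integer> sorted = new ArrayList<>(sampledData);
        Collections.sort(sorted);
        List<Integer> bounds = new ArrayList<>();
        double step = (double) sorted.size() / rangesNum;
        for (int i = 1; i < rangesNum; i++) {
            bounds.add(sorted.get((int) (i * step) - 1)); // upper bound of range i-1
        }
        return bounds;
    }

    public static void main(String[] args) {
        // 8 samples split into 4 ranges -> 3 boundary keys: [2, 4, 7]
        System.out.println(boundaries(List.of(5, 1, 7, 3, 9, 2, 8, 4), 4));
    }
}
```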