Commit

read source code
sunxiaojian committed Oct 2, 2024
1 parent d62a7f5 commit c6f2067
Showing 12 changed files with 55 additions and 7 deletions.
@@ -1761,6 +1761,7 @@ public int numSortedRunStopTrigger() {
return Math.max(numSortedRunCompactionTrigger(), stopTrigger);
}

// If num levels is not set, it defaults to 6
public int numLevels() {
// By default, this ensures that the compaction does not fall to level 0, but at least to
// level 1
@@ -25,17 +25,23 @@
import java.util.List;

/** A files unit for compaction. */

// A compaction unit
public interface CompactUnit {

// The level that the compaction result is written to; usually the current files are compacted into this target level
int outputLevel();

// The files to be compacted
List<DataFileMeta> files();

static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {
List<DataFileMeta> files = new ArrayList<>();
for (LevelSortedRun run : runs) {
// Collect the files of every run
files.addAll(run.run().files());
}
// Return a CompactUnit containing all collected files
return fromFiles(outputLevel, files);
}

@@ -144,7 +144,9 @@ public List<DataFileMeta> allFiles() {

public List<LevelSortedRun> levelSortedRuns() {
List<LevelSortedRun> runs = new ArrayList<>();
// Level 0: each file forms its own SortedRun
level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file))));
// Other levels: each level has a single SortedRun
for (int i = 0; i < levels.size(); i++) {
SortedRun run = levels.get(i);
if (run.nonEmpty()) {
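A reader's illustration of the structure above (hypothetical file names, not part of the commit): if level 0 currently holds files f0-a and f0-b, and levels 1 and 3 each hold one non-empty SortedRun, levelSortedRuns() returns four entries, one per run:

    LevelSortedRun(0, [f0-a])
    LevelSortedRun(0, [f0-b])
    LevelSortedRun(1, files of level 1)
    LevelSortedRun(3, files of level 3)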
@@ -214,14 +214,18 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
waitForLatestCompaction = true;
}

// Decide whether a changelog writer is needed (with the INPUT changelog producer, the input is written directly to the changelog directory, so a new rolling changelog file has to be created)
final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
(changelogProducer == ChangelogProducer.INPUT && !isInsertOnly)
? writerFactory.createRollingChangelogFileWriter(0)
: null;

// Data files are first rolled and flushed to level 0
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);

try {
// Flush the data in the write buffer to level 0
writeBuffer.forEach(
keyComparator,
mergeFunction,
@@ -249,12 +253,14 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
newFilesChangelog.addAll(changelogMetas);
}

// Add the new files to level 0
for (DataFileMeta dataMeta : dataMetas) {
newFiles.add(dataMeta);
compactManager.addNewFile(dataMeta);
}
}

// If waitForLatestCompaction is true, wait for the previous compaction to finish before
// starting the next one; if false, trigger the next compaction without waiting
trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
}
@@ -25,6 +25,8 @@
import java.util.Optional;

/** Compact strategy to decide which files to select for compaction. */

// The compact strategy decides which files are selected for compaction
public interface CompactStrategy {

/**
@@ -36,9 +38,13 @@ public interface CompactStrategy {
* <li>compaction is sequential from small level to large level.
* </ul>
*/

// Compaction proceeds sequentially from small levels to large levels
Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);

/** Pick a compaction unit consisting of all existing files. */

// Pick a compaction unit consisting of all existing files
static Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) {
int maxLevel = numLevels - 1;
if (runs.isEmpty() || (runs.size() == 1 && runs.get(0).level() == maxLevel)) {
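A minimal sketch of how the full-compaction pick plays out (the part below the shown condition is an assumed continuation for illustration, not copied from the commit):

    static Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) {
        int maxLevel = numLevels - 1;
        if (runs.isEmpty() || (runs.size() == 1 && runs.get(0).level() == maxLevel)) {
            // nothing to do: no runs at all, or a single run already sitting on the max level
            return Optional.empty();
        }
        // otherwise pick every existing run into one unit whose output is the max level
        return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
    }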
@@ -30,13 +30,16 @@
import java.util.stream.Collectors;

/** Algorithm to partition several data files into the minimum number of {@link SortedRun}s. */

// The algorithm partitions multiple data files into the minimum number of SortedRuns
public class IntervalPartition {

private final List<DataFileMeta> files;
private final Comparator<InternalRow> keyComparator;

public IntervalPartition(List<DataFileMeta> inputFiles, Comparator<InternalRow> keyComparator) {
this.files = new ArrayList<>(inputFiles);
// Sort the files by min key
this.files.sort(
(o1, o2) -> {
int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());
@@ -70,8 +73,9 @@ public List<List<SortedRun>> partition() {
BinaryRow bound = null;

for (DataFileMeta meta : files) {
// Check whether this file overlaps the current section
if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
// larger than current right bound, conclude current section and create a new one
// larger than current right bound, conclude current section and create a new one
result.add(partition(section));
section.clear();
bound = null;
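A reader's worked example of the section split shown above (hypothetical key ranges): with files a[1,5], b[2,3], c[4,9], d[10,12] sorted by min key, a starts a section with right bound 5; b (min 2) and c (min 4) still fall within the bound, which grows to 9; d (min 10) exceeds the bound, so {a, b, c} is closed as one section and {d} starts a new one. Each section is then further divided by partition(section) into the fewest SortedRuns whose files do not overlap, per the class description.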
@@ -100,6 +100,7 @@ public boolean shouldWaitForPreparingCheckpoint() {
return levels.numberOfSortedRuns() > (long) numSortedRunStopTrigger + 1;
}

// New files are added to level 0
@Override
public void addNewFile(DataFileMeta file) {
levels.addLevel0File(file);
@@ -114,6 +115,7 @@ public List<DataFileMeta> allFiles() {
@Override
public void triggerCompaction(boolean fullCompaction) {
Optional<CompactUnit> optionalUnit;
// Each file in L0 is its own sorted run, and every level below L0 has exactly one sorted run.
// A sorted run consists of one or more files (a file is rolled out once it reaches a certain
// size), so the sorted run of a level can be viewed as one large file globally sorted by
// primary key.
List<LevelSortedRun> runs = levels.levelSortedRuns();
if (fullCompaction) {
Preconditions.checkState(
@@ -125,6 +127,7 @@ public void triggerCompaction(boolean fullCompaction) {
"Trigger forced full compaction. Picking from the following runs\n{}",
runs);
}
// Compact everything into the max level
optionalUnit = CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs);
} else {
if (taskFuture != null) {
@@ -73,6 +73,7 @@ public MergeTreeCompactTask(
this.upgradeFilesNum = 0;
}

// Start the compaction
@Override
protected CompactResult doCompact() throws Exception {
List<List<SortedRun>> candidate = new ArrayList<>();
@@ -81,6 +82,8 @@ protected CompactResult doCompact() throws Exception {
// Checking the order and compacting adjacent and contiguous files
// Note: can't skip an intermediate file to compact, this will destroy the overall
// orderliness

// Check adjacent, contiguous files and compact them in sequence
for (List<SortedRun> section : partitioned) {
if (section.size() > 1) {
candidate.add(section);
@@ -142,6 +145,7 @@ private void rewrite(List<List<SortedRun>> candidate, CompactResult toUpdate) th
if (section.size() == 0) {
return;
} else if (section.size() == 1) {
// A single run: upgrade each of its files directly
for (DataFileMeta file : section.get(0).files()) {
upgrade(file, toUpdate);
}
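Reader's note: when a section contains only one run, its files do not need to be merged with anything; as far as I understand, upgrade() simply moves such a file to the output level in metadata instead of rewriting its contents, which is why upgradeFilesNum is tracked separately from rewritten files.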
@@ -39,6 +39,8 @@
* <p>See RocksDb Universal-Compaction:
* https://github.com/facebook/rocksdb/wiki/Universal-Compaction.
*/

// Lower write amplification, traded off against read amplification and space amplification
public class UniversalCompaction implements CompactStrategy {

private static final Logger LOG = LoggerFactory.getLogger(UniversalCompaction.class);
@@ -83,6 +85,7 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
CompactUnit unit = pickForSizeAmp(maxLevel, runs);
if (unit != null) {
if (LOG.isDebugEnabled()) {
// Compaction triggered by size amplification
LOG.debug("Universal compaction due to size amplification");
}
return Optional.of(unit);
@@ -116,6 +119,7 @@ CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
return null;
}

// Total size of the candidate runs (all runs except the last, oldest one)
long candidateSize =
runs.subList(0, runs.size() - 1).stream()
.map(LevelSortedRun::run)
@@ -125,6 +129,7 @@ CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();

// size amplification = percentage of additional size
// TODO: explain this inequality
if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
updateLastOptimizedCompaction();
return CompactUnit.fromLevelRuns(maxLevel, runs);
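A worked reading of the inequality above (option value assumed for illustration): the check candidateSize * 100 > maxSizeAmp * earliestRunSize is just candidateSize / earliestRunSize > maxSizeAmp / 100 written without division. With maxSizeAmp = 200, an earliest (oldest, bottom) run of 100 MB and 250 MB of newer candidate runs, 250 * 100 = 25000 > 200 * 100 = 20000, so the estimated space amplification exceeds 200% of the base size and a full compaction of all runs to the max level is picked.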
@@ -195,6 +195,7 @@ protected MergeTreeWriter createWriter(
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
// Create the Levels structure with the configured number of levels
Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
UniversalCompaction universalCompaction =
new UniversalCompaction(
@@ -118,19 +118,24 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
new OneInputTransformation<>(
keyInput,
"LOCAL SAMPLE",
new LocalSampleOperator<>(
localSampleSize), // Sort a batch of records and keep the localSampleSize records with the largest (randomly generated) weights
new TupleTypeInfo<>(
BasicTypeInfo.DOUBLE_TYPE_INFO,
keyTypeInformation,
BasicTypeInfo.INT_TYPE_INFO),
keyInput.getParallelism());
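// Reader's note (not part of the commit): giving every record an independent random weight
// and keeping only the localSampleSize records with the largest weights is a standard way to
// draw a uniform fixed-size random sample from a stream (a reservoir-sampling variant), which
// is why the operator sorts by the generated weight rather than by key.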

// 2. Collect all the samples and gather them into a sorted key range.
// Aggregate the samples from all partitions into one overall sample.
OneInputTransformation<Tuple3<Double, T, Integer>, List<T>> sampleAndHistogram =
new OneInputTransformation<>(
localSample,
"GLOBAL SAMPLE",
new GlobalSampleOperator<>(
globalSampleSize,
keyComparator,
rangeNum), // rangeNum = sinkParallelism * 10
new ListTypeInfo<>(keyTypeInformation),
1);

Expand Down Expand Up @@ -566,8 +571,9 @@ public int get() {

/**
* (Find the bucket range boundary values.)
*
* @param sampledData data already sorted by z-index
* @param rangesNum the number of ranges to split into
* @return
* @param <T>
*/
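A minimal, generic sketch of how bucket boundaries can be derived from such a sorted sample (illustrative only; the method name and the exact placement of boundaries are assumptions, not the actual Paimon code):

    import java.util.ArrayList;
    import java.util.List;

    class RangeBoundsSketch {
        /** Pick rangesNum - 1 evenly spaced boundary keys from a sample already sorted by z-index. */
        static <T> List<T> rangeBounds(List<T> sortedSample, int rangesNum) {
            List<T> bounds = new ArrayList<>();
            if (sortedSample.isEmpty() || rangesNum <= 1) {
                return bounds;
            }
            int step = Math.max(1, sortedSample.size() / rangesNum);
            for (int i = step; i < sortedSample.size() && bounds.size() < rangesNum - 1; i += step) {
                // a record is later routed to range r where bounds[r-1] < key <= bounds[r]
                bounds.add(sortedSample.get(i));
            }
            return bounds;
        }
    }

Records can then be assigned to a sink task by binary-searching their key against these boundaries.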
@@ -102,6 +102,7 @@ private StoreSinkWrite.Provider createWriteProvider(
if (coreOptions.writeOnly()) {
waitCompaction = false;
} else {
// Whether prepareCommit should wait for compaction to finish
waitCompaction = coreOptions.prepareCommitWaitCompaction();
int deltaCommits = -1;
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -115,6 +116,8 @@
/ checkpointConfig.getCheckpointInterval());
}

// If the changelog producer is full-compaction, or deltaCommits is configured,
// a full compaction is forced once the number of delta commits reaches the threshold
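// Reader's illustration (assumed values, not from this commit): with a full-compaction
// trigger interval of 5 minutes and a Flink checkpoint interval of 60 seconds,
// deltaCommits = 300000 / 60000 = 5, so a full compaction is forced roughly every 5 commits.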
if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
@@ -134,6 +137,7 @@ private StoreSinkWrite.Provider createWriteProvider(
}
}

// Writer used when lookup is needed (deletion vectors / lookup changelog producer)
if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
@@ -149,7 +153,7 @@
metricGroup);
};
}

// Otherwise, the normal writer
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new StoreSinkWriteImpl(