Commit: comment code

sunxiaojian committed Sep 28, 2024
1 parent df420db commit d62a7f5

Showing 2 changed files with 30 additions and 5 deletions.
@@ -104,6 +104,7 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
boolean isSortBySize) {
Transformation<Tuple2<T, RowData>> input = inputDataStream.getTransformation();

// Compute the byte length of the row: returns Tuple2<T, Integer> => Tuple2<zIndex, byte size of the whole row, all fields included>
OneInputTransformation<Tuple2<T, RowData>, Tuple2<T, Integer>> keyInput =
new OneInputTransformation<>(
input,
@@ -112,31 +113,33 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
new TupleTypeInfo<>(keyTypeInformation, BasicTypeInfo.INT_TYPE_INFO),
input.getParallelism());

// 1. Fixed size sample in each partitions.
// 1. Fixed size sample in each partitions. (The sample size comes from localSampleSize = sink parallelism * an amplification factor; runs in every partition.)
OneInputTransformation<Tuple2<T, Integer>, Tuple3<Double, T, Integer>> localSample =
new OneInputTransformation<>(
keyInput,
"LOCAL SAMPLE",
new LocalSampleOperator<>(localSampleSize),
new LocalSampleOperator<>(localSampleSize), // sorts a batch of data and keeps the localSampleSize records with the largest (randomly generated) weights
new TupleTypeInfo<>(
BasicTypeInfo.DOUBLE_TYPE_INFO,
keyTypeInformation,
BasicTypeInfo.INT_TYPE_INFO),
keyInput.getParallelism());

// 2. Collect all the samples and gather them into a sorted key range.
// 2. Collect all the samples and gather them into a sorted key range. Merges the samples from every partition into one global sample.
OneInputTransformation<Tuple3<Double, T, Integer>, List<T>> sampleAndHistogram =
new OneInputTransformation<>(
localSample,
"GLOBAL SAMPLE",
new GlobalSampleOperator<>(globalSampleSize, keyComparator, rangeNum),
new GlobalSampleOperator<>(globalSampleSize, keyComparator, rangeNum), // rangeNum=sinkParallelism * 10
new ListTypeInfo<>(keyTypeInformation),
1);

// 3. Take range boundaries as broadcast input and take the tuple of partition id and
// record as output.
// The shuffle mode of input edge must be BATCH to avoid dead lock. See
// DeadlockBreakupProcessor.

// Broadcast the computed boundary keys to all partitions, then shuffle each record to its matching partition.
TwoInputTransformation<List<T>, Tuple2<T, RowData>, Tuple2<Integer, Tuple2<T, RowData>>>
preparePartition =
new TwoInputTransformation<>(
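
As a reading aid for the three annotated stages above, here is a minimal sketch of stages 1 and 2 in plain Java, with in-memory lists standing in for Flink streams; every name in it is illustrative, not the operators' actual API:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustrative condensation of stages 1 and 2: draw a fixed-size random
// sample from every partition, merge and sort all samples by key, then
// take rangeNum - 1 evenly spaced sample keys as the range boundaries.
class SampleBoundariesSketch {

    static List<Integer> boundaries(
            List<List<Integer>> partitions, int localSampleSize, int rangeNum) {
        Random rnd = new Random(42);
        List<Integer> globalSample = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            // Stage 1: LOCAL SAMPLE, fixed size, independent per partition.
            List<Integer> copy = new ArrayList<>(partition);
            Collections.shuffle(copy, rnd);
            globalSample.addAll(copy.subList(0, Math.min(localSampleSize, copy.size())));
        }
        // Stage 2: GLOBAL SAMPLE, single task, sorted by key (the zIndex).
        Collections.sort(globalSample);
        List<Integer> bounds = new ArrayList<>();
        for (int i = 1; i < rangeNum && !globalSample.isEmpty(); i++) {
            bounds.add(globalSample.get(i * globalSample.size() / rangeNum));
        }
        return bounds;
    }
}

The diff's version does the same thing with weighted sampling and byte-size-balanced boundaries, as the later hunks show.
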
@@ -241,6 +244,7 @@ public void open() throws Exception {
sampler = new Sampler<>(numSample, System.nanoTime());
}

// zIndex and row size
@Override
public void processElement(StreamRecord<Tuple2<T, Integer>> streamRecord) throws Exception {
sampler.collect(streamRecord.getValue());
@@ -310,12 +314,14 @@ public void endInput() {
sampledData.add(sampled.next().f1);
}

// Sort the samples by zIndex
sampledData.sort((o1, o2) -> keyComparator.compare(o1.f0, o2.f0));

List<T> range;
if (sampledData.isEmpty()) {
range = new ArrayList<>();
} else {
// Start allocating the ranges
range = Arrays.asList(allocateRangeBaseSize(sampledData, rangesNum));
}

@@ -352,6 +358,7 @@ public void open() throws Exception {
this.collector = new StreamRecordCollector<>(output);
}

// List<T> is the set of partition boundary values, made up of zIndex keys
@Override
public void processElement1(StreamRecord<List<T>> streamRecord) {
keyIndex = new ArrayList<>();
@@ -381,6 +388,7 @@ public void processElement2(StreamRecord<Tuple2<T, RowData>> streamRecord) {
collector.collect(new Tuple2<>(0, streamRecord.getValue()));
} else {
Tuple2<T, RowData> row = streamRecord.getValue();
// Find the matching partition
collector.collect(new Tuple2<>(binarySearch(row.f0), row));
}
}
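
The routing rule of this two-input operator, restated as a hedged sketch with hypothetical names (the real operator also deals with the broadcast wiring and the BATCH shuffle mode noted above):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the two-input operator: input 1 delivers the
// boundary keys once, input 2 delivers records; each record is routed to
// the range index found by binary search over the boundaries.
class RangeAssignerSketch {

    private List<Long> boundaries = new ArrayList<>();

    void onBoundaries(List<Long> keys) { // plays the role of processElement1
        boundaries = new ArrayList<>(keys);
    }

    int rangeOf(long zIndex) { // plays the role of processElement2
        if (boundaries.isEmpty()) {
            return 0; // no boundaries at all: everything maps to range 0
        }
        int pos = Collections.binarySearch(boundaries, zIndex);
        return pos >= 0 ? pos : -pos - 1; // insertion point = target range
    }
}
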
@@ -489,6 +497,7 @@ private static class Sampler<T> {

private final int numSamples;
private final Random random;
// Priority queue, min-heap
private final PriorityQueue<Tuple2<Double, T>> queue;

private int index = 0;
@@ -503,6 +512,7 @@ private static class Sampler<T> {
Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative.");
this.numSamples = numSamples;
this.random = new XORShiftRandom(seed);
// Priority queue, implemented as a min-heap
this.queue = new PriorityQueue<>(numSamples, Comparator.comparingDouble(o -> o.f0));
}

@@ -511,10 +521,13 @@ void collect(T rowData) {
}

void collect(double weight, T key) {

if (index < numSamples) {
// The target sample count has not been reached yet; add the element directly
// Fill the queue with first K elements from input.
addQueue(weight, key);
} else {
// Once the sample is full, evict the smallest-weight element and add the current one
// Remove the element with the smallest weight,
// and append current element into the queue.
if (weight > smallest.f0) {
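
What collect implements, restated as a self-contained sketch; the class and field names here are assumed, and the real code uses XORShiftRandom and Flink's Tuple2 instead:

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Random;

// Condensed model of the sampling policy: keep the numSamples elements
// with the largest randomly drawn weights, using a min-heap so the
// cheapest element to evict is always at the head.
class WeightedSamplerSketch<T> {

    private static final class Entry<T> {
        final double weight;
        final T value;

        Entry(double weight, T value) {
            this.weight = weight;
            this.value = value;
        }
    }

    private final int numSamples;
    private final Random random = new Random();
    private final PriorityQueue<Entry<T>> queue; // head = smallest weight

    WeightedSamplerSketch(int numSamples) {
        this.numSamples = numSamples;
        this.queue = new PriorityQueue<>(
                numSamples, Comparator.comparingDouble((Entry<T> e) -> e.weight));
    }

    void collect(T value) {
        double weight = random.nextDouble();
        if (queue.size() < numSamples) {
            queue.add(new Entry<>(weight, value)); // fill phase: add directly
        } else if (weight > queue.peek().weight) {
            queue.poll(); // evict the smallest weight ...
            queue.add(new Entry<>(weight, value)); // ... and keep the new element
        }
    }
}
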
@@ -551,15 +564,27 @@ public int get() {
}
}

/**
 * Finds the bucket boundary values.
 *
 * @param sampledData the sampled data, sorted by zIndex
 * @param rangesNum the number of ranges to split into
 * @return the boundary keys between the ranges
 * @param <T> the key type
 */
@VisibleForTesting
static <T> T[] allocateRangeBaseSize(List<Tuple2<T, Integer>> sampledData, int rangesNum) {

// number of samples
int sampeNum = sampledData.size();
// number of boundary values
int boundarySize = rangesNum - 1;
@SuppressWarnings("unchecked")
T[] boundaries = (T[]) new Object[boundarySize];

if (!sampledData.isEmpty()) {
// t.f1 is the byte size of the row
long restSize = sampledData.stream().mapToLong(t -> (long) t.f1).sum();
// compute the byte-size step of each range
double stepRange = restSize / (double) rangesNum;

int currentWeight = 0;
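allocateRangeBaseSize accumulates row sizes and emits a boundary each time the running total crosses the next multiple of stepRange. A simplified sketch of that accumulation, with int keys and int[]{key, byteSize} pairs standing in for Tuple2<T, Integer> (assumed names; assumes the samples are sorted by key and all sizes are positive):

import java.util.List;

// Simplified version of the allocation: split (key, byteSize) samples into
// rangesNum ranges of roughly equal total byte size; each boundary is the
// key at which the accumulated size crosses the next multiple of stepRange.
class SizeRangeSketch {

    static int[] allocate(List<int[]> sampledData, int rangesNum) {
        int[] boundaries = new int[rangesNum - 1];
        if (sampledData.isEmpty()) {
            return boundaries;
        }
        long totalSize = 0;
        for (int[] sample : sampledData) {
            totalSize += sample[1];
        }
        double stepRange = totalSize / (double) rangesNum;

        long currentWeight = 0;
        int index = 0;
        for (int i = 0; i < boundaries.length; i++) {
            double target = stepRange * (i + 1);
            // Consume samples until the running byte total reaches the next step.
            while (index < sampledData.size() && currentWeight < target) {
                currentWeight += sampledData.get(index++)[1];
            }
            boundaries[i] = sampledData.get(index - 1)[0]; // key of last consumed sample
        }
        return boundaries;
    }
}
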
@@ -113,7 +113,7 @@ public static <KEY> DataStream<RowData> sortStreamByKey(
final InternalTypeInfo<InternalRow> internalRowType =
InternalTypeInfo.fromRowType(longRowType);

// generate the KEY as the key of Pair.
// generate the KEY as the key of Pair. (The returned key is the generated zIndex key, obtained by bit interleaving, similar to Iceberg's handling.)
DataStream<Tuple2<KEY, RowData>> inputWithKey =
inputStream
.map(
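The bit interleaving mentioned in the comment above, in its textbook two-column form; the real zIndex covers all of the chosen sort columns, so this is only the core idea:

// Textbook two-column Morton/z-order interleaving: bit i of x lands at
// bit 2 * i and bit i of y at bit 2 * i + 1, so rows that are close on
// both columns end up close in zIndex order.
class ZIndexSketch {

    static long interleave(int x, int y) {
        long z = 0L;
        for (int i = 0; i < 32; i++) {
            z |= ((long) ((x >>> i) & 1)) << (2 * i);
            z |= ((long) ((y >>> i) & 1)) << (2 * i + 1);
        }
        return z;
    }
}

Because the interleaved value mixes the bits of both columns, sorting by zIndex clusters rows that are close on every sort column at once, which is what makes the subsequent range split useful for data skipping.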
