# [opt](split) optimize the split manner of hive and hudi (#3)
## Proposed changes

Add two session variables:

- `fetch_splits_max_wait_time`: the maximum time to wait while assembling a batch of splits; once at least one split has been collected and the limit is exceeded, the partial batch is returned immediately.
- `partition_ordering`: the ordering style used when listing partitions: `natural` (the default), `reverse`, or `shuffle`.
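For illustration, both knobs are plain session variables, so any client session can set them. A hedged sketch over JDBC (host, port, database, and values are placeholders; Doris FE speaks the MySQL protocol, by default on port 9030):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SplitTuningExample {
    public static void main(String[] args) throws Exception {
        // Assumes a Doris FE reachable over the MySQL protocol.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/demo", "root", "");
             Statement stmt = conn.createStatement()) {
            // Cap how long a batch of splits may wait before being returned (ms).
            stmt.execute("SET fetch_splits_max_wait_time = 4000");
            // List partitions in reverse order; also accepts natural / shuffle.
            stmt.execute("SET partition_ordering = 'reverse'");
        }
    }
}
```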
AshinGau authored Jun 26, 2024
1 parent ddf282f commit 320b029
Showing 6 changed files with 80 additions and 15 deletions.
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -337,10 +337,11 @@ public void createScanRangeLocations() throws UserException {
locationType = getLocationType(fileSplit.getPath().toString());
}
totalFileSize = fileSplit.getLength() * inputSplitsNum;
+ long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
// Not accurate, only used to estimate concurrency.
int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
for (Backend backend : backendPolicy.getBackends()) {
- SplitSource splitSource = new SplitSource(backend, splitAssignment);
+ SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime);
splitSources.add(splitSource);
Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
TScanRangeLocations curLocations = newLocations();
18 changes: 10 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java
@@ -44,17 +44,18 @@
public class SplitSource {
private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
private static final long WAIT_TIME_OUT = 100; // 100ms
- private static final long MAX_WAIT_TIME_OUT = 500; // 500ms

private final long uniqueId;
private final Backend backend;
private final SplitAssignment splitAssignment;
private final AtomicBoolean isLastBatch;
+ private final long maxWaitTime;

- public SplitSource(Backend backend, SplitAssignment splitAssignment) {
+ public SplitSource(Backend backend, SplitAssignment splitAssignment, long maxWaitTime) {
this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
this.backend = backend;
this.splitAssignment = splitAssignment;
+ this.maxWaitTime = maxWaitTime;
this.isLastBatch = new AtomicBoolean(false);
splitAssignment.registerSource(uniqueId);
}
@@ -71,7 +72,7 @@ public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
return Collections.emptyList();
}
List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
- long maxTimeOut = 0;
+ long startTime = System.currentTimeMillis();
while (scanRanges.size() < maxBatchSize) {
BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
if (splits == null) {
@@ -81,18 +82,19 @@ public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
while (scanRanges.size() < maxBatchSize) {
try {
Collection<TScanRangeLocations> splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
+ if (splitCollection != null) {
+ scanRanges.addAll(splitCollection);
+ }
+ if (!scanRanges.isEmpty() && System.currentTimeMillis() - startTime > maxWaitTime) {
+ return scanRanges;
+ }
if (splitCollection == null) {
- maxTimeOut += WAIT_TIME_OUT;
break;
}
- scanRanges.addAll(splitCollection);
} catch (InterruptedException e) {
throw new UserException("Failed to get next batch of splits", e);
}
}
- if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) {
- break;
- }
}
return scanRanges;
}
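Taken together, the new `getNextBatch` polls in 100 ms slices and returns a partial batch once it holds at least one split and `fetch_splits_max_wait_time` has elapsed, instead of stalling until the batch is full. A distilled sketch of that policy over a plain `BlockingQueue` follows; `BatchPoller` and its names are illustrative, not part of this patch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Minimal restatement of the batching policy in SplitSource.getNextBatch. */
final class BatchPoller<T> {
    private static final long POLL_SLICE_MS = 100; // mirrors WAIT_TIME_OUT

    private final long maxWaitTimeMs; // mirrors fetch_splits_max_wait_time

    BatchPoller(long maxWaitTimeMs) {
        this.maxWaitTimeMs = maxWaitTimeMs;
    }

    List<T> nextBatch(BlockingQueue<T> queue, int maxBatchSize) throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatchSize);
        long start = System.currentTimeMillis();
        while (batch.size() < maxBatchSize) {
            T item = queue.poll(POLL_SLICE_MS, TimeUnit.MILLISECONDS);
            if (item != null) {
                batch.add(item);
            }
            // Key change: once the batch is non-empty, stop waiting after
            // maxWaitTimeMs and ship what we have instead of filling the batch.
            if (!batch.isEmpty() && System.currentTimeMillis() - start > maxWaitTimeMs) {
                break;
            }
            // The real code additionally re-resolves the queue from the
            // SplitAssignment when a poll times out; omitted here.
        }
        return batch;
    }
}
```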
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
@@ -49,6 +49,21 @@

@Data
public class TablePartitionValues {
+ public enum PartitionOrdering {
+ NATURAL,
+ REVERSE,
+ SHUFFLE;
+
+ public static PartitionOrdering parse(String ordering) {
+ for (PartitionOrdering order : PartitionOrdering.values()) {
+ if (order.name().equalsIgnoreCase(ordering)) {
+ return order;
+ }
+ }
+ return null;
+ }
+ }
+
public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";

private final ReadWriteLock readWriteLock;
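A note on the enum above: `parse` matches case-insensitively and returns `null` for unrecognized names. As the `HiveScanNode` change below shows, anything other than `REVERSE` or `SHUFFLE` (including `null`) leaves the partition list untouched, so an invalid `partition_ordering` value silently falls back to natural order.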
17 changes: 15 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -34,6 +34,7 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
+ import org.apache.doris.datasource.TablePartitionValues.PartitionOrdering;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -57,6 +58,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+ import com.google.common.collect.Ordering;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -137,6 +139,17 @@ protected void doInitialize() throws UserException {
}
}

+ protected List<HivePartition> orderingPartitions(List<HivePartition> partitions) {
+ PartitionOrdering ordering = PartitionOrdering.parse(
+ ConnectContext.get().getSessionVariable().getPartitionOrdering());
+ if (ordering == PartitionOrdering.REVERSE) {
+ return Ordering.natural().onResultOf(HivePartition::getPath).reverse().sortedCopy(partitions);
+ } else if (ordering == PartitionOrdering.SHUFFLE) {
+ return Ordering.arbitrary().onResultOf(HivePartition::getPath).sortedCopy(partitions);
+ }
+ return partitions;
+ }
+
protected List<HivePartition> getPartitions() throws AnalysisException {
List<HivePartition> resPartitions = Lists.newArrayList();
long start = System.currentTimeMillis();
@@ -211,7 +224,7 @@ public List<Split> getSplits() throws UserException {
long start = System.currentTimeMillis();
try {
if (!partitionInit) {
- prunedPartitions = getPartitions();
+ prunedPartitions = orderingPartitions(getPartitions());
partitionInit = true;
}
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
@@ -289,7 +302,7 @@ public void startSplit() {
public boolean isBatchMode() {
if (!partitionInit) {
try {
- prunedPartitions = getPartitions();
+ prunedPartitions = orderingPartitions(getPartitions());
} catch (Exception e) {
return false;
}
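The `orderingPartitions` helper above leans on Guava: `reverse` is the reverse of natural (lexicographic) ordering on the partition path, while `shuffle` uses `Ordering.arbitrary()`, which compares identity hash codes, so the order is stable within a JVM run but uncorrelated with path order. A small self-contained illustration (class name and paths are made up):

```java
import com.google.common.collect.Ordering;

import java.util.Arrays;
import java.util.List;

public class PartitionOrderingDemo {
    public static void main(String[] args) {
        List<String> paths = Arrays.asList("dt=2024-01-01", "dt=2024-01-02", "dt=2024-01-03");
        // reverse: natural (lexicographic) order of the path, reversed.
        System.out.println(Ordering.natural().reverse().sortedCopy(paths));
        // shuffle: Ordering.arbitrary() compares identity hash codes, so the
        // copy is consistently ordered within this run but unrelated to path order.
        System.out.println(Ordering.arbitrary().sortedCopy(paths));
    }
}
```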
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -386,9 +386,9 @@ public List<Split> getSplits() throws UserException {
return getIncrementalSplits();
}
if (!partitionInit) {
- prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
+ prunedPartitions = orderingPartitions(HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
- () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
+ () -> getPrunedPartitions(hudiClient, snapshotTimestamp)));
partitionInit = true;
}
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
@@ -448,9 +448,9 @@ public boolean isBatchMode() {
}
if (!partitionInit) {
// A non-partitioned table will get one dummy partition
- prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
+ prunedPartitions = orderingPartitions(HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
- () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
+ () -> getPrunedPartitions(hudiClient, snapshotTimestamp)));
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
34 changes: 34 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -417,6 +417,10 @@ public class SessionVariable implements Serializable, Writable {

public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode";

+ public static final String FETCH_SPLITS_MAX_WAIT_TIME = "fetch_splits_max_wait_time";
+
+ public static final String PARTITION_ORDERING = "partition_ordering";
+
/**
* use insert stmt as the unified backend for all loads
*/
@@ -1461,6 +1465,20 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
needForward = true)
public int numPartitionsInBatchMode = 1024;

+ @VariableMgr.VarAttr(
+ name = FETCH_SPLITS_MAX_WAIT_TIME,
+ description = {"batch方式中BE获取splits的最大等待时间",
+ "The max wait time of getting splits in batch mode."},
+ needForward = true)
+ public long fetchSplitsMaxWaitTime = 4000;
+
+ @VariableMgr.VarAttr(
+ name = PARTITION_ORDERING,
+ description = {"list partition的排序方式",
+ "Ordering style of list partition."},
+ needForward = true)
+ public String partitionOrdering = "natural";
+
@VariableMgr.VarAttr(
name = ENABLE_PARQUET_LAZY_MAT,
description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。",
@@ -2713,6 +2731,22 @@ public void setNumSplitsInBatchMode(int numPartitionsInBatchMode) {
this.numPartitionsInBatchMode = numPartitionsInBatchMode;
}

+ public long getFetchSplitsMaxWaitTime() {
+ return fetchSplitsMaxWaitTime;
+ }
+
+ public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) {
+ this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime;
+ }
+
+ public String getPartitionOrdering() {
+ return partitionOrdering;
+ }
+
+ public void setPartitionOrdering(String partitionOrdering) {
+ this.partitionOrdering = partitionOrdering;
+ }
+
public boolean isEnableParquetLazyMat() {
return enableParquetLazyMat;
}
