diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index ff61c5ee13de20..463cf629b48aaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -337,10 +337,11 @@ public void createScanRangeLocations() throws UserException { locationType = getLocationType(fileSplit.getPath().toString()); } totalFileSize = fileSplit.getLength() * inputSplitsNum; + long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime(); // Not accurate, only used to estimate concurrency. int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends(); for (Backend backend : backendPolicy.getBackends()) { - SplitSource splitSource = new SplitSource(backend, splitAssignment); + SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime); splitSources.add(splitSource); Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource); TScanRangeLocations curLocations = newLocations(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java index dce135292ec8a9..8515e686f36bd3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java @@ -44,17 +44,18 @@ public class SplitSource { private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0); private static final long WAIT_TIME_OUT = 100; // 100ms - private static final long MAX_WAIT_TIME_OUT = 500; // 500ms private final long uniqueId; private final Backend backend; private final SplitAssignment splitAssignment; private final AtomicBoolean isLastBatch; + private final long maxWaitTime; - public SplitSource(Backend backend, SplitAssignment splitAssignment) { + public SplitSource(Backend backend, SplitAssignment splitAssignment, long maxWaitTime) { this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement(); this.backend = backend; this.splitAssignment = splitAssignment; + this.maxWaitTime = maxWaitTime; this.isLastBatch = new AtomicBoolean(false); splitAssignment.registerSource(uniqueId); } @@ -71,7 +72,7 @@ public List getNextBatch(int maxBatchSize) throws UserExcep return Collections.emptyList(); } List scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize); - long maxTimeOut = 0; + long startTime = System.currentTimeMillis(); while (scanRanges.size() < maxBatchSize) { BlockingQueue> splits = splitAssignment.getAssignedSplits(backend); if (splits == null) { @@ -81,18 +82,19 @@ public List getNextBatch(int maxBatchSize) throws UserExcep while (scanRanges.size() < maxBatchSize) { try { Collection splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS); + if (splitCollection != null) { + scanRanges.addAll(splitCollection); + } + if (!scanRanges.isEmpty() && System.currentTimeMillis() - startTime > maxWaitTime) { + return scanRanges; + } if (splitCollection == null) { - maxTimeOut += WAIT_TIME_OUT; break; } - scanRanges.addAll(splitCollection); } catch (InterruptedException e) { throw new UserException("Failed to get next batch of splits", e); } } - if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) { - break; - } } return scanRanges; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 534b4f3386a50d..94568f01cd5cff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -417,6 +417,8 @@ public class SessionVariable implements Serializable, Writable { public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode"; + public static final String FETCH_SPLITS_MAX_WAIT_TIME = "fetch_splits_max_wait_time"; + /** * use insert stmt as the unified backend for all loads */ @@ -1461,6 +1463,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { needForward = true) public int numPartitionsInBatchMode = 1024; + @VariableMgr.VarAttr( + name = FETCH_SPLITS_MAX_WAIT_TIME, + description = {"batch方式中BE获取splits的最大等待时间", + "The max wait time of getting splits in batch mode."}, + needForward = true) + public long fetchSplitsMaxWaitTime = 4000; + @VariableMgr.VarAttr( name = ENABLE_PARQUET_LAZY_MAT, description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。", @@ -2713,6 +2722,14 @@ public void setNumSplitsInBatchMode(int numPartitionsInBatchMode) { this.numPartitionsInBatchMode = numPartitionsInBatchMode; } + public long getFetchSplitsMaxWaitTime() { + return fetchSplitsMaxWaitTime; + } + + public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) { + this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime; + } + public boolean isEnableParquetLazyMat() { return enableParquetLazyMat; }