From 52ad7f56ba9bdbdee9363b62a259908135a0fdcd Mon Sep 17 00:00:00 2001 From: Sergey Pershin Date: Sun, 25 Feb 2024 16:47:29 -0800 Subject: [PATCH] Use splits weights. (#8822) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/8822 HiveConnectorSplit has a splitWeight member. It is being used by the Presto coordinator to schedule splits while monitoring how busy Tasks are with running and queued splits. This change adds bookkeeping for such weights. We update them only for TableScans, as this is what Presto Java does. In addition, we add numRunningTableScanSplits and numQueuedTableScanSplits, which are also updated only for TableScans. After this change is in, we will need to update presto_cpp to use and report the new stats. Reviewed By: pranjalssh Differential Revision: D54049212 fbshipit-source-id: 9dfeaa6885185e8aa94e4477dcee91c8932851a7 --- velox/connectors/Connector.h | 7 +- velox/connectors/hive/HiveConnectorSplit.h | 5 +- velox/exec/Exchange.cpp | 4 +- velox/exec/Merge.cpp | 2 +- velox/exec/TableScan.cpp | 7 +- velox/exec/TableScan.h | 1 + velox/exec/Task.cpp | 59 +++++-- velox/exec/Task.h | 20 ++- velox/exec/TaskStats.h | 6 + velox/exec/TaskStructs.h | 3 + velox/exec/tests/TableScanTest.cpp | 156 +++++++++++++++++- .../tests/utils/HiveConnectorTestBase.cpp | 4 +- .../exec/tests/utils/HiveConnectorTestBase.h | 18 +- 13 files changed, 259 insertions(+), 33 deletions(-) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index b0ccb52c22c0..06b705d7b023 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -43,11 +43,14 @@ class DataSource; // as a RowVectorPtr, potentially after processing pushdowns. 
struct ConnectorSplit { const std::string connectorId; + const int64_t splitWeight{0}; std::unique_ptr> dataSource; - explicit ConnectorSplit(const std::string& _connectorId) - : connectorId(_connectorId) {} + explicit ConnectorSplit( + const std::string& _connectorId, + int64_t _splitWeight = 0) + : connectorId(_connectorId), splitWeight(_splitWeight) {} virtual ~ConnectorSplit() {} diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index 15be998884b0..10fa9206ec2d 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -50,8 +50,9 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { std::optional _tableBucketNumber = std::nullopt, const std::unordered_map& _customSplitInfo = {}, const std::shared_ptr& _extraFileInfo = {}, - const std::unordered_map& _serdeParameters = {}) - : ConnectorSplit(connectorId), + const std::unordered_map& _serdeParameters = {}, + int64_t _splitWeight = 0) + : ConnectorSplit(connectorId, _splitWeight), filePath(_filePath), fileFormat(_fileFormat), start(_start), diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index 775d79613191..6f09aaf5a9dd 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -50,7 +50,7 @@ bool Exchange::getSplits(ContinueFuture* future) { noMoreSplits_ = true; if (atEnd_) { operatorCtx_->task()->multipleSplitsFinished( - stats_.rlock()->numSplits); + false, stats_.rlock()->numSplits, 0); recordExchangeClientStats(); } return false; @@ -83,7 +83,7 @@ BlockingReason Exchange::isBlocked(ContinueFuture* future) { if (!currentPages_.empty() || atEnd_) { if (atEnd_ && noMoreSplits_) { const auto numSplits = stats_.rlock()->numSplits; - operatorCtx_->task()->multipleSplitsFinished(numSplits); + operatorCtx_->task()->multipleSplitsFinished(false, numSplits, 0); } recordExchangeClientStats(); return BlockingReason::kNotBlocked; diff --git a/velox/exec/Merge.cpp 
b/velox/exec/Merge.cpp index a5ef800232f3..dcca6d28e2c4 100644 --- a/velox/exec/Merge.cpp +++ b/velox/exec/Merge.cpp @@ -355,7 +355,7 @@ BlockingReason MergeExchange::addMergeSources(ContinueFuture* future) { } // TODO Delay this call until all input data has been processed. operatorCtx_->task()->multipleSplitsFinished( - remoteSourceTaskIds_.size()); + false, remoteSourceTaskIds_.size(), 0); return BlockingReason::kNotBlocked; } } else { diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index d23c0c5f15fa..08ec4129f5af 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -127,8 +127,13 @@ RowVectorPtr TableScan::getOutput() { } const auto& connectorSplit = split.connectorSplit; + currentSplitWeight_ = connectorSplit->splitWeight; needNewSplit_ = false; + // A point for test code injection. + TestValue::adjust( + "facebook::velox::exec::TableScan::getOutput::gotSplit", this); + VELOX_CHECK_EQ( connector_->connectorId(), connectorSplit->connectorId, @@ -261,7 +266,7 @@ RowVectorPtr TableScan::getOutput() { } curStatus_ = "getOutput: task->splitFinished"; - driverCtx_->task->splitFinished(); + driverCtx_->task->splitFinished(true, currentSplitWeight_); needNewSplit_ = true; } } diff --git a/velox/exec/TableScan.h b/velox/exec/TableScan.h index debd8f8d8b38..42f14eb52baf 100644 --- a/velox/exec/TableScan.h +++ b/velox/exec/TableScan.h @@ -80,6 +80,7 @@ class TableScan : public SourceOperator { memory::MemoryPool* const connectorPool_; ContinueFuture blockingFuture_{ContinueFuture::makeEmpty()}; BlockingReason blockingReason_; + int64_t currentSplitWeight_{0}; bool needNewSplit_ = true; std::shared_ptr connector_; std::shared_ptr connectorQueryCtx_; diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index ced22d540cc6..35b64ffcc533 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -134,7 +134,8 @@ void buildSplitStates( // Not all leaf nodes require splits. ValuesNode doesn't. Check if this plan // node requires splits. 
if (planNode->requiresSplits()) { - splitStateMap[planNode->id()]; + splitStateMap[planNode->id()].sourceIsTableScan = + (dynamic_cast(planNode) != nullptr); } return; } @@ -1220,6 +1221,11 @@ std::unique_ptr Task::addSplitLocked( if (split.connectorSplit) { VELOX_CHECK_NULL(split.connectorSplit->dataSource); + if (splitsState.sourceIsTableScan) { + ++taskStats_.numQueuedTableScanSplits; + taskStats_.queuedTableScanSplitWeights += + split.connectorSplit->splitWeight; + } } if (!split.hasGroup()) { @@ -1359,10 +1365,13 @@ BlockingReason Task::getSplitOrFuture( exec::Split& split, ContinueFuture& future, int32_t maxPreloadSplits, - std::function)> preload) { + const std::function)>& + preload) { std::lock_guard l(mutex_); + auto& splitsState = getPlanNodeSplitsStateLocked(planNodeId); return getSplitOrFutureLocked( - getPlanNodeSplitsStateLocked(planNodeId).groupSplitsStores[splitGroupId], + splitsState.sourceIsTableScan, + splitsState.groupSplitsStores[splitGroupId], split, future, maxPreloadSplits, @@ -1370,11 +1379,13 @@ BlockingReason Task::getSplitOrFuture( } BlockingReason Task::getSplitOrFutureLocked( + bool forTableScan, SplitsStore& splitsStore, exec::Split& split, ContinueFuture& future, int32_t maxPreloadSplits, - std::function)> preload) { + const std::function)>& + preload) { if (splitsStore.splits.empty()) { if (splitsStore.noMoreSplits) { return BlockingReason::kNotBlocked; @@ -1386,23 +1397,26 @@ BlockingReason Task::getSplitOrFutureLocked( return BlockingReason::kWaitForSplit; } - split = getSplitLocked(splitsStore, maxPreloadSplits, preload); + split = getSplitLocked(forTableScan, splitsStore, maxPreloadSplits, preload); return BlockingReason::kNotBlocked; } exec::Split Task::getSplitLocked( + bool forTableScan, SplitsStore& splitsStore, int32_t maxPreloadSplits, - std::function)> preload) { + const std::function)>& + preload) { int32_t readySplitIndex = -1; if (maxPreloadSplits) { for (auto i = 0; i < splitsStore.splits.size() && i < 
maxPreloadSplits; ++i) { - auto& split = splitsStore.splits[i].connectorSplit; - if (!split->dataSource) { + auto& connectorSplit = splitsStore.splits[i].connectorSplit; + if (!connectorSplit->dataSource) { // Initializes split->dataSource - preload(split); - } else if (readySplitIndex == -1 && split->dataSource->hasValue()) { + preload(connectorSplit); + } else if ( + readySplitIndex == -1 && connectorSplit->dataSource->hasValue()) { readySplitIndex = i; } } @@ -1416,6 +1430,13 @@ exec::Split Task::getSplitLocked( --taskStats_.numQueuedSplits; ++taskStats_.numRunningSplits; + if (forTableScan && split.connectorSplit) { + --taskStats_.numQueuedTableScanSplits; + ++taskStats_.numRunningTableScanSplits; + taskStats_.queuedTableScanSplitWeights -= split.connectorSplit->splitWeight; + taskStats_.runningTableScanSplitWeights += + split.connectorSplit->splitWeight; + } taskStats_.lastSplitStartTimeMs = getCurrentTimeMs(); if (taskStats_.firstSplitStartTimeMs == 0) { taskStats_.firstSplitStartTimeMs = taskStats_.lastSplitStartTimeMs; @@ -1424,19 +1445,30 @@ exec::Split Task::getSplitLocked( return split; } -void Task::splitFinished() { +void Task::splitFinished(bool fromTableScan, int64_t splitWeight) { std::lock_guard l(mutex_); ++taskStats_.numFinishedSplits; --taskStats_.numRunningSplits; + if (fromTableScan) { + --taskStats_.numRunningTableScanSplits; + taskStats_.runningTableScanSplitWeights -= splitWeight; + } if (isAllSplitsFinishedLocked()) { taskStats_.executionEndTimeMs = getCurrentTimeMs(); } } -void Task::multipleSplitsFinished(int32_t numSplits) { +void Task::multipleSplitsFinished( + bool fromTableScan, + int32_t numSplits, + int64_t splitsWeight) { std::lock_guard l(mutex_); taskStats_.numFinishedSplits += numSplits; taskStats_.numRunningSplits -= numSplits; + if (fromTableScan) { + taskStats_.numRunningTableScanSplits -= numSplits; + taskStats_.runningTableScanSplitWeights -= splitsWeight; + } if (isAllSplitsFinishedLocked()) { 
taskStats_.executionEndTimeMs = getCurrentTimeMs(); } @@ -1847,7 +1879,8 @@ ContinueFuture Task::terminate(TaskState terminalState) { std::vector splits; for (auto& [groupId, store] : splitState.groupSplitsStores) { while (!store.splits.empty()) { - splits.emplace_back(getSplitLocked(store, 0, nullptr)); + splits.emplace_back(getSplitLocked( + splitState.sourceIsTableScan, store, 0, nullptr)); } } if (!splits.empty()) { diff --git a/velox/exec/Task.h b/velox/exec/Task.h index cb4a8507f13a..a8ed9ddc775d 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -360,12 +360,15 @@ class Task : public std::enable_shared_from_this { exec::Split& split, ContinueFuture& future, int32_t maxPreloadSplits = 0, - std::function)> preload = - nullptr); + const std::function)>& + preload = nullptr); - void splitFinished(); + void splitFinished(bool fromTableScan, int64_t splitWeight); - void multipleSplitsFinished(int32_t numSplits); + void multipleSplitsFinished( + bool fromTableScan, + int32_t numSplits, + int64_t splitsWeight); /// Adds a MergeSource for the specified splitGroupId and planNodeId. std::shared_ptr addLocalMergeSource( @@ -784,19 +787,22 @@ class Task : public std::enable_shared_from_this { /// Retrieve a split or split future from the given split store structure. BlockingReason getSplitOrFutureLocked( + bool forTableScan, SplitsStore& splitsStore, exec::Split& split, ContinueFuture& future, int32_t maxPreloadSplits = 0, - std::function)> preload = - nullptr); + const std::function)>& + preload = nullptr); /// Returns next split from the store. The caller must ensure the store is not /// empty. exec::Split getSplitLocked( + bool forTableScan, SplitsStore& splitsStore, int32_t maxPreloadSplits, - std::function)> preload); + const std::function)>& + preload); // Creates for the given split group and fills up the 'SplitGroupState' // structure, which stores inter-operator state (local exchange, bridges). 
diff --git a/velox/exec/TaskStats.h b/velox/exec/TaskStats.h index be5475b37788..85e7690d6c50 100644 --- a/velox/exec/TaskStats.h +++ b/velox/exec/TaskStats.h @@ -52,6 +52,12 @@ struct TaskStats { int32_t numQueuedSplits{0}; std::unordered_set completedSplitGroups; + /// Table scan split stats. + int32_t numRunningTableScanSplits{0}; + int32_t numQueuedTableScanSplits{0}; + int64_t runningTableScanSplitWeights{0}; + int64_t queuedTableScanSplitWeights{0}; + /// The subscript is given by each Operator's /// DriverCtx::pipelineId. This is a sum total reflecting fully /// processed Splits for Drivers of this pipeline. diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index 5ceec37289bb..a266902cd4cc 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -64,6 +64,9 @@ struct SplitsStore { /// Structure contains the current info on splits for a particular plan node. struct SplitsState { + /// True if the source node is a table scan. + bool sourceIsTableScan{false}; + /// Plan node-wide 'no more splits'. bool noMoreSplits{false}; diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 76485ebb37fa..b1fe42f504e9 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -14,17 +14,20 @@ * limitations under the License. 
*/ #include "velox/exec/TableScan.h" +#include #include "velox/common/base/Fs.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/testutil/TestValue.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/dwio/common/tests/utils/DataFiles.h" +#include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/PlanNodeStats.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/Cursor.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/LocalExchangeSource.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/expression/ExprToSubfieldFilter.h" #include "velox/type/Timestamp.h" @@ -54,6 +57,8 @@ class TableScanTest : public virtual HiveConnectorTestBase { protected: void SetUp() override { HiveConnectorTestBase::SetUp(); + exec::ExchangeSource::factories().clear(); + exec::ExchangeSource::registerFactory(createLocalExchangeSource); } static void SetUpTestCase() { @@ -68,8 +73,9 @@ class TableScanTest : public virtual HiveConnectorTestBase { return HiveConnectorTestBase::makeVectors(inputs, count, rowsPerVector); } - exec::Split makeHiveSplit(std::string path) { - return exec::Split(makeHiveConnectorSplit(std::move(path))); + exec::Split makeHiveSplit(std::string path, int64_t splitWeight = 0) { + return exec::Split(makeHiveConnectorSplit( + std::move(path), 0, std::numeric_limits::max(), splitWeight)); } std::shared_ptr assertQuery( @@ -1369,6 +1375,152 @@ TEST_F(TableScanTest, waitForSplit) { duckDbQueryRunner_); } +TEST_F(TableScanTest, tableScanSplitsAndWeights) { + // Create 10 data files for 10 splits. 
+ const size_t numSplits{10}; + const auto filePaths = makeFilePaths(numSplits); + auto vectors = makeVectors(numSplits, 100); + for (auto i = 0; i < numSplits; i++) { + writeToFile(filePaths[i]->path, vectors[i]); + } + + // Set the table scan operators wait twice: + // First, before acquiring a split and then after. + std::atomic_uint32_t numAcquiredSplits{0}; + std::shared_mutex pauseTableScan; + std::shared_mutex pauseSplitProcessing; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::TableScan::getOutput", + std::function( + ([&](const TableScan* /*tableScan*/) { + pauseTableScan.lock_shared(); + pauseTableScan.unlock_shared(); + }))); + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::TableScan::getOutput::gotSplit", + std::function( + ([&](const TableScan* /*tableScan*/) { + ++numAcquiredSplits; + pauseSplitProcessing.lock_shared(); + pauseSplitProcessing.unlock_shared(); + }))); + // This will stop table scan operators from proceeding reading from the + // acquired splits. + pauseTableScan.lock(); + pauseSplitProcessing.lock(); + + // Prepare leaf task for the remote exchange node to pull data from. + auto leafTaskId = "local://leaf-0"; + auto leafPlan = PlanBuilder() + .values(vectors) + .partitionedOutput({}, 1, {"c0", "c1", "c2"}) + .planNode(); + std::unordered_map config; + auto queryCtx = std::make_shared( + executor_.get(), core::QueryConfig(std::move(config))); + core::PlanFragment planFragment{leafPlan}; + Consumer consumer = nullptr; + auto leafTask = Task::create( + leafTaskId, + core::PlanFragment{leafPlan}, + 0, + std::move(queryCtx), + std::move(consumer)); + leafTask->start(4); + + // Main task plan with table scan and remote exchange. 
+ auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId scanNodeId, exchangeNodeId; + auto planNode = PlanBuilder(planNodeIdGenerator, pool_.get()) + .tableScan(rowType_) + .capturePlanNodeId(scanNodeId) + .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"}) + .hashJoin( + {"t0"}, + {"u0"}, + PlanBuilder(planNodeIdGenerator, pool_.get()) + .exchange(leafPlan->outputType()) + .capturePlanNodeId(exchangeNodeId) + // .values(vectors) + // .partitionedOutput({}, 1, {"c0", "c1", "c2"}) + .project({"c0 AS u0", "c1 AS u1", "c2 AS u2"}) + .planNode(), + "", + {"t1"}, + core::JoinType::kAnti) + .planNode(); + + // Create task, cursor, start the task and supply the table scan splits. + const int32_t numDrivers = 6; + CursorParameters params; + params.planNode = planNode; + params.maxDrivers = numDrivers; + auto cursor = TaskCursor::create(params); + cursor->start(); + auto task = cursor->task(); + int64_t totalSplitWeights{0}; + for (auto fileIndex = 0; fileIndex < numSplits; ++fileIndex) { + const int64_t splitWeight = fileIndex * 10 + 1; + totalSplitWeights += splitWeight; + auto split = makeHiveSplit(filePaths[fileIndex]->path, splitWeight); + task->addSplit(scanNodeId, std::move(split)); + } + task->noMoreSplits(scanNodeId); + // Manage remote exchange splits. + task->addSplit( + exchangeNodeId, + exec::Split(std::make_shared(leafTaskId))); + task->noMoreSplits(exchangeNodeId); + + // Check the task stats. + auto stats = task->taskStats(); + EXPECT_EQ(stats.numRunningTableScanSplits, 0); + EXPECT_EQ(stats.numQueuedTableScanSplits, numSplits); + EXPECT_EQ(stats.runningTableScanSplitWeights, 0); + EXPECT_EQ(stats.queuedTableScanSplitWeights, totalSplitWeights); + EXPECT_EQ(stats.numTotalSplits, numSplits + 1); + + // Let all the operators proceed to acquire splits. 
+ pauseTableScan.unlock(); + + // Wait till 6 out of 10 splits are acquired by the operators in 6 threads + while (numAcquiredSplits < numDrivers) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // Check the task stats. + int64_t runningSplitWeights{0}; + for (auto i = 0; i < numAcquiredSplits; ++i) { + runningSplitWeights += i * 10 + 1; + } + stats = task->taskStats(); + const auto queuedSplitWeights = totalSplitWeights - runningSplitWeights; + EXPECT_EQ(stats.numRunningTableScanSplits, numDrivers); + EXPECT_EQ(stats.numQueuedTableScanSplits, numSplits - numDrivers); + EXPECT_EQ(stats.runningTableScanSplitWeights, runningSplitWeights); + EXPECT_EQ(stats.queuedTableScanSplitWeights, queuedSplitWeights); + + // Let all the operators proceed. + pauseSplitProcessing.unlock(); + + // Finish the task. + std::vector result; + while (cursor->moveNext()) { + result.push_back(cursor->current()); + } + EXPECT_TRUE(waitForTaskCompletion(leafTask.get())) << leafTask->taskId(); + EXPECT_TRUE(waitForTaskCompletion(task.get())) << task->taskId(); + + // Check task stats again. 
+ stats = task->taskStats(); + EXPECT_EQ(stats.numRunningTableScanSplits, 0); + EXPECT_EQ(stats.numQueuedTableScanSplits, 0); + EXPECT_EQ(stats.runningTableScanSplitWeights, 0); + EXPECT_EQ(stats.queuedTableScanSplitWeights, 0); + EXPECT_EQ(numAcquiredSplits, numSplits); +} + TEST_F(TableScanTest, splitOffsetAndLength) { auto vectors = makeVectors(10, 1'000); auto filePath = TempFilePath::create(); diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index 70d7c630abc9..ece0d3545bdf 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -180,10 +180,12 @@ std::shared_ptr HiveConnectorTestBase::makeHiveConnectorSplit( const std::string& filePath, uint64_t start, - uint64_t length) { + uint64_t length, + int64_t splitWeight) { return HiveConnectorSplitBuilder(filePath) .start(start) .length(length) + .splitWeight(splitWeight) .build(); } diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 34ad0be17713..8f7a03fe6cff 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -71,7 +71,8 @@ class HiveConnectorTestBase : public OperatorTestBase { static std::shared_ptr makeHiveConnectorSplit( const std::string& filePath, uint64_t start = 0, - uint64_t length = std::numeric_limits::max()); + uint64_t length = std::numeric_limits::max(), + int64_t splitWeight = 0); /// Split file at path 'filePath' into 'splitCount' splits. If not local file, /// file size can be given as 'externalSize'. 
@@ -197,6 +198,11 @@ class HiveConnectorSplitBuilder { return *this; } + HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) { + splitWeight_ = splitWeight; + return *this; + } + HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { fileFormat_ = format; return *this; @@ -220,6 +226,9 @@ class HiveConnectorSplitBuilder { } std::shared_ptr build() const { + static const std::unordered_map customSplitInfo; + static const std::shared_ptr extraFileInfo; + static const std::unordered_map serdeParameters; return std::make_shared( connectorId_, filePath_.find("/") == 0 ? "file:" + filePath_ : filePath_, @@ -227,7 +236,11 @@ class HiveConnectorSplitBuilder { start_, length_, partitionKeys_, - tableBucketNumber_); + tableBucketNumber_, + customSplitInfo, + extraFileInfo, + serdeParameters, + splitWeight_); } private: @@ -238,6 +251,7 @@ class HiveConnectorSplitBuilder { std::unordered_map> partitionKeys_; std::optional tableBucketNumber_; std::string connectorId_ = kHiveConnectorId; + int64_t splitWeight_{0}; }; } // namespace facebook::velox::exec::test