diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 3422944d3036..ee8c65ea8c57 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -252,6 +252,12 @@ inline bool isSynthesizedColumn( return name == kPath || name == kBucket || infoColumns.count(name) != 0; } +inline bool isRowIndexColumn( + const std::string& name, + std::shared_ptr rowIndexColumn) { + return rowIndexColumn != nullptr && rowIndexColumn->name() == name; +} + } // namespace const std::string& getColumnName(const common::Subfield& subfield) { @@ -343,6 +349,7 @@ std::shared_ptr makeScanSpec( partitionKeys, const std::unordered_map>& infoColumns, + const std::shared_ptr& rowIndexColumn, memory::MemoryPool* pool) { auto spec = std::make_shared("root"); folly::F14FastMap> @@ -351,6 +358,7 @@ std::shared_ptr makeScanSpec( for (auto& [subfield, _] : filters) { if (auto name = subfield.toString(); !isSynthesizedColumn(name, infoColumns) && + !isRowIndexColumn(name, rowIndexColumn) && partitionKeys.count(name) == 0) { filterSubfields[getColumnName(subfield)].push_back(&subfield); } @@ -360,6 +368,9 @@ std::shared_ptr makeScanSpec( for (int i = 0; i < rowType->size(); ++i) { auto& name = rowType->nameOf(i); auto& type = rowType->childAt(i); + if (isRowIndexColumn(name, rowIndexColumn)) { + continue; + } auto it = outputSubfields.find(name); if (it == outputSubfields.end()) { auto* fieldSpec = spec->addFieldRecursively(name, *type, i); diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 2742866d516b..5493e997ad95 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -57,6 +57,7 @@ std::shared_ptr makeScanSpec( partitionKeys, const std::unordered_map>& infoColumns, + const std::shared_ptr& rowIndexColumn, memory::MemoryPool* pool); void configureReaderOptions( diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 8b36b16699b1..47f261132542 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -61,6 +61,10 @@ HiveDataSource::HiveDataSource( if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) { infoColumns_.emplace(handle->name(), handle); } + + if (handle->columnType() == HiveColumnHandle::ColumnType::kRowIndex) { + rowIndexColumn_ = handle; + } } std::vector readerRowNames; @@ -153,6 +157,7 @@ HiveDataSource::HiveDataSource( hiveTableHandle_->dataColumns(), partitionKeys_, infoColumns_, + rowIndexColumn_, pool_); if (remainingFilter) { metadataFilter_ = std::make_shared( @@ -193,7 +198,7 @@ void HiveDataSource::addSplit(std::shared_ptr split) { // Split reader subclasses may need to use the reader options in prepareSplit // so we initialize it beforehand. splitReader_->configureReaderOptions(randomSkip_); - splitReader_->prepareSplit(metadataFilter_, runtimeStats_); + splitReader_->prepareSplit(metadataFilter_, runtimeStats_, rowIndexColumn_); } std::optional HiveDataSource::next( diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index b9023061a491..335bdb43b157 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -119,6 +119,7 @@ class HiveDataSource : public DataSource { const ConnectorQueryCtx* const connectorQueryCtx_; const std::shared_ptr hiveConfig_; std::shared_ptr ioStats_; + std::shared_ptr rowIndexColumn_; private: // Evaluates remainingFilter_ on the specified vector. Returns number of rows diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 412e1e5e9315..24557a5539d8 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -140,7 +140,8 @@ void SplitReader::configureReaderOptions( void SplitReader::prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats) { + dwio::common::RuntimeStatistics& runtimeStats, + const std::shared_ptr& rowIndexColumn) { createReader(); if (checkIfSplitIsEmpty(runtimeStats)) { @@ -148,7 +149,7 @@ void SplitReader::prepareSplit( return; } - createRowReader(metadataFilter); + createRowReader(metadataFilter, rowIndexColumn); } uint64_t SplitReader::next(uint64_t size, VectorPtr& output) { @@ -278,16 +279,21 @@ bool SplitReader::checkIfSplitIsEmpty( } void SplitReader::createRowReader( - std::shared_ptr metadataFilter) { + std::shared_ptr metadataFilter, + const std::shared_ptr& rowIndexColumn) { auto& fileType = baseReader_->rowType(); auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema()); + auto columnNames = fileType->names(); + if (rowIndexColumn != nullptr) { + setRowIndexColumn(fileType, rowIndexColumn); + } configureRowReaderOptions( baseRowReaderOpts_, hiveTableHandle_->tableParameters(), scanSpec_, metadataFilter, - ROW(std::vector(fileType->names()), std::move(columnTypes)), + ROW(std::move(columnNames), std::move(columnTypes)), hiveSplit_); // NOTE: we firstly reset the finished 'baseRowReader_' of previous split // before setting up for the next one to avoid doubling the peak memory usage. @@ -295,6 +301,23 @@ void SplitReader::createRowReader( baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } +void SplitReader::setRowIndexColumn( + const RowTypePtr& fileType, + const std::shared_ptr& rowIndexColumn) { + auto rowIndexColumnName = rowIndexColumn->name(); + auto rowIndexMetaColIdx = + readerOutputType_->getChildIdxIfExists(rowIndexColumnName); + if (rowIndexMetaColIdx.has_value() && + !fileType->containsChild(rowIndexColumnName) && + hiveSplit_->partitionKeys.find(rowIndexColumnName) == + hiveSplit_->partitionKeys.end()) { + dwio::common::RowNumberColumnInfo rowNumberColumnInfo; + rowNumberColumnInfo.insertPosition = rowIndexMetaColIdx.value(); + rowNumberColumnInfo.name = rowIndexColumnName; + baseRowReaderOpts_.setRowNumberColumnInfo(std::move(rowNumberColumnInfo)); + } +} + std::vector SplitReader::adaptColumns( const RowTypePtr& fileType, const std::shared_ptr& tableSchema) { diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index e066702228c4..acd92441c92f 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -91,7 +91,8 @@ class SplitReader { /// would be called only once per incoming split virtual void prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats); + dwio::common::RuntimeStatistics& runtimeStats, + const std::shared_ptr& rowIndexColumn); virtual uint64_t next(uint64_t size, VectorPtr& output); @@ -126,7 +127,9 @@ class SplitReader { /// Create the dwio::common::RowReader object baseRowReader_, which owns the /// ColumnReaders that will be used to read the data - void createRowReader(std::shared_ptr metadataFilter); + void createRowReader( + std::shared_ptr metadataFilter, + const std::shared_ptr& rowIndexColumn); /// Different table formats may have different meatadata columns. /// This function will be used to update the scanSpec for these columns. @@ -134,6 +137,10 @@ class SplitReader { const RowTypePtr& fileType, const std::shared_ptr& tableSchema); + void setRowIndexColumn( + const RowTypePtr& fileType, + const std::shared_ptr& rowIndexColumn); + void setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, diff --git a/velox/connectors/hive/TableHandle.h b/velox/connectors/hive/TableHandle.h index 9e0dac38af77..cfc9295bd405 100644 --- a/velox/connectors/hive/TableHandle.h +++ b/velox/connectors/hive/TableHandle.h @@ -28,7 +28,14 @@ using SubfieldFilters = class HiveColumnHandle : public ColumnHandle { public: - enum class ColumnType { kPartitionKey, kRegular, kSynthesized }; + enum class ColumnType { + kPartitionKey, + kRegular, + kSynthesized, + /// A zero-based row number of type BIGINT auto-generated by the connector. + /// Rows numbers are unique within a single file only. + kRowIndex + }; /// NOTE: 'dataType' is the column type in target write table. 'hiveType' is /// converted type of the corresponding column in source table which might not diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 58515256ca21..be59e7170206 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -52,7 +52,8 @@ IcebergSplitReader::IcebergSplitReader( void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats) { + dwio::common::RuntimeStatistics& runtimeStats, + const std::shared_ptr& rowIndexColumn) { createReader(); if (checkIfSplitIsEmpty(runtimeStats)) { @@ -60,7 +61,7 @@ void IcebergSplitReader::prepareSplit( return; } - createRowReader(metadataFilter); + createRowReader(metadataFilter, rowIndexColumn); std::shared_ptr icebergSplit = std::dynamic_pointer_cast(hiveSplit_); diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index 070d519c5eaa..4e308f47b6c4 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -43,7 +43,8 @@ class IcebergSplitReader : public SplitReader { void prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats) override; + dwio::common::RuntimeStatistics& runtimeStats, + const std::shared_ptr& rowIndexColumn) override; uint64_t next(uint64_t size, VectorPtr& output) override; diff --git a/velox/connectors/hive/tests/HiveConnectorTest.cpp b/velox/connectors/hive/tests/HiveConnectorTest.cpp index ebc0afdcb6b4..325c1a00c440 100644 --- a/velox/connectors/hive/tests/HiveConnectorTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorTest.cpp @@ -92,7 +92,14 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) { auto rowType = ROW({{"c0", columnType}}); auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"}); auto scanSpec = makeScanSpec( - rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get()); + rowType, + groupSubfields(subfields), + {}, + nullptr, + {}, + {}, + nullptr, + pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); validateNullConstant(*c0c0, *BIGINT()); auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1"); @@ -128,6 +135,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) { nullptr, {}, {}, + nullptr, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant()); @@ -151,6 +159,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) { nullptr, {}, {}, + nullptr, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->maxArrayElementsCount(), 2); @@ -168,7 +177,8 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) { auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"}); auto groupedSubfields = groupSubfields(subfields); VELOX_ASSERT_USER_THROW( - makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()), + makeScanSpec( + rowType, groupedSubfields, {}, nullptr, {}, {}, nullptr, pool_.get()), "Non-positive array subscript cannot be push down"); } @@ -184,6 +194,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) { nullptr, {}, {}, + nullptr, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ( @@ -212,6 +223,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { nullptr, {}, {}, + nullptr, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_TRUE(c0->flatMapFeatureSelection().empty()); @@ -232,6 +244,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { nullptr, {}, {}, + nullptr, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_TRUE(mapKeyIsNotNull(*c0)); @@ -255,6 +268,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, + nullptr, pool_.get()); auto* keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -283,6 +297,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, + nullptr, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -302,6 +317,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, + nullptr, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -318,6 +334,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, + nullptr, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -354,6 +371,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) { ROW({{"c0", c0Type}, {"c1", c1Type}}), {}, {}, + nullptr, pool_.get()); auto c0 = scanSpec->childByName("c0"); ASSERT_FALSE(c0->isConstant()); @@ -399,6 +417,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) { nullptr, {}, {}, + nullptr, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); @@ -412,7 +431,14 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) { SubfieldFilters filters; filters.emplace(Subfield("ds"), exec::equal("2023-10-13")); auto scanSpec = makeScanSpec( - rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get()); + rowType, + {}, + filters, + rowType, + {{"ds", nullptr}}, + {}, + nullptr, + pool_.get()); ASSERT_TRUE(scanSpec->childByName("c0")->projectOut()); ASSERT_FALSE(scanSpec->childByName("ds")->projectOut()); } @@ -430,6 +456,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_prunedMapNonNullMapKey) { nullptr, {}, {}, + nullptr, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 5f806f22b08f..ec1d4e713cf7 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -108,6 +108,11 @@ struct TableParameter { "serialization.null.format"; }; +struct RowNumberColumnInfo { + column_index_t insertPosition; + std::string name; +}; + /** * Options for creating a RowReader. */ @@ -131,6 +136,8 @@ class RowReaderOptions { std::shared_ptr decodingExecutor_; size_t decodingParallelismFactor_{0}; bool appendRowNumberColumn_ = false; + std::optional rowNumberColumnInfo_ = std::nullopt; + // Function to populate metrics related to feature projection stats // in Koski. This gets fired in FlatMapColumnReader. // This is a bit of a hack as there is (by design) no good way @@ -329,18 +336,13 @@ class RowReaderOptions { decodingParallelismFactor_ = factor; } - /* - * Set to true, if you want to add a new column to the results containing the - * row numbers. These row numbers are relative to the beginning of file (0 as - * first row) and does not affected by filtering or deletion during the read - * (it always counts all rows in the file). - */ - void setAppendRowNumberColumn(bool value) { - appendRowNumberColumn_ = value; + void setRowNumberColumnInfo( + std::optional rowNumberColumnInfo) { + rowNumberColumnInfo_ = std::move(rowNumberColumnInfo); } - bool getAppendRowNumberColumn() const { - return appendRowNumberColumn_; + const std::optional& getRowNumberColumnInfo() const { + return rowNumberColumnInfo_; } void setKeySelectionCallback( diff --git a/velox/dwio/common/Reader.cpp b/velox/dwio/common/Reader.cpp index 4c79f56e3545..e8b82b08ddbc 100644 --- a/velox/dwio/common/Reader.cpp +++ b/velox/dwio/common/Reader.cpp @@ -148,4 +148,82 @@ VectorPtr RowReader::projectColumns( input->pool(), rowType, nullptr, size, std::move(children)); } +void RowReader::readWithRowNumber( + std::unique_ptr& columnReader, + const dwio::common::RowReaderOptions& options, + uint64_t previousRow, + uint64_t rowsToRead, + const dwio::common::Mutation* mutation, + VectorPtr& result) { + auto* rowVector = result->asUnchecked(); + column_index_t numChildren = 0; + for (auto& column : options.getScanSpec()->children()) { + if (column->projectOut()) { + ++numChildren; + } + } + VectorPtr rowNumVector; + auto rowNumberColumnInfo = options.getRowNumberColumnInfo(); + VELOX_CHECK_EQ(rowNumberColumnInfo.has_value(), true); + auto& rowNumberColumnIndex = rowNumberColumnInfo->insertPosition; + auto& rowNumberColumnName = rowNumberColumnInfo->name; + VELOX_CHECK_LE(rowNumberColumnIndex, numChildren); + if (rowVector->childrenSize() != numChildren) { + VELOX_CHECK_EQ(rowVector->childrenSize(), numChildren + 1); + rowNumVector = rowVector->childAt(rowNumberColumnIndex); + auto& rowType = rowVector->type()->asRow(); + auto names = rowType.names(); + auto types = rowType.children(); + auto children = rowVector->children(); + VELOX_DCHECK(!names.empty() && !types.empty() && !children.empty()); + names.erase(names.begin() + rowNumberColumnIndex); + types.erase(types.begin() + rowNumberColumnIndex); + children.erase(children.begin() + rowNumberColumnIndex); + result = std::make_shared( + rowVector->pool(), + ROW(std::move(names), std::move(types)), + rowVector->nulls(), + rowVector->size(), + std::move(children)); + } + columnReader->next(rowsToRead, result, mutation); + FlatVector* flatRowNum = nullptr; + if (rowNumVector && BaseVector::isVectorWritable(rowNumVector)) { + flatRowNum = rowNumVector->asFlatVector(); + } + if (flatRowNum) { + flatRowNum->clearAllNulls(); + flatRowNum->resize(result->size()); + } else { + rowNumVector = std::make_shared>( + result->pool(), + BIGINT(), + nullptr, + result->size(), + AlignedBuffer::allocate(result->size(), result->pool()), + std::vector()); + flatRowNum = rowNumVector->asUnchecked>(); + } + auto rowOffsets = columnReader->outputRows(); + VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); + auto* rawRowNum = flatRowNum->mutableRawValues(); + for (int i = 0; i < rowOffsets.size(); ++i) { + rawRowNum[i] = previousRow + rowOffsets[i]; + } + rowVector = result->asUnchecked(); + auto& rowType = rowVector->type()->asRow(); + auto names = rowType.names(); + auto types = rowType.children(); + auto children = rowVector->children(); + names.insert(names.begin() + rowNumberColumnIndex, rowNumberColumnName); + types.insert(types.begin() + rowNumberColumnIndex, BIGINT()); + children.insert(children.begin() + rowNumberColumnIndex, rowNumVector); + result = std::make_shared( + rowVector->pool(), + ROW(std::move(names), std::move(types)), + rowVector->nulls(), + rowVector->size(), + std::move(children)); +} + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/Reader.h b/velox/dwio/common/Reader.h index 6aa564fbb11c..0c3da77d29ce 100644 --- a/velox/dwio/common/Reader.h +++ b/velox/dwio/common/Reader.h @@ -24,6 +24,7 @@ #include "velox/dwio/common/InputStream.h" #include "velox/dwio/common/Mutation.h" #include "velox/dwio/common/Options.h" +#include "velox/dwio/common/SelectiveColumnReader.h" #include "velox/dwio/common/Statistics.h" #include "velox/dwio/common/TypeWithId.h" #include "velox/type/Type.h" @@ -146,6 +147,14 @@ class RowReader { const VectorPtr& input, const velox::common::ScanSpec& spec, const Mutation* mutation); + + static void readWithRowNumber( + std::unique_ptr& columnReader, + const dwio::common::RowReaderOptions& options, + uint64_t previousRow, + uint64_t rowsToRead, + const dwio::common::Mutation*, + VectorPtr& result); }; /** diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt index 8ec122b1d5e4..aa96af53a2d9 100644 --- a/velox/dwio/common/tests/CMakeLists.txt +++ b/velox/dwio/common/tests/CMakeLists.txt @@ -36,7 +36,8 @@ add_executable( TestBufferedInput.cpp TypeTests.cpp UnitLoaderToolsTests.cpp - WriterTest.cpp) + WriterTest.cpp + OptionsTests.cpp) add_test(velox_dwio_common_test velox_dwio_common_test) target_link_libraries( velox_dwio_common_test diff --git a/velox/dwio/common/tests/OptionsTests.cpp b/velox/dwio/common/tests/OptionsTests.cpp index f46834cb9145..7de98abb8843 100644 --- a/velox/dwio/common/tests/OptionsTests.cpp +++ b/velox/dwio/common/tests/OptionsTests.cpp @@ -19,24 +19,35 @@ using namespace ::testing; using namespace facebook::velox::dwio::common; -TEST(OptionsTests, defaultAppendRowNumberColumnTest) { +TEST(OptionsTests, defaultRowNumberColumnInfoTest) { // appendRowNumberColumn flag should be false by default RowReaderOptions rowReaderOptions; - ASSERT_EQ(false, rowReaderOptions.getAppendRowNumberColumn()); + ASSERT_EQ(std::nullopt, rowReaderOptions.getRowNumberColumnInfo()); } -TEST(OptionsTests, setAppendRowNumberColumnToTrueTest) { +TEST(OptionsTests, setRowNumberColumnInfoTest) { RowReaderOptions rowReaderOptions; - rowReaderOptions.setAppendRowNumberColumn(true); - ASSERT_EQ(true, rowReaderOptions.getAppendRowNumberColumn()); + RowNumberColumnInfo rowNumberColumnInfo; + rowNumberColumnInfo.insertPosition = 0; + rowNumberColumnInfo.name = "test"; + rowReaderOptions.setRowNumberColumnInfo(rowNumberColumnInfo); + auto rowNumberColumn = rowReaderOptions.getRowNumberColumnInfo().value(); + ASSERT_EQ(rowNumberColumnInfo.insertPosition, rowNumberColumn.insertPosition); + ASSERT_EQ(rowNumberColumnInfo.name, rowNumberColumn.name); } -TEST(OptionsTests, testAppendRowNumberColumnInCopy) { +TEST(OptionsTests, testRowNumberColumnInfoInCopy) { RowReaderOptions rowReaderOptions; RowReaderOptions rowReaderOptionsCopy{rowReaderOptions}; - ASSERT_EQ(false, rowReaderOptionsCopy.getAppendRowNumberColumn()); + ASSERT_EQ(std::nullopt, rowReaderOptionsCopy.getRowNumberColumnInfo()); - rowReaderOptions.setAppendRowNumberColumn(true); + RowNumberColumnInfo rowNumberColumnInfo; + rowNumberColumnInfo.insertPosition = 0; + rowNumberColumnInfo.name = "test"; + rowReaderOptions.setRowNumberColumnInfo(rowNumberColumnInfo); RowReaderOptions rowReaderOptionsSecondCopy{rowReaderOptions}; - ASSERT_EQ(true, rowReaderOptionsSecondCopy.getAppendRowNumberColumn()); + auto rowNumberColumn = + rowReaderOptionsSecondCopy.getRowNumberColumnInfo().value(); + ASSERT_EQ(rowNumberColumnInfo.insertPosition, rowNumberColumn.insertPosition); + ASSERT_EQ(rowNumberColumnInfo.name, rowNumberColumn.name); } diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 5899981be559..83c1258014bf 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -501,11 +501,17 @@ void DwrfRowReader::readNext( } return; } - if (!options_.getAppendRowNumberColumn()) { + if (!options_.getRowNumberColumnInfo().has_value()) { getSelectiveColumnReader()->next(rowsToRead, result, mutation); return; } - readWithRowNumber(rowsToRead, mutation, result); + readWithRowNumber( + getSelectiveColumnReader(), + options_, + previousRow_, + rowsToRead, + mutation, + result); } uint64_t DwrfRowReader::skip(uint64_t numValues) { @@ -516,76 +522,6 @@ uint64_t DwrfRowReader::skip(uint64_t numValues) { } } -void DwrfRowReader::readWithRowNumber( - uint64_t rowsToRead, - const dwio::common::Mutation* mutation, - VectorPtr& result) { - auto* rowVector = result->asUnchecked(); - column_index_t numChildren = 0; - for (auto& column : options_.getScanSpec()->children()) { - if (column->projectOut()) { - ++numChildren; - } - } - VectorPtr rowNumVector; - if (rowVector->childrenSize() != numChildren) { - VELOX_CHECK_EQ(rowVector->childrenSize(), numChildren + 1); - rowNumVector = rowVector->childAt(numChildren); - auto& rowType = rowVector->type()->asRow(); - auto names = rowType.names(); - auto types = rowType.children(); - auto children = rowVector->children(); - VELOX_DCHECK(!names.empty() && !types.empty() && !children.empty()); - names.pop_back(); - types.pop_back(); - children.pop_back(); - result = std::make_shared( - rowVector->pool(), - ROW(std::move(names), std::move(types)), - rowVector->nulls(), - rowVector->size(), - std::move(children)); - } - getSelectiveColumnReader()->next(rowsToRead, result, mutation); - FlatVector* flatRowNum = nullptr; - if (rowNumVector && BaseVector::isVectorWritable(rowNumVector)) { - flatRowNum = rowNumVector->asFlatVector(); - } - if (flatRowNum) { - flatRowNum->clearAllNulls(); - flatRowNum->resize(result->size()); - } else { - rowNumVector = std::make_shared>( - result->pool(), - BIGINT(), - nullptr, - result->size(), - AlignedBuffer::allocate(result->size(), result->pool()), - std::vector()); - flatRowNum = rowNumVector->asUnchecked>(); - } - auto rowOffsets = getSelectiveColumnReader()->outputRows(); - VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); - auto* rawRowNum = flatRowNum->mutableRawValues(); - for (int i = 0; i < rowOffsets.size(); ++i) { - rawRowNum[i] = previousRow_ + rowOffsets[i]; - } - rowVector = result->asUnchecked(); - auto& rowType = rowVector->type()->asRow(); - auto names = rowType.names(); - auto types = rowType.children(); - auto children = rowVector->children(); - names.emplace_back(); - types.push_back(BIGINT()); - children.push_back(rowNumVector); - result = std::make_shared( - rowVector->pool(), - ROW(std::move(names), std::move(types)), - rowVector->nulls(), - rowVector->size(), - std::move(children)); -} - int64_t DwrfRowReader::nextRowNumber() { if (nextRowNumber_.has_value()) { return *nextRowNumber_; diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index 549746776fa1..9786f804871c 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -187,11 +187,6 @@ class DwrfRowReader : public StrideIndexProvider, const dwio::common::Mutation*, VectorPtr& result); - void readWithRowNumber( - uint64_t rowsToRead, - const dwio::common::Mutation*, - VectorPtr& result); - uint64_t skip(uint64_t numValues); std::unique_ptr& getColumnReader(); diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index 795165e363c3..98c669ddfd4b 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -2226,7 +2226,7 @@ createWriterReader( } // namespace -TEST_F(TestReader, appendRowNumberColumn) { +TEST_F(TestReader, setRowNumberColumnInfo) { std::vector> integerValues{ {0, 1, 2, 3, 4}, {5, 6, 7}, @@ -2242,7 +2242,10 @@ TEST_F(TestReader, appendRowNumberColumn) { spec->addAllChildFields(*schema); RowReaderOptions rowReaderOpts; rowReaderOpts.setScanSpec(spec); - rowReaderOpts.setAppendRowNumberColumn(true); + RowNumberColumnInfo rowNumberColumnInfo; + rowNumberColumnInfo.insertPosition = 1; + rowNumberColumnInfo.name = ""; + rowReaderOpts.setRowNumberColumnInfo(rowNumberColumnInfo); { SCOPED_TRACE("Selective no filter"); auto rowReader = reader->createRowReader(rowReaderOpts); @@ -2268,7 +2271,10 @@ TEST_F(TestReader, reuseRowNumberColumn) { spec->addAllChildFields(*schema); RowReaderOptions rowReaderOpts; rowReaderOpts.setScanSpec(spec); - rowReaderOpts.setAppendRowNumberColumn(true); + RowNumberColumnInfo rowNumberColumnInfo; + rowNumberColumnInfo.insertPosition = 1; + rowNumberColumnInfo.name = ""; + rowReaderOpts.setRowNumberColumnInfo(rowNumberColumnInfo); { SCOPED_TRACE("Reuse passed in"); auto rowReader = reader->createRowReader(rowReaderOpts); diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 2c4b531a706e..eb4c88c81ba3 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -751,7 +751,7 @@ class ParquetRowReader::Impl { "Input Table Schema (with partition columns): {}\n", readerBase_->bufferedInput().getReadFile()->getName(), readerBase_->schema()->toString(), - requestedType_->toString()); + requestedType_->type()->toString()); return exceptionMessageContext; }; @@ -762,8 +762,9 @@ class ParquetRowReader::Impl { pool_, columnReaderStats_, readerBase_->fileMetaData()); auto columnSelector = std::make_shared( ColumnSelector::apply(options_.getSelector(), readerBase_->schema())); + requestedType_ = columnSelector->getSchemaWithId(); columnReader_ = ParquetColumnReader::build( - columnSelector->getSchemaWithId(), + requestedType_, readerBase_->schemaWithId(), // Id is schema id params, *options_.getScanSpec()); @@ -834,13 +835,23 @@ class ParquetRowReader::Impl { uint64_t size, velox::VectorPtr& result, const dwio::common::Mutation* mutation) { - VELOX_DCHECK(!options_.getAppendRowNumberColumn()); auto rowsToRead = nextReadSize(size); if (rowsToRead == kAtEnd) { return 0; } VELOX_DCHECK_GT(rowsToRead, 0); - columnReader_->next(rowsToRead, result, mutation); + if (!options_.getRowNumberColumnInfo().has_value()) { + columnReader_->next(rowsToRead, result, mutation); + } else { + readWithRowNumber( + columnReader_, + options_, + nextRowNumber(), + rowsToRead, + mutation, + result); + } + currentRowInGroup_ += rowsToRead; return rowsToRead; } @@ -900,7 +911,7 @@ class ParquetRowReader::Impl { std::unique_ptr columnReader_; - RowTypePtr requestedType_; + std::shared_ptr requestedType_; dwio::common::ColumnReaderStatistics columnReaderStats_; }; diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index eca887eab155..92683783bd8f 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -43,6 +43,7 @@ StructColumnReader::StructColumnReader( requestedType_->childByName(childSpec->fieldName()); addChild(ParquetColumnReader::build( childRequestedType, childFileType, params, *childSpec)); + childSpecs[i]->setSubscript(children_.size() - 1); } auto type = reinterpret_cast(fileType_.get()); diff --git a/velox/dwio/parquet/tests/examples/sample_with_rowindex.parquet b/velox/dwio/parquet/tests/examples/sample_with_rowindex.parquet new file mode 100644 index 000000000000..2f7636639f1d Binary files /dev/null and b/velox/dwio/parquet/tests/examples/sample_with_rowindex.parquet differ diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 7b2d1e24c4d7..ab832dbbc74a 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -56,6 +56,18 @@ class ParquetTableScanTest : public HiveConnectorTestBase { assertQuery(plan, splits_, sql); } + void assertSelectWithAssignments( + std::vector&& outputColumnNames, + std::unordered_map>& + assignments, + const std::string& sql) { + auto rowType = getRowType(std::move(outputColumnNames)); + auto plan = PlanBuilder() + .tableScan(rowType, {}, "", nullptr, assignments) + .planNode(); + assertQuery(plan, splits_, sql); + } + void assertSelectWithFilter( std::vector&& outputColumnNames, const std::vector& subfieldFilters, @@ -131,8 +143,10 @@ class ParquetTableScanTest : public HiveConnectorTestBase { std::shared_ptr makeSplit( const std::string& filePath) { - return makeHiveConnectorSplits( + auto split = makeHiveConnectorSplits( filePath, 1, dwio::common::FileFormat::PARQUET)[0]; + + return split; } private: @@ -469,6 +483,63 @@ TEST_F(ParquetTableScanTest, readAsLowerCase) { result.second, {makeRowVector({"a"}, {makeFlatVector({0, 1})})}); } +TEST_F(ParquetTableScanTest, rowIndex) { + // case 1: file not have `_tmp_metadata_row_index`, scan generate it for user. + loadData( + getExampleFilePath("sample.parquet"), + ROW({"a", "b", "_tmp_metadata_row_index"}, + {BIGINT(), DOUBLE(), BIGINT()}), + makeRowVector( + {"a", "b", "_tmp_metadata_row_index"}, + { + makeFlatVector(20, [](auto row) { return row + 1; }), + makeFlatVector(20, [](auto row) { return row + 1; }), + makeFlatVector(20, [](auto row) { return row; }), + })); + std::unordered_map> + assignments; + assignments["a"] = std::make_shared( + "a", + connector::hive::HiveColumnHandle::ColumnType::kRegular, + BIGINT(), + BIGINT()); + assignments["b"] = std::make_shared( + "b", + connector::hive::HiveColumnHandle::ColumnType::kRegular, + DOUBLE(), + DOUBLE()); + assignments["_tmp_metadata_row_index"] = + std::make_shared( + "_tmp_metadata_row_index", + connector::hive::HiveColumnHandle::ColumnType::kRowIndex, + BIGINT(), + BIGINT()); + + assertSelect({"a"}, "SELECT a FROM tmp"); + assertSelectWithAssignments( + {"a", "_tmp_metadata_row_index"}, + assignments, + "SELECT a, _tmp_metadata_row_index FROM tmp"); + // case 2: file has `_tmp_metadata_row_index` column, then use user data + // insteads of generating it. + loadData( + getExampleFilePath("sample_with_rowindex.parquet"), + ROW({"a", "b", "_tmp_metadata_row_index"}, + {BIGINT(), DOUBLE(), BIGINT()}), + makeRowVector( + {"a", "b", "_tmp_metadata_row_index"}, + { + makeFlatVector(20, [](auto row) { return row + 1; }), + makeFlatVector(20, [](auto row) { return row + 1; }), + makeFlatVector(20, [](auto row) { return row + 1; }), + })); + + assertSelect({"a"}, "SELECT a FROM tmp"); + assertSelect( + {"a", "_tmp_metadata_row_index"}, + "SELECT a, _tmp_metadata_row_index FROM tmp"); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::Init init{&argc, &argv, false}; diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 73d7828c7194..fc575961daf8 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -82,9 +82,13 @@ PlanBuilder& PlanBuilder::tableScan( const RowTypePtr& outputType, const std::vector& subfieldFilters, const std::string& remainingFilter, - const RowTypePtr& dataColumns) { + const RowTypePtr& dataColumns, + const std::unordered_map< + std::string, + std::shared_ptr>& assignments) { return TableScanBuilder(*this) .outputType(outputType) + .assignments(assignments) .subfieldFilters(subfieldFilters) .remainingFilter(remainingFilter) .dataColumns(dataColumns) diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 0c6928216a1f..678da6f77ca6 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -111,11 +111,15 @@ class PlanBuilder { /// types (for all columns) in this argument as opposed to 'outputType', where /// you define the output types only. See 'missingColumns' test in /// 'TableScanTest'. + /// @param assignments Optional ColumnHandles. PlanBuilder& tableScan( const RowTypePtr& outputType, const std::vector& subfieldFilters = {}, const std::string& remainingFilter = "", - const RowTypePtr& dataColumns = nullptr); + const RowTypePtr& dataColumns = nullptr, + const std::unordered_map< + std::string, + std::shared_ptr>& assignments = {}); /// Add a TableScanNode to scan a Hive table. ///