diff --git a/velox/common/base/RawVector.cpp b/velox/common/base/RawVector.cpp index 6dc1c048d57f..91ae094b8249 100644 --- a/velox/common/base/RawVector.cpp +++ b/velox/common/base/RawVector.cpp @@ -21,10 +21,11 @@ namespace facebook::velox { namespace { -std::vector iotaData; +raw_vector iotaData; bool initializeIota() { iotaData.resize(10000); + iotaData.resize(iotaData.capacity()); std::iota(iotaData.begin(), iotaData.end(), 0); return true; } diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index c86c9ef9c2da..9af4d7ef357a 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -36,6 +36,12 @@ struct HiveBucketConversion { std::vector> bucketColumnHandles; }; +struct RowIdProperties { + int64_t metadataVersion; + int64_t partitionId; + std::string tableGuid; +}; + struct HiveConnectorSplit : public connector::ConnectorSplit { const std::string filePath; dwio::common::FileFormat fileFormat; @@ -62,6 +68,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { /// the file handle. 
std::optional properties; + std::optional rowIdProperties; + HiveConnectorSplit( const std::string& connectorId, const std::string& _filePath, diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index ec087c8657b0..22a7052e6f14 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -260,10 +260,10 @@ inline bool isSynthesizedColumn( return infoColumns.count(name) != 0; } -inline bool isRowIndexColumn( +bool isSpecialColumn( const std::string& name, - std::shared_ptr rowIndexColumn) { - return rowIndexColumn != nullptr && rowIndexColumn->name() == name; + const std::optional& specialName) { + return specialName.has_value() && name == *specialName; } } // namespace @@ -368,7 +368,7 @@ std::shared_ptr makeScanSpec( partitionKeys, const std::unordered_map>& infoColumns, - const std::shared_ptr& rowIndexColumn, + const SpecialColumnNames& specialColumns, memory::MemoryPool* pool) { auto spec = std::make_shared("root"); folly::F14FastMap> @@ -377,8 +377,9 @@ std::shared_ptr makeScanSpec( for (auto& [subfield, _] : filters) { if (auto name = subfield.toString(); !isSynthesizedColumn(name, infoColumns) && - !isRowIndexColumn(name, rowIndexColumn) && partitionKeys.count(name) == 0) { + VELOX_CHECK(!isSpecialColumn(name, specialColumns.rowIndex)); + VELOX_CHECK(!isSpecialColumn(name, specialColumns.rowId)); filterSubfields[getColumnName(subfield)].push_back(&subfield); } } @@ -387,13 +388,24 @@ std::shared_ptr makeScanSpec( for (int i = 0; i < rowType->size(); ++i) { auto& name = rowType->nameOf(i); auto& type = rowType->childAt(i); + if (isSpecialColumn(name, specialColumns.rowIndex)) { + VELOX_CHECK(type->isBigint()); + auto* fieldSpec = spec->addField(name, i); + fieldSpec->setColumnType(common::ScanSpec::ColumnType::kRowIndex); + continue; + } + if (isSpecialColumn(name, specialColumns.rowId)) { + VELOX_CHECK(type->isRow() && type->size() == 5); + auto& rowIdType = 
type->asRow(); + auto* fieldSpec = spec->addFieldRecursively(name, rowIdType, i); + fieldSpec->setColumnType(common::ScanSpec::ColumnType::kComposite); + fieldSpec->childByName(rowIdType.nameOf(0)) + ->setColumnType(common::ScanSpec::ColumnType::kRowIndex); + continue; + } auto it = outputSubfields.find(name); if (it == outputSubfields.end()) { auto* fieldSpec = spec->addFieldRecursively(name, *type, i); - if (isRowIndexColumn(name, rowIndexColumn)) { - VELOX_CHECK(type->isBigint()); - fieldSpec->setExplicitRowNumber(true); - } processFieldSpec(dataColumns, type, *fieldSpec); filterSubfields.erase(name); continue; @@ -409,12 +421,6 @@ std::shared_ptr makeScanSpec( filterSubfields.erase(it); } auto* fieldSpec = spec->addField(name, i); - if (isRowIndexColumn(name, rowIndexColumn)) { - VELOX_CHECK(type->isBigint()); - // Set the flag for the case that the row index column only exists in - // remaining filters. - fieldSpec->setExplicitRowNumber(true); - } addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec); processFieldSpec(dataColumns, type, *fieldSpec); subfieldSpecs.clear(); @@ -448,7 +454,6 @@ std::shared_ptr makeScanSpec( if (isSynthesizedColumn(name, infoColumns)) { continue; } - VELOX_CHECK(!isRowIndexColumn(name, rowIndexColumn)); auto fieldSpec = spec->getOrCreateChild(pair.first); fieldSpec->addFilter(*pair.second); } diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 84a2dc67782d..e6480a78165d 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -44,6 +44,11 @@ void checkColumnNameLowerCase( void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr); +struct SpecialColumnNames { + std::optional rowIndex; + std::optional rowId; +}; + std::shared_ptr makeScanSpec( const RowTypePtr& rowType, const folly::F14FastMap>& @@ -54,7 +59,7 @@ std::shared_ptr makeScanSpec( partitionKeys, const std::unordered_map>& infoColumns, - const std::shared_ptr& 
rowIndexColumn, + const SpecialColumnNames& specialColumns, memory::MemoryPool* pool); void configureReaderOptions( diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 546e6cd9a4cb..bd930bbffd4f 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -22,7 +22,6 @@ #include "velox/common/testutil/TestValue.h" #include "velox/connectors/hive/HiveConfig.h" -#include "velox/connectors/hive/HiveConnectorUtil.h" #include "velox/dwio/common/ReaderFactory.h" #include "velox/expression/FieldReference.h" @@ -81,18 +80,21 @@ HiveDataSource::HiveDataSource( handle, "ColumnHandle must be an instance of HiveColumnHandle for {}", canonicalizedName); - - if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) { - partitionKeys_.emplace(handle->name(), handle); - } - - if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) { - infoColumns_.emplace(handle->name(), handle); - } - - if (handle->columnType() == HiveColumnHandle::ColumnType::kRowIndex) { - VELOX_CHECK_NULL(rowIndexColumn_); - rowIndexColumn_ = handle; + switch (handle->columnType()) { + case HiveColumnHandle::ColumnType::kRegular: + break; + case HiveColumnHandle::ColumnType::kPartitionKey: + partitionKeys_.emplace(handle->name(), handle); + break; + case HiveColumnHandle::ColumnType::kSynthesized: + infoColumns_.emplace(handle->name(), handle); + break; + case HiveColumnHandle::ColumnType::kRowIndex: + specialColumns_.rowIndex = handle->name(); + break; + case HiveColumnHandle::ColumnType::kRowId: + specialColumns_.rowId = handle->name(); + break; } } @@ -192,7 +194,7 @@ HiveDataSource::HiveDataSource( hiveTableHandle_->dataColumns(), partitionKeys_, infoColumns_, - rowIndexColumn_, + specialColumns_, pool_); if (remainingFilter) { metadataFilter_ = std::make_shared( @@ -257,7 +259,7 @@ std::unique_ptr HiveDataSource::setupBucketConversion() { hiveTableHandle_->dataColumns(), 
partitionKeys_, infoColumns_, - rowIndexColumn_, + specialColumns_, pool_); newScanSpec->moveAdaptationFrom(*scanSpec_); scanSpec_ = std::move(newScanSpec); @@ -266,6 +268,30 @@ std::unique_ptr HiveDataSource::setupBucketConversion() { split_->bucketConversion->tableBucketCount, std::move(bucketChannels)); } +void HiveDataSource::setupRowIdColumn() { + VELOX_CHECK(split_->rowIdProperties.has_value()); + const auto& props = *split_->rowIdProperties; + auto* rowId = scanSpec_->childByName(*specialColumns_.rowId); + VELOX_CHECK_NOT_NULL(rowId); + auto& rowIdType = + readerOutputType_->findChild(*specialColumns_.rowId)->asRow(); + auto rowGroupId = split_->getFileName(); + rowId->childByName(rowIdType.nameOf(1)) + ->setConstantValue( + StringView(rowGroupId), VARCHAR(), connectorQueryCtx_->memoryPool()); + rowId->childByName(rowIdType.nameOf(2)) + ->setConstantValue( + props.metadataVersion, BIGINT(), connectorQueryCtx_->memoryPool()); + rowId->childByName(rowIdType.nameOf(3)) + ->setConstantValue( + props.partitionId, BIGINT(), connectorQueryCtx_->memoryPool()); + rowId->childByName(rowIdType.nameOf(4)) + ->setConstantValue( + StringView(props.tableGuid), + VARCHAR(), + connectorQueryCtx_->memoryPool()); +} + void HiveDataSource::addSplit(std::shared_ptr split) { VELOX_CHECK_NULL( split_, @@ -284,12 +310,15 @@ void HiveDataSource::addSplit(std::shared_ptr split) { } else { partitionFunction_.reset(); } + if (specialColumns_.rowId.has_value()) { + setupRowIdColumn(); + } splitReader_ = createSplitReader(); // Split reader subclasses may need to use the reader options in prepareSplit // so we initialize it beforehand. 
splitReader_->configureReaderOptions(randomSkip_); - splitReader_->prepareSplit(metadataFilter_, runtimeStats_, rowIndexColumn_); + splitReader_->prepareSplit(metadataFilter_, runtimeStats_); } vector_size_t HiveDataSource::applyBucketConversion( diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index b7ca9bcb6344..a870966603ca 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -20,6 +20,7 @@ #include "velox/connectors/Connector.h" #include "velox/connectors/hive/FileHandle.h" #include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/HiveConnectorUtil.h" #include "velox/connectors/hive/HivePartitionFunction.h" #include "velox/connectors/hive/SplitReader.h" #include "velox/connectors/hive/TableHandle.h" @@ -125,7 +126,6 @@ class HiveDataSource : public DataSource { partitionKeys_; std::shared_ptr ioStats_; - std::shared_ptr rowIndexColumn_; private: std::unique_ptr setupBucketConversion(); @@ -133,6 +133,8 @@ class HiveDataSource : public DataSource { const RowVectorPtr& rowVector, BufferPtr& indices); + void setupRowIdColumn(); + // Evaluates remainingFilter_ on the specified vector. Returns number of rows // passed. Populates filterEvalCtx_.selectedIndices and selectedBits if only // some rows passed the filter. If none or all rows passed @@ -157,6 +159,7 @@ class HiveDataSource : public DataSource { // Column handles for the Split info columns keyed on their column names. 
std::unordered_map> infoColumns_; + SpecialColumnNames specialColumns_{}; folly::F14FastMap> subfields_; SubfieldFilters filters_; diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index 31b596467817..bfa9e52828c4 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -145,9 +145,8 @@ void SplitReader::configureReaderOptions( void SplitReader::prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats, - const std::shared_ptr& rowIndexColumn) { - createReader(std::move(metadataFilter), rowIndexColumn); + dwio::common::RuntimeStatistics& runtimeStats) { + createReader(std::move(metadataFilter)); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); @@ -222,8 +221,7 @@ std::string SplitReader::toString() const { } void SplitReader::createReader( - std::shared_ptr metadataFilter, - const std::shared_ptr& rowIndexColumn) { + std::shared_ptr metadataFilter) { VELOX_CHECK_NE( baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN); @@ -264,10 +262,6 @@ void SplitReader::createReader( auto& fileType = baseReader_->rowType(); auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema()); auto columnNames = fileType->names(); - if (rowIndexColumn != nullptr) { - bool isExplicit = scanSpec_->childByName(rowIndexColumn->name()) != nullptr; - setRowIndexColumn(rowIndexColumn, isExplicit); - } configureRowReaderOptions( hiveTableHandle_->tableParameters(), scanSpec_, @@ -312,17 +306,6 @@ void SplitReader::createRowReader() { baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_); } -void SplitReader::setRowIndexColumn( - const std::shared_ptr& rowIndexColumn, - bool isExplicit) { - dwio::common::RowNumberColumnInfo rowNumberColumnInfo; - rowNumberColumnInfo.insertPosition = - readerOutputType_->getChildIdx(rowIndexColumn->name()); - rowNumberColumnInfo.name = rowIndexColumn->name(); - rowNumberColumnInfo.isExplicit = 
isExplicit; - baseRowReaderOpts_.setRowNumberColumnInfo(std::move(rowNumberColumnInfo)); -} - std::vector SplitReader::adaptColumns( const RowTypePtr& fileType, const std::shared_ptr& tableSchema) { @@ -350,7 +333,8 @@ std::vector SplitReader::adaptColumns( connectorQueryCtx_->memoryPool(), connectorQueryCtx_->sessionTimezone()); childSpec->setConstantValue(constant); - } else if (!childSpec->isExplicitRowNumber()) { + } else if ( + childSpec->columnType() == common::ScanSpec::ColumnType::kRegular) { auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName); if (!fileTypeIdx.has_value()) { // Column is missing. Most likely due to schema evolution. diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 80231d4d563d..b8ab6e10fd04 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -77,8 +77,7 @@ class SplitReader { /// would be called only once per incoming split virtual void prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats, - const std::shared_ptr& rowIndexColumn); + dwio::common::RuntimeStatistics& runtimeStats); virtual uint64_t next(uint64_t size, VectorPtr& output); @@ -114,9 +113,7 @@ class SplitReader { /// Create the dwio::common::Reader object baseReader_, which will be used to /// read the data file's metadata and schema - void createReader( - std::shared_ptr metadataFilter, - const std::shared_ptr& rowIndexColumn); + void createReader(std::shared_ptr metadataFilter); /// Check if the hiveSplit_ is empty. 
The split is considered empty when /// 1) The data file is missing but the user chooses to ignore it @@ -136,10 +133,6 @@ class SplitReader { const RowTypePtr& fileType, const std::shared_ptr& tableSchema); - void setRowIndexColumn( - const std::shared_ptr& rowIndexColumn, - bool isExplicit); - void setPartitionValue( common::ScanSpec* spec, const std::string& partitionKey, diff --git a/velox/connectors/hive/TableHandle.h b/velox/connectors/hive/TableHandle.h index cfc9295bd405..9a02baa18d6f 100644 --- a/velox/connectors/hive/TableHandle.h +++ b/velox/connectors/hive/TableHandle.h @@ -34,7 +34,8 @@ class HiveColumnHandle : public ColumnHandle { kSynthesized, /// A zero-based row number of type BIGINT auto-generated by the connector. /// Rows numbers are unique within a single file only. - kRowIndex + kRowIndex, + kRowId, }; /// NOTE: 'dataType' is the column type in target write table. 'hiveType' is diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index b7c1f6b52340..1923837d112c 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -54,9 +54,8 @@ IcebergSplitReader::IcebergSplitReader( void IcebergSplitReader::prepareSplit( std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats, - const std::shared_ptr& rowIndexColumn) { - createReader(std::move(metadataFilter), rowIndexColumn); + dwio::common::RuntimeStatistics& runtimeStats) { + createReader(std::move(metadataFilter)); if (checkIfSplitIsEmpty(runtimeStats)) { VELOX_CHECK(emptySplit_); diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.h b/velox/connectors/hive/iceberg/IcebergSplitReader.h index b5ab7da64480..5f1196038f96 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.h +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.h @@ -43,8 +43,7 @@ class IcebergSplitReader : public SplitReader { void prepareSplit( 
std::shared_ptr metadataFilter, - dwio::common::RuntimeStatistics& runtimeStats, - const std::shared_ptr& rowIndexColumn) override; + dwio::common::RuntimeStatistics& runtimeStats) override; uint64_t next(uint64_t size, VectorPtr& output) override; diff --git a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp index e0b2a6c31f85..e623a2ec9d41 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp @@ -346,7 +346,7 @@ void IcebergSplitReaderBenchmark::readSingleColumn( std::shared_ptr randomSkip; icebergSplitReader->configureReaderOptions(randomSkip); - icebergSplitReader->prepareSplit(nullptr, runtimeStats_, nullptr); + icebergSplitReader->prepareSplit(nullptr, runtimeStats_); // Filter range is generated from a small sample data of 4096 rows. So the // upperBound and lowerBound are introduced to estimate the result size. 
diff --git a/velox/connectors/hive/tests/HiveConnectorTest.cpp b/velox/connectors/hive/tests/HiveConnectorTest.cpp index 8a64513b85ec..126215b9d312 100644 --- a/velox/connectors/hive/tests/HiveConnectorTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorTest.cpp @@ -92,14 +92,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) { auto rowType = ROW({{"c0", columnType}}); auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"}); auto scanSpec = makeScanSpec( - rowType, - groupSubfields(subfields), - {}, - nullptr, - {}, - {}, - nullptr, - pool_.get()); + rowType, groupSubfields(subfields), {}, nullptr, {}, {}, {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); validateNullConstant(*c0c0, *BIGINT()); auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1"); @@ -135,7 +128,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0"); ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant()); @@ -159,7 +152,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->maxArrayElementsCount(), 2); @@ -178,7 +171,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) { auto groupedSubfields = groupSubfields(subfields); VELOX_ASSERT_USER_THROW( makeScanSpec( - rowType, groupedSubfields, {}, nullptr, {}, {}, nullptr, pool_.get()), + rowType, groupedSubfields, {}, nullptr, {}, {}, {}, pool_.get()), "Non-positive array subscript cannot be push down"); } @@ -194,7 +187,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ( @@ -226,7 +219,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { nullptr, {}, {}, - 
nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_TRUE(c0->flatMapFeatureSelection().empty()); @@ -247,7 +240,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_TRUE(mapKeyIsNotNull(*c0)); @@ -271,7 +264,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -300,7 +293,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -320,7 +313,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -337,7 +330,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); keysFilter = scanSpec->childByName("c0") ->childByName(ScanSpec::kMapKeysFieldName) @@ -378,7 +371,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_onlyInFilters) { ROW({{"c0", c0Type}, {"c1", c1Type}}), {}, {}, - nullptr, + {}, pool_.get()); auto c0 = scanSpec->childByName("c0"); @@ -459,7 +452,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); @@ -473,14 +466,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) { SubfieldFilters filters; filters.emplace(Subfield("ds"), exec::equal("2023-10-13")); auto scanSpec = makeScanSpec( - rowType, - {}, - filters, - rowType, - {{"ds", nullptr}}, - {}, - nullptr, - pool_.get()); + rowType, {}, filters, rowType, {{"ds", 
nullptr}}, {}, {}, pool_.get()); ASSERT_TRUE(scanSpec->childByName("c0")->projectOut()); ASSERT_FALSE(scanSpec->childByName("ds")->projectOut()); } @@ -498,7 +484,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_prunedMapNonNullMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); auto* c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); @@ -513,7 +499,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_prunedMapNonNullMapKey) { nullptr, {}, {}, - nullptr, + {}, pool_.get()); c0 = scanSpec->childByName("c0"); ASSERT_EQ(c0->children().size(), 2); diff --git a/velox/dwio/common/ColumnSelector.cpp b/velox/dwio/common/ColumnSelector.cpp index 228da970af4c..deddf43c644b 100644 --- a/velox/dwio/common/ColumnSelector.cpp +++ b/velox/dwio/common/ColumnSelector.cpp @@ -344,7 +344,7 @@ std::shared_ptr ColumnSelector::fromScanSpec( const RowTypePtr& rowType) { std::vector columnNames; for (auto& child : spec.children()) { - if (child->isConstant() || child->isExplicitRowNumber()) { + if (!child->readFromFile()) { continue; } std::string name = child->fieldName(); diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 8ba90fa9ac2a..569d30ffd77f 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -109,14 +109,12 @@ struct TableParameter { "serialization.null.format"; }; +/// Implicit row number column to be added. This column will be removed in the +/// output of split reader. Should use the ScanSpec::ColumnType::kRowIndex if +/// the column is supposed to be explicit and kept in the output. struct RowNumberColumnInfo { column_index_t insertPosition; std::string name; - // This flag is used to distinguish the explicit and implicit use cases. In - // explicit case, row index column is declared in the output type or used in - // subfield filters or remaining filter. In implicit case, it's not declared - // in the output columns but only in the split reader. 
- bool isExplicit; }; class FormatSpecificOptions { diff --git a/velox/dwio/common/Reader.cpp b/velox/dwio/common/Reader.cpp index 1beec1b02491..3fd7241ecb9e 100644 --- a/velox/dwio/common/Reader.cpp +++ b/velox/dwio/common/Reader.cpp @@ -151,7 +151,6 @@ VectorPtr RowReader::projectColumns( namespace { void fillRowNumberVector( VectorPtr& rowNumVector, - bool contiguousRowNumbers, uint64_t previousRow, uint64_t rowsToRead, const dwio::common::SelectiveColumnReader* columnReader, @@ -174,15 +173,10 @@ void fillRowNumberVector( flatRowNum = rowNumVector->asUnchecked>(); } auto* rawRowNum = flatRowNum->mutableRawValues(); - if (contiguousRowNumbers) { - VELOX_DCHECK_EQ(rowsToRead, result->size()); - std::iota(rawRowNum, rawRowNum + rowsToRead, previousRow); - } else { - const auto rowOffsets = columnReader->outputRows(); - VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); - for (int i = 0; i < rowOffsets.size(); ++i) { - rawRowNum[i] = previousRow + rowOffsets[i]; - } + const auto rowOffsets = columnReader->outputRows(); + VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); + for (int i = 0; i < rowOffsets.size(); ++i) { + rawRowNum[i] = previousRow + rowOffsets[i]; } } } // namespace @@ -199,64 +193,25 @@ void RowReader::readWithRowNumber( const auto rowNumberColumnIndex = rowNumberColumnInfo->insertPosition; const auto& rowNumberColumnName = rowNumberColumnInfo->name; column_index_t numChildren{0}; - column_index_t numNotReadFromFileChildren{0}; for (auto& column : options.scanSpec()->children()) { if (column->projectOut()) { ++numChildren; - if (column->isConstant() || column->isExplicitRowNumber()) { - ++numNotReadFromFileChildren; - } } } VELOX_CHECK_LE(rowNumberColumnIndex, numChildren); - const bool contiguousRowNumbers = - (numChildren == numNotReadFromFileChildren) && !hasDeletion(mutation); - if (rowNumberColumnInfo->isExplicit) { - columnReader->next(rowsToRead, result, mutation); - fillRowNumberVector( - 
result->asUnchecked()->childAt(rowNumberColumnIndex), - contiguousRowNumbers, - previousRow, - rowsToRead, - columnReader.get(), - result); - } else { - auto* rowVector = result->asUnchecked(); - VectorPtr rowNumVector; - if (rowVector->childrenSize() != numChildren) { - VELOX_CHECK_EQ(rowVector->childrenSize(), numChildren + 1); - rowNumVector = rowVector->childAt(rowNumberColumnIndex); - const auto& rowType = rowVector->type()->asRow(); - auto names = rowType.names(); - auto types = rowType.children(); - auto children = rowVector->children(); - VELOX_DCHECK(!names.empty() && !types.empty() && !children.empty()); - names.erase(names.begin() + rowNumberColumnIndex); - types.erase(types.begin() + rowNumberColumnIndex); - children.erase(children.begin() + rowNumberColumnIndex); - result = std::make_shared( - rowVector->pool(), - ROW(std::move(names), std::move(types)), - rowVector->nulls(), - rowVector->size(), - std::move(children)); - } - columnReader->next(rowsToRead, result, mutation); - fillRowNumberVector( - rowNumVector, - contiguousRowNumbers, - previousRow, - rowsToRead, - columnReader.get(), - result); - rowVector = result->asUnchecked(); - auto& rowType = rowVector->type()->asRow(); + auto* rowVector = result->asUnchecked(); + VectorPtr rowNumVector; + if (rowVector->childrenSize() != numChildren) { + VELOX_CHECK_EQ(rowVector->childrenSize(), numChildren + 1); + rowNumVector = rowVector->childAt(rowNumberColumnIndex); + const auto& rowType = rowVector->type()->asRow(); auto names = rowType.names(); auto types = rowType.children(); auto children = rowVector->children(); - names.insert(names.begin() + rowNumberColumnIndex, rowNumberColumnName); - types.insert(types.begin() + rowNumberColumnIndex, BIGINT()); - children.insert(children.begin() + rowNumberColumnIndex, rowNumVector); + VELOX_DCHECK(!names.empty() && !types.empty() && !children.empty()); + names.erase(names.begin() + rowNumberColumnIndex); + types.erase(types.begin() + rowNumberColumnIndex); + 
children.erase(children.begin() + rowNumberColumnIndex); result = std::make_shared( rowVector->pool(), ROW(std::move(names), std::move(types)), @@ -264,6 +219,23 @@ void RowReader::readWithRowNumber( rowVector->size(), std::move(children)); } + columnReader->next(rowsToRead, result, mutation); + fillRowNumberVector( + rowNumVector, previousRow, rowsToRead, columnReader.get(), result); + rowVector = result->asUnchecked(); + auto& rowType = rowVector->type()->asRow(); + auto names = rowType.names(); + auto types = rowType.children(); + auto children = rowVector->children(); + names.insert(names.begin() + rowNumberColumnIndex, rowNumberColumnName); + types.insert(types.begin() + rowNumberColumnIndex, BIGINT()); + children.insert(children.begin() + rowNumberColumnIndex, rowNumVector); + result = std::make_shared( + rowVector->pool(), + ROW(std::move(names), std::move(types)), + rowVector->nulls(), + rowVector->size(), + std::move(children)); } } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index 2639b6c354a6..25bbc543045f 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -22,6 +22,7 @@ #include "velox/type/Subfield.h" #include "velox/vector/BaseVector.h" #include "velox/vector/ComplexVector.h" +#include "velox/vector/ConstantVector.h" #include "velox/vector/LazyVector.h" #include @@ -39,6 +40,12 @@ namespace common { // mutable by readers to reflect filter order and other adaptation. 
class ScanSpec { public: + enum class ColumnType : int8_t { + kRegular, // Read from file or constant + kRowIndex, // Row number in the file starting from 0 + kComposite, // A struct with all children not read from file + }; + static constexpr column_index_t kNoChannel = ~0; static constexpr const char* kMapKeysFieldName = "keys"; static constexpr const char* kMapValuesFieldName = "values"; @@ -97,16 +104,26 @@ class ScanSpec { constantValue_ = value; } + template + void setConstantValue(T val, TypePtr type, memory::MemoryPool* pool) { + constantValue_ = std::make_shared>( + pool, 1, false, std::move(type), std::move(val)); + } + bool isConstant() const { return constantValue_ != nullptr; } - void setExplicitRowNumber(bool isExplicitRowNumber) { - isExplicitRowNumber_ = isExplicitRowNumber; + void setColumnType(ColumnType value) { + columnType_ = value; } - bool isExplicitRowNumber() const { - return isExplicitRowNumber_; + ColumnType columnType() const { + return columnType_; + } + + bool readFromFile() const { + return columnType_ == ColumnType::kRegular && !isConstant(); } // Name of the value in its container, i.e. field name in struct or @@ -354,7 +371,8 @@ class ScanSpec { VectorPtr constantValue_; bool projectOut_ = false; - bool isExplicitRowNumber_ = false; + ColumnType columnType_ = ColumnType::kRegular; + // True if a string dictionary or flat map in this field should be // returned as flat. bool makeFlat_ = false; diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index c3c6585b3dd3..438bb469bc0b 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -494,11 +494,7 @@ class SelectiveColumnReader { return StringView(data, value.size()); } - // Whether output rows should be filled when there is no column projected out - // and there is delete mutation. Used for row number generation. 
The case - // for no delete mutation is handled more efficiently outside column reader in - // `RowReader::readWithRowNumber'. - virtual void setFillMutatedOutputRows(bool /*value*/) { + virtual void setCurrentRowNumber(int64_t /*value*/) { VELOX_UNREACHABLE("Only struct reader supports this method"); } diff --git a/velox/dwio/common/SelectiveStructColumnReader.cpp b/velox/dwio/common/SelectiveStructColumnReader.cpp index 839abc8aaad9..e3bdf614af2a 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.cpp +++ b/velox/dwio/common/SelectiveStructColumnReader.cpp @@ -21,6 +21,163 @@ namespace facebook::velox::dwio::common { +namespace { + +bool testFilterOnConstant(const velox::common::ScanSpec& spec) { + if (spec.isConstant() && !spec.constantValue()->isNullAt(0)) { + // Non-null constant is known value during split scheduling and filters on + // them should not be handled at execution level. + return true; + } + // Check filter on missing field. + return !spec.hasFilter() || spec.testNull(); +} + +// Recursively makes empty RowVectors for positions in 'children' where the +// corresponding child type in 'rowType' is a row. The reader expects RowVector +// outputs to be initialized so that the content corresponds to the query schema +// regardless of the file schema. An empty RowVector can have nullptr for all +// its non-row children. 
+void fillRowVectorChildren( + memory::MemoryPool& pool, + const RowType& rowType, + std::vector& children) { + for (auto i = 0; i < children.size(); ++i) { + const auto& type = rowType.childAt(i); + if (type->isRow()) { + std::vector innerChildren(type->size()); + fillRowVectorChildren(pool, type->asRow(), innerChildren); + children[i] = std::make_shared( + &pool, type, nullptr, 0, std::move(innerChildren)); + } + } +} + +VectorPtr tryReuseResult(const VectorPtr& result) { + if (!result.unique()) { + return nullptr; + } + switch (result->encoding()) { + case VectorEncoding::Simple::ROW: + // Do not call prepareForReuse as it would throw away constant vectors + // that can be reused. Reusability of children should be checked in + // getValues of child readers (all readers other than struct are + // recreating the result vector on each batch currently, so no issue with + // reusability now). + result->reuseNulls(); + result->clearContainingLazyAndWrapped(); + return result; + case VectorEncoding::Simple::LAZY: { + auto& lazy = static_cast(*result); + if (!lazy.isLoaded()) { + return nullptr; + } + return tryReuseResult(lazy.loadedVectorShared()); + } + case VectorEncoding::Simple::DICTIONARY: + return tryReuseResult(result->valueVector()); + default: + return nullptr; + } +} + +void prepareResult(VectorPtr& result) { + if (auto reused = tryReuseResult(result)) { + result = std::move(reused); + return; + } + auto& rowType = result->type()->asRow(); + VLOG(1) << "Reallocating result row vector with " << rowType.size() + << " children"; + std::vector children(rowType.size()); + fillRowVectorChildren(*result->pool(), rowType, children); + result = std::make_shared( + result->pool(), result->type(), nullptr, 0, std::move(children)); +} + +void setConstantField( + const VectorPtr& constant, + vector_size_t size, + VectorPtr& field) { + if (field && field->isConstantEncoding() && field.unique() && + field->size() > 0 && field->equalValueAt(constant.get(), 0, 0)) { + 
field->resize(size); + } else { + field = BaseVector::wrapInConstant(size, 0, constant); + } +} + +void setNullField( + vector_size_t size, + VectorPtr& field, + const TypePtr& type, + memory::MemoryPool* pool) { + if (field && field->isConstantEncoding() && field.unique() && + field->size() > 0 && field->isNullAt(0)) { + field->resize(size); + } else { + field = BaseVector::createNullConstant(type, size, pool); + } +} + +void setRowNumberField( + int64_t offset, + const RowSet& rows, + memory::MemoryPool* pool, + VectorPtr& field) { + VELOX_CHECK_GE(offset, 0); + if (field && BaseVector::isVectorWritable(field)) { + VELOX_CHECK( + *field->type() == *BIGINT(), + "Unexpected row number type: {}", + field->type()->toString()); + field->clearAllNulls(); + field->resize(rows.size()); + } else { + field = std::make_shared>( + pool, + BIGINT(), + nullptr, + rows.size(), + AlignedBuffer::allocate(rows.size(), pool), + std::vector()); + } + auto* values = field->asChecked>()->mutableRawValues(); + for (int i = 0; i < rows.size(); ++i) { + values[i] = offset + rows[i]; + } +} + +void setCompositeField( + const velox::common::ScanSpec& spec, + int64_t offset, + const RowSet& rows, + memory::MemoryPool* pool, + VectorPtr& field) { + VELOX_CHECK(field && field->type()->isRow()); + prepareResult(field); + auto* rowField = field->asChecked(); + rowField->clearAllNulls(); + rowField->unsafeResize(rows.size()); + for (auto& child : spec.children()) { + auto& childField = rowField->childAt(child->channel()); + switch (child->columnType()) { + case velox::common::ScanSpec::ColumnType::kRegular: + VELOX_CHECK(child->isConstant()); + setConstantField(child->constantValue(), rows.size(), childField); + break; + case velox::common::ScanSpec::ColumnType::kRowIndex: + setRowNumberField(offset, rows, pool, childField); + break; + case velox::common::ScanSpec::ColumnType::kComposite: + setCompositeField(*child, offset, rows, pool, childField); + break; + } + } +} + +} // namespace + void 
SelectiveStructColumnReaderBase::filterRowGroups( uint64_t rowGroupSize, const dwio::common::StatsContext& context, @@ -79,20 +236,6 @@ void SelectiveStructColumnReaderBase::fillOutputRowsFromMutation( } } -namespace { - -bool testFilterOnConstant(const velox::common::ScanSpec& spec) { - if (spec.isConstant() && !spec.constantValue()->isNullAt(0)) { - // Non-null constant is known value during split scheduling and filters on - // them should not be handled at execution level. - return true; - } - // Check filter on missing field. - return !spec.hasFilter() || spec.testNull(); -} - -} // namespace - void SelectiveStructColumnReaderBase::next( uint64_t numValues, VectorPtr& result, @@ -100,52 +243,61 @@ void SelectiveStructColumnReaderBase::next( process::TraceContext trace("SelectiveStructColumnReaderBase::next"); mutation_ = mutation; hasDeletion_ = common::hasDeletion(mutation); - if (children_.empty()) { - if (hasDeletion_) { - if (fillMutatedOutputRows_) { - fillOutputRowsFromMutation(numValues); - numValues = outputRows_.size(); - } else { - if (mutation->deletedRows) { - numValues -= bits::countBits(mutation->deletedRows, 0, numValues); - } - if (mutation->randomSkip) { - numValues *= mutation->randomSkip->sampleRate(); - } - } - } - for (const auto& childSpec : scanSpec_->children()) { - if (isChildConstant(*childSpec) && !testFilterOnConstant(*childSpec)) { - numValues = 0; - break; - } - } + const RowSet rows(iota(numValues, rows_), numValues); - // No readers - // This can be either count(*) query or a query that select only - // constant columns (partition keys or columns missing from an old file - // due to schema evolution) or row number column. 
- auto resultRowVector = std::dynamic_pointer_cast(result); - resultRowVector->unsafeResize(numValues); - - for (auto& childSpec : scanSpec_->children()) { - VELOX_CHECK(childSpec->isConstant() || childSpec->isExplicitRowNumber()); - if (childSpec->projectOut() && childSpec->isConstant()) { - const auto channel = childSpec->channel(); - resultRowVector->childAt(channel) = BaseVector::wrapInConstant( - numValues, 0, childSpec->constantValue()); - } - } + if (!children_.empty()) { + read(readOffset_, rows, nullptr); + getValues(outputRows(), &result); return; } - const auto oldSize = rows_.size(); - rows_.resize(numValues); - if (numValues > oldSize) { - std::iota(&rows_[oldSize], &rows_[rows_.size()], oldSize); + // No child readers. This can be either count(*) query or a query that select + // only constant columns (partition keys or columns missing from an old file + // due to schema evolution) or columns not from file (e.g. row numbers). + inputRows_ = rows; + outputRows_.clear(); + if (hasDeletion_) { + fillOutputRowsFromMutation(numValues); + numValues = outputRows_.size(); + } + for (const auto& childSpec : scanSpec_->children()) { + if (isChildConstant(*childSpec) && !testFilterOnConstant(*childSpec)) { + outputRows_.clear(); + numValues = 0; + break; + } + } + prepareResult(result); + auto* resultRowVector = result->asChecked(); + resultRowVector->unsafeResize(numValues); + for (auto& childSpec : scanSpec_->children()) { + if (!childSpec->projectOut()) { + continue; + } + auto& childField = resultRowVector->childAt(childSpec->channel()); + if (childSpec->isConstant()) { + childField = + BaseVector::wrapInConstant(numValues, 0, childSpec->constantValue()); + } else if ( + childSpec->columnType() == + velox::common::ScanSpec::ColumnType::kRowIndex) { + VELOX_CHECK(!childSpec->hasFilter()); + setRowNumberField( + currentRowNumber_, outputRows(), resultRowVector->pool(), childField); + } else if ( + childSpec->columnType() == + 
velox::common::ScanSpec::ColumnType::kComposite) { + VELOX_CHECK(!childSpec->hasFilter()); + setCompositeField( + *childSpec, + currentRowNumber_, + outputRows(), + resultRowVector->pool(), + childField); + } else { + VELOX_UNREACHABLE(); + } } - read(readOffset_, rows_, nullptr); - getValues(outputRows(), &result); } void SelectiveStructColumnReaderBase::read( @@ -201,7 +353,8 @@ void SelectiveStructColumnReaderBase::read( continue; } - if (childSpec->isExplicitRowNumber()) { + if (!childSpec->readFromFile()) { + VELOX_CHECK(!childSpec->hasFilter()); continue; } @@ -258,7 +411,7 @@ void SelectiveStructColumnReaderBase::recordParentNullsInChildren( if (isChildConstant(*childSpec)) { continue; } - if (childSpec->isExplicitRowNumber()) { + if (!childSpec->readFromFile()) { continue; } @@ -290,84 +443,6 @@ bool SelectiveStructColumnReaderBase::isChildMissing( childSpec.channel() >= fileType_->size()); } -namespace { - -// Recursively makes empty RowVectors for positions in 'children' -// where the corresponding child type in 'rowType' is a row. The -// reader expects RowVector outputs to be initialized so that the -// content corresponds to the query schema regardless of the file -// schema. An empty RowVector can have nullptr for all its non-row -// children. -void fillRowVectorChildren( - memory::MemoryPool& pool, - const RowType& rowType, - std::vector& children) { - for (auto i = 0; i < children.size(); ++i) { - const auto& type = rowType.childAt(i); - if (type->isRow()) { - std::vector innerChildren(type->size()); - fillRowVectorChildren(pool, type->asRow(), innerChildren); - children[i] = std::make_shared( - &pool, type, nullptr, 0, std::move(innerChildren)); - } - } -} - -VectorPtr tryReuseResult(const VectorPtr& result) { - if (!result.unique()) { - return nullptr; - } - switch (result->encoding()) { - case VectorEncoding::Simple::ROW: - // Do not call prepareForReuse as it would throw away constant vectors - // that can be reused. 
Reusability of children should be checked in - // getValues of child readers (all readers other than struct are - // recreating the result vector on each batch currently, so no issue with - // reusability now). - result->reuseNulls(); - result->clearContainingLazyAndWrapped(); - return result; - case VectorEncoding::Simple::LAZY: { - auto& lazy = static_cast(*result); - if (!lazy.isLoaded()) { - return nullptr; - } - return tryReuseResult(lazy.loadedVectorShared()); - } - case VectorEncoding::Simple::DICTIONARY: - return tryReuseResult(result->valueVector()); - default: - return nullptr; - } -} - -void setConstantField( - const VectorPtr& constant, - vector_size_t size, - VectorPtr& field) { - if (field && field->isConstantEncoding() && field.unique() && - field->size() > 0 && field->equalValueAt(constant.get(), 0, 0)) { - field->resize(size); - } else { - field = BaseVector::wrapInConstant(size, 0, constant); - } -} - -void setNullField( - vector_size_t size, - VectorPtr& field, - const TypePtr& type, - memory::MemoryPool* pool) { - if (field && field->isConstantEncoding() && field.unique() && - field->size() > 0 && field->isNullAt(0)) { - field->resize(size); - } else { - field = BaseVector::createNullConstant(type, size, pool); - } -} - -} // namespace - void SelectiveStructColumnReaderBase::getValues( const RowSet& rows, VectorPtr* result) { @@ -378,21 +453,7 @@ void SelectiveStructColumnReaderBase::getValues( result->get()->type()->isRow(), "Struct reader expects a result of type ROW."); auto& rowType = result->get()->type()->asRow(); - if (auto reused = tryReuseResult(*result)) { - *result = std::move(reused); - } else { - VLOG(1) << "Reallocating result row vector with " << rowType.size() - << " children"; - std::vector children(rowType.size()); - fillRowVectorChildren(*result->get()->pool(), rowType, children); - *result = std::make_shared( - result->get()->pool(), - result->get()->type(), - nullptr, - 0, - std::move(children)); - } - + 
prepareResult(*result); auto* resultRow = static_cast(result->get()); resultRow->unsafeResize(rows.size()); if (rows.empty()) { @@ -407,10 +468,6 @@ void SelectiveStructColumnReaderBase::getValues( continue; } - if (childSpec->isExplicitRowNumber()) { - // Row number data is generated after, skip data loading for it. - continue; - } const auto channel = childSpec->channel(); auto& childResult = resultRow->childAt(channel); if (childSpec->isConstant()) { @@ -418,6 +475,20 @@ void SelectiveStructColumnReaderBase::getValues( continue; } + if (childSpec->columnType() == + velox::common::ScanSpec::ColumnType::kRowIndex) { + setRowNumberField( + currentRowNumber_, rows, resultRow->pool(), childResult); + continue; + } + + if (childSpec->columnType() == + velox::common::ScanSpec::ColumnType::kComposite) { + setCompositeField( + *childSpec, currentRowNumber_, rows, resultRow->pool(), childResult); + continue; + } + const auto index = childSpec->subscript(); // Set missing fields to be null constant, if we're in the top level struct // missing columns should already be a null constant from the check above. diff --git a/velox/dwio/common/SelectiveStructColumnReader.h b/velox/dwio/common/SelectiveStructColumnReader.h index 3b264c948d71..17d98815c015 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.h +++ b/velox/dwio/common/SelectiveStructColumnReader.h @@ -98,8 +98,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { return debugString_; } - void setFillMutatedOutputRows(bool value) final { - fillMutatedOutputRows_ = value; + void setCurrentRowNumber(int64_t value) final { + currentRowNumber_ = value; } protected: @@ -121,12 +121,6 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { getExceptionContext().message(VeloxException::Type::kSystem)), isRoot_(isRoot) {} - /// Records the number of nulls added by 'this' between the end position of - /// each child reader and the end of the range of 'read(). 
This must be done - /// also if a child is not read so that we know how much to skip when seeking - /// forward within the row group. - void recordParentNullsInChildren(int64_t offset, const RowSet& rows); - bool hasDeletion() const final { return hasDeletion_; } @@ -141,8 +135,17 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { isChildMissing(childSpec); } + std::vector children_; + + private: void fillOutputRowsFromMutation(vector_size_t size); + /// Records the number of nulls added by 'this' between the end position of + /// each child reader and the end of the range of 'read(). This must be done + /// also if a child is not read so that we know how much to skip when seeking + /// forward within the row group. + void recordParentNullsInChildren(int64_t offset, const RowSet& rows); + // Context information obtained from ExceptionContext. Stored here // so that LazyVector readers under this can add this to their // ExceptionContext. Allows contextualizing reader errors to split @@ -154,7 +157,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { // table. const bool isRoot_; - std::vector children_; + // Dense set of rows to read in next(). + raw_vector rows_; // Sequence number of output batch. Checked against ColumnLoaders // created by 'this' to verify they are still valid at load. @@ -162,16 +166,13 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { int64_t lazyVectorReadOffset_; - // Dense set of rows to read in next(). - raw_vector rows_; + int64_t currentRowNumber_ = -1; const Mutation* mutation_ = nullptr; // After read() call mutation_ could go out of scope. Need to keep this // around for lazy columns. 
bool hasDeletion_ = false; - - bool fillMutatedOutputRows_ = false; }; class SelectiveStructColumnReader : public SelectiveStructColumnReaderBase { diff --git a/velox/dwio/common/tests/OptionsTests.cpp b/velox/dwio/common/tests/OptionsTests.cpp index 4492a62420d0..82335821c521 100644 --- a/velox/dwio/common/tests/OptionsTests.cpp +++ b/velox/dwio/common/tests/OptionsTests.cpp @@ -30,12 +30,10 @@ TEST(OptionsTests, setRowNumberColumnInfoTest) { RowNumberColumnInfo rowNumberColumnInfo; rowNumberColumnInfo.insertPosition = 0; rowNumberColumnInfo.name = "test"; - rowNumberColumnInfo.isExplicit = true; rowReaderOptions.setRowNumberColumnInfo(rowNumberColumnInfo); auto rowNumberColumn = rowReaderOptions.rowNumberColumnInfo().value(); ASSERT_EQ(rowNumberColumnInfo.insertPosition, rowNumberColumn.insertPosition); ASSERT_EQ(rowNumberColumnInfo.name, rowNumberColumn.name); - ASSERT_EQ(rowNumberColumnInfo.isExplicit, rowNumberColumn.isExplicit); } TEST(OptionsTests, testRowNumberColumnInfoInCopy) { diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 797c3551b625..2f63deef5961 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -169,8 +169,6 @@ void DwrfUnit::ensureDecoders() { flatMapContext, /*isRoot=*/true); selectiveColumnReader_->setIsTopLevel(); - selectiveColumnReader_->setFillMutatedOutputRows( - options_.rowNumberColumnInfo().has_value()); } else { auto requestedType = columnSelector_->getSchemaWithId(); auto factory = &ColumnReaderFactory::defaultFactory(); @@ -526,19 +524,14 @@ void DwrfRowReader::readNext( } return; } - + auto& columnReader = getSelectiveColumnReader(); + columnReader->setCurrentRowNumber(previousRow_); if (!options_.rowNumberColumnInfo().has_value()) { - getSelectiveColumnReader()->next(rowsToRead, result, mutation); + columnReader->next(rowsToRead, result, mutation); return; } - readWithRowNumber( - getSelectiveColumnReader(), - options_, - previousRow_, - 
rowsToRead, - mutation, - result); + columnReader, options_, previousRow_, rowsToRead, mutation, result); } uint64_t DwrfRowReader::skip(uint64_t numValues) { diff --git a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp index 6e817dc678db..c8efa7e409aa 100644 --- a/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStructColumnReader.cpp @@ -50,13 +50,13 @@ SelectiveStructColumnReader::SelectiveStructColumnReader( const auto& rowType = requestedType_->asRow(); for (auto i = 0; i < childSpecs.size(); ++i) { auto* childSpec = childSpecs[i]; - if (childSpec->isExplicitRowNumber()) { - continue; - } if (childSpec->isConstant() || isChildMissing(*childSpec)) { childSpec->setSubscript(kConstantChildSpecSubscript); continue; } + if (!childSpec->readFromFile()) { + continue; + } const auto childFileType = fileType_->childByName(childSpec->fieldName()); const auto childRequestedType = rowType.findChild(childSpec->fieldName()); diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index daeb070d1c7c..1f9fac33ab0a 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -2054,13 +2054,19 @@ namespace { void verifyRowNumbers( RowReader& rowReader, memory::MemoryPool* pool, - int expectedNumRows) { - auto result = BaseVector::create(ROW({{"c0", INTEGER()}}), 0, pool); + int expectedNumRows, + bool explicitRowNumber = false) { + auto result = explicitRowNumber + ? BaseVector::create( + ROW({{"c0", INTEGER()}, {"$row_number", BIGINT()}}), 0, pool) + : BaseVector::create(ROW({{"c0", INTEGER()}}), 0, pool); int numRows = 0; while (rowReader.next(10, result) > 0) { auto* rowVector = result->asUnchecked(); ASSERT_EQ(2, rowVector->childrenSize()); - ASSERT_EQ(rowVector->type()->asRow().nameOf(1), ""); + ASSERT_EQ( + rowVector->type()->asRow().nameOf(1), + explicitRowNumber ? 
"$row_number" : ""); DecodedVector values(*rowVector->childAt(0)); DecodedVector rowNumbers(*rowVector->childAt(1)); for (size_t i = 0; i < rowVector->size(); ++i) { @@ -2118,7 +2124,6 @@ TEST_F(TestReader, setRowNumberColumnInfo) { RowNumberColumnInfo rowNumberColumnInfo; rowNumberColumnInfo.insertPosition = 1; rowNumberColumnInfo.name = ""; - rowNumberColumnInfo.isExplicit = false; rowReaderOpts.setRowNumberColumnInfo(rowNumberColumnInfo); { SCOPED_TRACE("Selective no filter"); @@ -2148,7 +2153,6 @@ TEST_F(TestReader, reuseRowNumberColumn) { RowNumberColumnInfo rowNumberColumnInfo; rowNumberColumnInfo.insertPosition = 1; rowNumberColumnInfo.name = ""; - rowNumberColumnInfo.isExplicit = false; rowReaderOpts.setRowNumberColumnInfo(rowNumberColumnInfo); { SCOPED_TRACE("Reuse passed in"); @@ -2197,6 +2201,37 @@ TEST_F(TestReader, reuseRowNumberColumn) { } } +TEST_F(TestReader, explicitRowNumberColumn) { + const std::vector> integerValues{ + {0, 1, 2, 3, 4}, + {5, 6, 7}, + {8}, + {}, + {9, 10, 11, 12, 13, 14, 15}, + }; + auto batches = createBatches(integerValues); + auto [writer, reader] = createWriterReader(batches, pool()); + auto spec = std::make_shared(""); + spec->addField("c0", 0); + spec->addField("$row_number", 1) + ->setColumnType(common::ScanSpec::ColumnType::kRowIndex); + RowReaderOptions rowReaderOpts; + rowReaderOpts.setScanSpec(spec); + { + SCOPED_TRACE("Selective no filter"); + auto rowReader = reader->createRowReader(rowReaderOpts); + verifyRowNumbers(*rowReader, pool(), 16, true); + } + spec->childByName("c0")->setFilter( + common::createBigintValues({1, 4, 5, 7, 11, 14}, false)); + spec->resetCachedValues(true); + { + SCOPED_TRACE("Selective with filter"); + auto rowReader = reader->createRowReader(rowReaderOpts); + verifyRowNumbers(*rowReader, pool(), 6, true); + } +} + TEST_F(TestReader, failToReuseReaderNulls) { auto c0 = makeRowVector( {"a", "b"}, diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp 
b/velox/dwio/parquet/reader/ParquetReader.cpp index 5e2d73221dec..4e83898b69ff 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -947,8 +947,6 @@ class ParquetRowReader::Impl { readerBase_->schemaWithId(), // Id is schema id params, *options_.scanSpec()); - columnReader_->setFillMutatedOutputRows( - options_.rowNumberColumnInfo().has_value()); columnReader_->setIsTopLevel(); filterRowGroups(); @@ -1021,6 +1019,7 @@ class ParquetRowReader::Impl { return 0; } VELOX_DCHECK_GT(rowsToRead, 0); + columnReader_->setCurrentRowNumber(nextRowNumber()); if (!options_.rowNumberColumnInfo().has_value()) { columnReader_->next(rowsToRead, result, mutation); } else { diff --git a/velox/dwio/parquet/reader/StructColumnReader.cpp b/velox/dwio/parquet/reader/StructColumnReader.cpp index 4dfdc4e0f756..0b808da74e9a 100644 --- a/velox/dwio/parquet/reader/StructColumnReader.cpp +++ b/velox/dwio/parquet/reader/StructColumnReader.cpp @@ -39,7 +39,7 @@ StructColumnReader::StructColumnReader( childSpec->setSubscript(kConstantChildSpecSubscript); continue; } - if (childSpecs[i]->isExplicitRowNumber()) { + if (!childSpecs[i]->readFromFile()) { continue; } auto childFileType = fileType_->childByName(childSpec->fieldName()); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 9d5d4edc1ac2..da5b7c02147f 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -5213,3 +5213,111 @@ TEST_F(TableScanTest, hugeStripe) { } ASSERT_EQ(numRows, 4'294'980'000); } + +TEST_F(TableScanTest, rowId) { + const auto rowIdType = + ROW({"row_number", + "row_group_id", + "metadata_version", + "partition_id", + "table_guid"}, + {BIGINT(), VARCHAR(), BIGINT(), BIGINT(), VARCHAR()}); + auto data = makeFlatVector(10, [](auto i) { return i + 1; }); + auto vector = makeRowVector({data}); + auto file = TempFilePath::create(); + writeToFile(file->getPath(), {vector}); + auto 
makeRowIdColumnHandle = [&](auto& name) { + return std::make_shared( + name, HiveColumnHandle::ColumnType::kRowId, rowIdType, rowIdType); + }; + { + SCOPED_TRACE("Preload"); + auto outputType = ROW({"c0", "c1"}, {BIGINT(), rowIdType}); + auto plan = PlanBuilder() + .startTableScan() + .outputType(outputType) + .assignments({ + {"c0", makeColumnHandle("c0", BIGINT(), {})}, + {"c1", makeRowIdColumnHandle("c1")}, + }) + .endTableScan() + .planNode(); + auto query = AssertQueryBuilder(plan); + query.config(core::QueryConfig::kMaxSplitPreloadPerDriver, "4"); + auto expected = BaseVector::create(outputType, 0, pool()); + for (int i = 0; i < 10; ++i) { + auto split = makeHiveConnectorSplit(file->getPath()); + split->rowIdProperties = { + .metadataVersion = i, + .partitionId = 2 * i, + .tableGuid = fmt::format("table-guid-{}", i), + }; + query.split(split); + auto rowGroupId = split->getFileName(); + auto newExpected = makeRowVector({ + data, + makeRowVector({ + makeFlatVector(10, folly::identity), + makeConstant(StringView(rowGroupId), 10), + makeConstant(split->rowIdProperties->metadataVersion, 10), + makeConstant(split->rowIdProperties->partitionId, 10), + makeConstant(StringView(split->rowIdProperties->tableGuid), 10), + }), + }); + expected->append(newExpected.get()); + } + auto task = query.assertResults(expected); + auto stats = getTableScanRuntimeStats(task); + ASSERT_GT(stats.at("preloadedSplits").sum, 0); + } + { + SCOPED_TRACE("Remaining filter only"); + auto remainingFilter = + parseExpr("c1.row_number % 2 == 0", ROW({"c1"}, {rowIdType})); + auto plan = PlanBuilder() + .startTableScan() + .tableHandle(makeTableHandle({}, remainingFilter)) + .outputType(ROW({"c0"}, {BIGINT()})) + .assignments({ + {"c0", makeColumnHandle("c0", BIGINT(), {})}, + {"c1", makeRowIdColumnHandle("c1")}, + }) + .endTableScan() + .planNode(); + auto split = makeHiveConnectorSplit(file->getPath()); + split->rowIdProperties = { + .metadataVersion = 42, + .partitionId = 24, + .tableGuid = 
"foo", + }; + auto expected = makeRowVector( + {makeFlatVector(5, [](auto i) { return 1 + 2 * i; })}); + AssertQueryBuilder(plan).split(split).assertResults(expected); + } + { + SCOPED_TRACE("Row ID only"); + auto plan = PlanBuilder() + .startTableScan() + .outputType(ROW({"c0"}, {rowIdType})) + .assignments({{"c0", makeRowIdColumnHandle("c0")}}) + .endTableScan() + .planNode(); + auto split = makeHiveConnectorSplit(file->getPath()); + split->rowIdProperties = { + .metadataVersion = 42, + .partitionId = 24, + .tableGuid = "foo", + }; + auto rowGroupId = split->getFileName(); + auto expected = makeRowVector({ + makeRowVector({ + makeFlatVector(10, folly::identity), + makeConstant(StringView(rowGroupId), 10), + makeConstant(split->rowIdProperties->metadataVersion, 10), + makeConstant(split->rowIdProperties->partitionId, 10), + makeConstant(StringView(split->rowIdProperties->tableGuid), 10), + }), + }); + AssertQueryBuilder(plan).split(split).assertResults(expected); + } +}