Row index metadata column support for table scan (#9174)
Summary:
Spark lets users query the row_index metadata column of a Parquet file; see the Spark-side implementation in apache/spark@95aebcb.

However, Velox does not support the row_index metadata column, so the Spark query below returns null for the row index column:

```
select a, _tmp_metadata_row_index from table;
```

Related issue: #9165.

This PR introduces a new column handle type, `kRowIndex`, which marks the output column that should be populated with generated row numbers. The generated column is of type `BIGINT` and holds each row's zero-based position within the file, computed before any filtering or mutation, and it works with all supported file formats.
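
For illustration, a consumer can declare the row-index column when building the scan's column assignments. The sketch below is not part of this commit and assumes `HiveColumnHandle`'s `(name, columnType, dataType, hiveType)` constructor; the Spark-style column name is only an example:

```
#include "velox/connectors/hive/TableHandle.h"

using namespace facebook::velox;
using namespace facebook::velox::connector::hive;

// Hypothetical usage: mark "_tmp_metadata_row_index" as the column to be
// filled with generated row numbers. Any output column name works; this
// one mirrors Spark's metadata column.
auto rowIndexColumn = std::make_shared<HiveColumnHandle>(
    "_tmp_metadata_row_index",
    HiveColumnHandle::ColumnType::kRowIndex,
    BIGINT(),  // Generated row numbers are always BIGINT.
    BIGINT());
```

When this handle appears among the scan's assignments, `HiveDataSource` records it and the reader fills the column with zero-based, per-file row numbers.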

Pull Request resolved: #9174

Reviewed By: mbasmanova

Differential Revision: D56472291

Pulled By: Yuhta

fbshipit-source-id: 848693a9ccc5ee5e3279f012d6198721b7691d6f
gaoyangxiaozhu authored and facebook-github-bot committed May 9, 2024
1 parent 3e98d40 commit 49c3ebb
Showing 24 changed files with 335 additions and 122 deletions.
11 changes: 11 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -252,6 +252,12 @@ inline bool isSynthesizedColumn(
return name == kPath || name == kBucket || infoColumns.count(name) != 0;
}

inline bool isRowIndexColumn(
const std::string& name,
std::shared_ptr<HiveColumnHandle> rowIndexColumn) {
return rowIndexColumn != nullptr && rowIndexColumn->name() == name;
}

} // namespace

const std::string& getColumnName(const common::Subfield& subfield) {
@@ -343,6 +349,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn,
memory::MemoryPool* pool) {
auto spec = std::make_shared<common::ScanSpec>("root");
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
@@ -351,6 +358,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
for (auto& [subfield, _] : filters) {
if (auto name = subfield.toString();
!isSynthesizedColumn(name, infoColumns) &&
!isRowIndexColumn(name, rowIndexColumn) &&
partitionKeys.count(name) == 0) {
filterSubfields[getColumnName(subfield)].push_back(&subfield);
}
@@ -360,6 +368,9 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
for (int i = 0; i < rowType->size(); ++i) {
auto& name = rowType->nameOf(i);
auto& type = rowType->childAt(i);
if (isRowIndexColumn(name, rowIndexColumn)) {
continue;
}
auto it = outputSubfields.find(name);
if (it == outputSubfields.end()) {
auto* fieldSpec = spec->addFieldRecursively(name, *type, i);
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
@@ -57,6 +57,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn,
memory::MemoryPool* pool);

void configureReaderOptions(
7 changes: 6 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
@@ -61,6 +61,10 @@ HiveDataSource::HiveDataSource(
if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) {
infoColumns_.emplace(handle->name(), handle);
}

if (handle->columnType() == HiveColumnHandle::ColumnType::kRowIndex) {
rowIndexColumn_ = handle;
}
}

std::vector<std::string> readerRowNames;
@@ -153,6 +157,7 @@ HiveDataSource::HiveDataSource(
hiveTableHandle_->dataColumns(),
partitionKeys_,
infoColumns_,
rowIndexColumn_,
pool_);
if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
@@ -193,7 +198,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
// Split reader subclasses may need to use the reader options in prepareSplit
// so we initialize it beforehand.
splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_, rowIndexColumn_);
}

std::optional<RowVectorPtr> HiveDataSource::next(
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveDataSource.h
@@ -119,6 +119,7 @@ class HiveDataSource : public DataSource {
const ConnectorQueryCtx* const connectorQueryCtx_;
const std::shared_ptr<HiveConfig> hiveConfig_;
std::shared_ptr<io::IoStatistics> ioStats_;
std::shared_ptr<HiveColumnHandle> rowIndexColumn_;

private:
// Evaluates remainingFilter_ on the specified vector. Returns number of rows
31 changes: 27 additions & 4 deletions velox/connectors/hive/SplitReader.cpp
@@ -140,15 +140,16 @@ void SplitReader::configureReaderOptions(

void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) {
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
createReader();

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
}

createRowReader(metadataFilter);
createRowReader(metadataFilter, rowIndexColumn);
}

uint64_t SplitReader::next(uint64_t size, VectorPtr& output) {
@@ -278,23 +279,45 @@ bool SplitReader::checkIfSplitIsEmpty(
}

void SplitReader::createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter) {
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.getFileSchema());
auto columnNames = fileType->names();
if (rowIndexColumn != nullptr) {
setRowIndexColumn(fileType, rowIndexColumn);
}

configureRowReaderOptions(
baseRowReaderOpts_,
hiveTableHandle_->tableParameters(),
scanSpec_,
metadataFilter,
ROW(std::vector<std::string>(fileType->names()), std::move(columnTypes)),
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_);
// NOTE: we first reset the finished 'baseRowReader_' of the previous split
// before setting up for the next one to avoid doubling the peak memory usage.
baseRowReader_.reset();
baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
}

void SplitReader::setRowIndexColumn(
const RowTypePtr& fileType,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
auto rowIndexColumnName = rowIndexColumn->name();
auto rowIndexMetaColIdx =
readerOutputType_->getChildIdxIfExists(rowIndexColumnName);
if (rowIndexMetaColIdx.has_value() &&
!fileType->containsChild(rowIndexColumnName) &&
hiveSplit_->partitionKeys.find(rowIndexColumnName) ==
hiveSplit_->partitionKeys.end()) {
dwio::common::RowNumberColumnInfo rowNumberColumnInfo;
rowNumberColumnInfo.insertPosition = rowIndexMetaColIdx.value();
rowNumberColumnInfo.name = rowIndexColumnName;
baseRowReaderOpts_.setRowNumberColumnInfo(std::move(rowNumberColumnInfo));
}
}

std::vector<TypePtr> SplitReader::adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema) {
11 changes: 9 additions & 2 deletions velox/connectors/hive/SplitReader.h
@@ -91,7 +91,8 @@ class SplitReader {
/// would be called only once per incoming split
virtual void prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats);
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);

virtual uint64_t next(uint64_t size, VectorPtr& output);

@@ -126,14 +127,20 @@

/// Create the dwio::common::RowReader object baseRowReader_, which owns the
/// ColumnReaders that will be used to read the data
void createRowReader(std::shared_ptr<common::MetadataFilter> metadataFilter);
void createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);

/// Different table formats may have different metadata columns.
/// This function will be used to update the scanSpec for these columns.
virtual std::vector<TypePtr> adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema);

void setRowIndexColumn(
const RowTypePtr& fileType,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);

void setPartitionValue(
common::ScanSpec* spec,
const std::string& partitionKey,
9 changes: 8 additions & 1 deletion velox/connectors/hive/TableHandle.h
@@ -28,7 +28,14 @@ using SubfieldFilters =

class HiveColumnHandle : public ColumnHandle {
public:
enum class ColumnType { kPartitionKey, kRegular, kSynthesized };
enum class ColumnType {
kPartitionKey,
kRegular,
kSynthesized,
/// A zero-based row number of type BIGINT auto-generated by the connector.
/// Row numbers are unique within a single file only.
kRowIndex
};

/// NOTE: 'dataType' is the column type in target write table. 'hiveType' is
/// converted type of the corresponding column in source table which might not
5 changes: 3 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
@@ -52,15 +52,16 @@ IcebergSplitReader::IcebergSplitReader(

void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) {
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
createReader();

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
}

createRowReader(metadataFilter);
createRowReader(metadataFilter, rowIndexColumn);

std::shared_ptr<const HiveIcebergSplit> icebergSplit =
std::dynamic_pointer_cast<const HiveIcebergSplit>(hiveSplit_);
3 changes: 2 additions & 1 deletion velox/connectors/hive/iceberg/IcebergSplitReader.h
@@ -43,7 +43,8 @@ class IcebergSplitReader : public SplitReader {

void prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) override;
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) override;

uint64_t next(uint64_t size, VectorPtr& output) override;

33 changes: 30 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorTest.cpp
@@ -92,7 +92,14 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) {
auto rowType = ROW({{"c0", columnType}});
auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"});
auto scanSpec = makeScanSpec(
rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get());
rowType,
groupSubfields(subfields),
{},
nullptr,
{},
{},
nullptr,
pool_.get());
auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
validateNullConstant(*c0c0, *BIGINT());
auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1");
@@ -128,6 +135,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) {
nullptr,
{},
{},
nullptr,
pool_.get());
auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant());
@@ -151,6 +159,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) {
nullptr,
{},
{},
nullptr,
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->maxArrayElementsCount(), 2);
@@ -168,7 +177,8 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) {
auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"});
auto groupedSubfields = groupSubfields(subfields);
VELOX_ASSERT_USER_THROW(
makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()),
makeScanSpec(
rowType, groupedSubfields, {}, nullptr, {}, {}, nullptr, pool_.get()),
"Non-positive array subscript cannot be push down");
}

@@ -184,6 +194,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) {
nullptr,
{},
{},
nullptr,
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(
@@ -212,6 +223,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
nullptr,
{},
{},
nullptr,
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_TRUE(c0->flatMapFeatureSelection().empty());
@@ -232,6 +244,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
nullptr,
{},
{},
nullptr,
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_TRUE(mapKeyIsNotNull(*c0));
@@ -255,6 +268,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
nullptr,
{},
{},
nullptr,
pool_.get());
auto* keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
@@ -283,6 +297,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
nullptr,
{},
{},
nullptr,
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
@@ -302,6 +317,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
nullptr,
{},
{},
nullptr,
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
@@ -318,6 +334,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
nullptr,
{},
{},
nullptr,
pool_.get());
keysFilter = scanSpec->childByName("c0")
->childByName(ScanSpec::kMapKeysFieldName)
@@ -354,6 +371,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) {
ROW({{"c0", c0Type}, {"c1", c1Type}}),
{},
{},
nullptr,
pool_.get());
auto c0 = scanSpec->childByName("c0");
ASSERT_FALSE(c0->isConstant());
@@ -399,6 +417,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) {
nullptr,
{},
{},
nullptr,
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->children().size(), 2);
@@ -412,7 +431,14 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) {
SubfieldFilters filters;
filters.emplace(Subfield("ds"), exec::equal("2023-10-13"));
auto scanSpec = makeScanSpec(
rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get());
rowType,
{},
filters,
rowType,
{{"ds", nullptr}},
{},
nullptr,
pool_.get());
ASSERT_TRUE(scanSpec->childByName("c0")->projectOut());
ASSERT_FALSE(scanSpec->childByName("ds")->projectOut());
}
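
As a hypothetical illustration (not part of this commit), a test for the new `rowIndexColumn` parameter could follow the same pattern as the tests above; per this change, the row-index column is skipped when building the scan spec, since the reader generates it rather than reading it from the file:

```
TEST_F(HiveConnectorTest, makeScanSpec_rowIndexColumn) {
  // Hypothetical test sketch; names and assertions are illustrative.
  auto rowType = ROW({{"c0", BIGINT()}, {"row_idx", BIGINT()}});
  auto rowIndexColumn = std::make_shared<HiveColumnHandle>(
      "row_idx",
      HiveColumnHandle::ColumnType::kRowIndex,
      BIGINT(),
      BIGINT());
  auto scanSpec = makeScanSpec(
      rowType, {}, {}, nullptr, {}, {}, rowIndexColumn, pool_.get());
  // Only "c0" gets a child spec; "row_idx" is generated by the reader
  // rather than read from the file.
  ASSERT_EQ(scanSpec->children().size(), 1);
}
```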
@@ -430,6 +456,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_prunedMapNonNullMapKey) {
nullptr,
{},
{},
nullptr,
pool_.get());
auto* c0 = scanSpec->childByName("c0");
ASSERT_EQ(c0->children().size(), 2);