Add Row ID column reading support
Summary:
Create a framework in the selective readers for generating a row number field
and compositing it with other constant fields. Then use this framework to add
support for the Row ID column. The row number generation logic is moved into
the column reader as part of this change, so that the generated row numbers can
be filtered if needed in the future.

Differential Revision: D65028172
Yuhta authored and facebook-github-bot committed Oct 28, 2024
1 parent a1d923e commit b2cbd60
Showing 28 changed files with 586 additions and 368 deletions.
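
For orientation, here is a minimal sketch of how a caller might wire up the new Row ID column, assembled from the pieces changed below: the kRowId column type in TableHandle.h, the RowIdProperties field on HiveConnectorSplit, and the five-field row layout that makeScanSpec now checks. The column name "$row_id", the helper function names, and the four-argument HiveColumnHandle constructor form are illustrative assumptions, not part of this commit.

#include <memory>

#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/TableHandle.h"

namespace facebook::velox::connector::hive {

// Sketch only: declare a Row ID output column whose child layout matches the
// checks added in HiveConnectorUtil.cpp (rowNumber, rowGroupId,
// metadataVersion, partitionId, rowGuid).
std::shared_ptr<HiveColumnHandle> makeRowIdColumnSketch() {
  auto rowIdType = ROW(
      {"rowNumber", "rowGroupId", "metadataVersion", "partitionId", "rowGuid"},
      {BIGINT(), VARCHAR(), BIGINT(), BIGINT(), VARCHAR()});
  return std::make_shared<HiveColumnHandle>(
      "$row_id", HiveColumnHandle::ColumnType::kRowId, rowIdType, rowIdType);
}

// Sketch only: attach the per-split constants that
// HiveDataSource::setupRowIdColumn() folds into the composite column. The
// rowGroupId child is filled from the split's file name and rowNumber is
// generated by the selective reader, so neither is supplied here.
void attachRowIdPropertiesSketch(HiveConnectorSplit& split) {
  RowIdProperties props;
  props.metadataVersion = 1;
  props.partitionId = 42;
  props.tableGuid = "example-table-guid";
  split.rowIdProperties = props;
}

} // namespace facebook::velox::connector::hive

The reader-side plumbing that fills these fields is in the HiveConnectorUtil.cpp (kComposite and kRowIndex scan-spec column types) and HiveDataSource.cpp (setupRowIdColumn) hunks below.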
3 changes: 2 additions & 1 deletion velox/common/base/RawVector.cpp
@@ -21,10 +21,11 @@
namespace facebook::velox {

namespace {
std::vector<int32_t> iotaData;
raw_vector<int32_t> iotaData;

bool initializeIota() {
iotaData.resize(10000);
iotaData.resize(iotaData.capacity());
std::iota(iotaData.begin(), iotaData.end(), 0);
return true;
}
8 changes: 8 additions & 0 deletions velox/connectors/hive/HiveConnectorSplit.h
@@ -36,6 +36,12 @@ struct HiveBucketConversion {
std::vector<std::unique_ptr<HiveColumnHandle>> bucketColumnHandles;
};

struct RowIdProperties {
int64_t metadataVersion;
int64_t partitionId;
std::string tableGuid;
};

struct HiveConnectorSplit : public connector::ConnectorSplit {
const std::string filePath;
dwio::common::FileFormat fileFormat;
@@ -62,6 +68,8 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
/// the file handle.
std::optional<FileProperties> properties;

std::optional<RowIdProperties> rowIdProperties;

HiveConnectorSplit(
const std::string& connectorId,
const std::string& _filePath,
40 changes: 24 additions & 16 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -260,10 +260,10 @@ inline bool isSynthesizedColumn(
return infoColumns.count(name) != 0;
}

inline bool isRowIndexColumn(
bool isSpecialColumn(
const std::string& name,
std::shared_ptr<HiveColumnHandle> rowIndexColumn) {
return rowIndexColumn != nullptr && rowIndexColumn->name() == name;
const std::optional<std::string>& specialName) {
return specialName.has_value() && name == *specialName;
}

} // namespace
@@ -368,7 +368,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn,
const SpecialColumnNames& specialColumns,
memory::MemoryPool* pool) {
auto spec = std::make_shared<common::ScanSpec>("root");
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
@@ -377,8 +377,9 @@
for (auto& [subfield, _] : filters) {
if (auto name = subfield.toString();
!isSynthesizedColumn(name, infoColumns) &&
!isRowIndexColumn(name, rowIndexColumn) &&
partitionKeys.count(name) == 0) {
VELOX_CHECK(!isSpecialColumn(name, specialColumns.rowIndex));
VELOX_CHECK(!isSpecialColumn(name, specialColumns.rowId));
filterSubfields[getColumnName(subfield)].push_back(&subfield);
}
}
@@ -387,13 +388,27 @@
for (int i = 0; i < rowType->size(); ++i) {
auto& name = rowType->nameOf(i);
auto& type = rowType->childAt(i);
if (isSpecialColumn(name, specialColumns.rowIndex)) {
VELOX_CHECK(type->isBigint());
auto* fieldSpec = spec->addField(name, i);
fieldSpec->setColumnType(common::ScanSpec::ColumnType::kRowIndex);
continue;
}
if (isSpecialColumn(name, specialColumns.rowId)) {
VELOX_CHECK(type->isRow() && type->size() == 5);
auto* rowId = spec->addField(name, i);
rowId->setColumnType(common::ScanSpec::ColumnType::kComposite);
rowId->addField("rowNumber", 0)
->setColumnType(common::ScanSpec::ColumnType::kRowIndex);
rowId->addField("rowGroupId", 1);
rowId->addField("metadataVersion", 2);
rowId->addField("partitionId", 3);
rowId->addField("rowGuid", 4);
continue;
}
auto it = outputSubfields.find(name);
if (it == outputSubfields.end()) {
auto* fieldSpec = spec->addFieldRecursively(name, *type, i);
if (isRowIndexColumn(name, rowIndexColumn)) {
VELOX_CHECK(type->isBigint());
fieldSpec->setExplicitRowNumber(true);
}
processFieldSpec(dataColumns, type, *fieldSpec);
filterSubfields.erase(name);
continue;
@@ -409,12 +424,6 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
filterSubfields.erase(it);
}
auto* fieldSpec = spec->addField(name, i);
if (isRowIndexColumn(name, rowIndexColumn)) {
VELOX_CHECK(type->isBigint());
// Set the flag for the case that the row index column only exists in
// remaining filters.
fieldSpec->setExplicitRowNumber(true);
}
addSubfields(*type, subfieldSpecs, 1, pool, *fieldSpec);
processFieldSpec(dataColumns, type, *fieldSpec);
subfieldSpecs.clear();
@@ -448,7 +457,6 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
if (isSynthesizedColumn(name, infoColumns)) {
continue;
}
VELOX_CHECK(!isRowIndexColumn(name, rowIndexColumn));
auto fieldSpec = spec->getOrCreateChild(pair.first);
fieldSpec->addFilter(*pair.second);
}
7 changes: 6 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
@@ -44,6 +44,11 @@ void checkColumnNameLowerCase(

void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr);

struct SpecialColumnNames {
std::optional<std::string> rowIndex;
std::optional<std::string> rowId;
};

std::shared_ptr<common::ScanSpec> makeScanSpec(
const RowTypePtr& rowType,
const folly::F14FastMap<std::string, std::vector<const common::Subfield*>>&
@@ -54,7 +59,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn,
const SpecialColumnNames& specialColumns,
memory::MemoryPool* pool);

void configureReaderOptions(
56 changes: 40 additions & 16 deletions velox/connectors/hive/HiveDataSource.cpp
@@ -22,7 +22,6 @@

#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/expression/FieldReference.h"

@@ -81,18 +80,21 @@ HiveDataSource::HiveDataSource(
handle,
"ColumnHandle must be an instance of HiveColumnHandle for {}",
canonicalizedName);

if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) {
partitionKeys_.emplace(handle->name(), handle);
}

if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) {
infoColumns_.emplace(handle->name(), handle);
}

if (handle->columnType() == HiveColumnHandle::ColumnType::kRowIndex) {
VELOX_CHECK_NULL(rowIndexColumn_);
rowIndexColumn_ = handle;
switch (handle->columnType()) {
case HiveColumnHandle::ColumnType::kRegular:
break;
case HiveColumnHandle::ColumnType::kPartitionKey:
partitionKeys_.emplace(handle->name(), handle);
break;
case HiveColumnHandle::ColumnType::kSynthesized:
infoColumns_.emplace(handle->name(), handle);
break;
case HiveColumnHandle::ColumnType::kRowIndex:
specialColumns_.rowIndex = handle->name();
break;
case HiveColumnHandle::ColumnType::kRowId:
specialColumns_.rowId = handle->name();
break;
}
}

@@ -192,7 +194,7 @@ HiveDataSource::HiveDataSource(
hiveTableHandle_->dataColumns(),
partitionKeys_,
infoColumns_,
rowIndexColumn_,
specialColumns_,
pool_);
if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
@@ -257,7 +259,7 @@ std::unique_ptr<HivePartitionFunction> HiveDataSource::setupBucketConversion() {
hiveTableHandle_->dataColumns(),
partitionKeys_,
infoColumns_,
rowIndexColumn_,
specialColumns_,
pool_);
newScanSpec->moveAdaptationFrom(*scanSpec_);
scanSpec_ = std::move(newScanSpec);
@@ -266,6 +268,25 @@
split_->bucketConversion->tableBucketCount, std::move(bucketChannels));
}

void HiveDataSource::setupRowIdColumn() {
VELOX_CHECK(split_->rowIdProperties.has_value());
const auto& props = *split_->rowIdProperties;
auto* rowId = scanSpec_->childByName(*specialColumns_.rowId);
VELOX_CHECK_NOT_NULL(rowId);
auto rowGroupId = split_->getFileName();
rowId->childByName("rowGroupId")
->setConstantValue<StringView>(
StringView(rowGroupId), VARCHAR(), connectorQueryCtx_->memoryPool());
rowId->childByName("metadataVersion")
->setConstantValue<int64_t>(
props.metadataVersion, BIGINT(), connectorQueryCtx_->memoryPool());
rowId->childByName("partitionId")
->setConstantValue<int64_t>(
props.partitionId, BIGINT(), connectorQueryCtx_->memoryPool());
rowId->childByName("rowGuid")->setConstantValue<StringView>(
StringView(props.tableGuid), VARCHAR(), connectorQueryCtx_->memoryPool());
}

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
VELOX_CHECK_NULL(
split_,
@@ -284,12 +305,15 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
} else {
partitionFunction_.reset();
}
if (specialColumns_.rowId.has_value()) {
setupRowIdColumn();
}

splitReader_ = createSplitReader();
// Split reader subclasses may need to use the reader options in prepareSplit
// so we initialize it beforehand.
splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_, rowIndexColumn_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
}

vector_size_t HiveDataSource::applyBucketConversion(
5 changes: 4 additions & 1 deletion velox/connectors/hive/HiveDataSource.h
@@ -20,6 +20,7 @@
#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/FileHandle.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/SplitReader.h"
#include "velox/connectors/hive/TableHandle.h"
@@ -125,14 +126,15 @@ class HiveDataSource : public DataSource {
partitionKeys_;

std::shared_ptr<io::IoStatistics> ioStats_;
std::shared_ptr<HiveColumnHandle> rowIndexColumn_;

private:
std::unique_ptr<HivePartitionFunction> setupBucketConversion();
vector_size_t applyBucketConversion(
const RowVectorPtr& rowVector,
BufferPtr& indices);

void setupRowIdColumn();

// Evaluates remainingFilter_ on the specified vector. Returns number of rows
// passed. Populates filterEvalCtx_.selectedIndices and selectedBits if only
// some rows passed the filter. If none or all rows passed
@@ -157,6 +159,7 @@ class HiveDataSource : public DataSource {
// Column handles for the Split info columns keyed on their column names.
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
infoColumns_;
SpecialColumnNames specialColumns_{};
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
subfields_;
SubfieldFilters filters_;
26 changes: 5 additions & 21 deletions velox/connectors/hive/SplitReader.cpp
@@ -145,9 +145,8 @@ void SplitReader::configureReaderOptions(

void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
createReader(std::move(metadataFilter), rowIndexColumn);
dwio::common::RuntimeStatistics& runtimeStats) {
createReader(std::move(metadataFilter));

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
@@ -222,8 +221,7 @@ std::string SplitReader::toString() const {
}

void SplitReader::createReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
std::shared_ptr<common::MetadataFilter> metadataFilter) {
VELOX_CHECK_NE(
baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN);

@@ -264,10 +262,6 @@ void SplitReader::createReader(
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema());
auto columnNames = fileType->names();
if (rowIndexColumn != nullptr) {
bool isExplicit = scanSpec_->childByName(rowIndexColumn->name()) != nullptr;
setRowIndexColumn(rowIndexColumn, isExplicit);
}
configureRowReaderOptions(
hiveTableHandle_->tableParameters(),
scanSpec_,
@@ -312,17 +306,6 @@ void SplitReader::createRowReader() {
baseRowReader_ = baseReader_->createRowReader(baseRowReaderOpts_);
}

void SplitReader::setRowIndexColumn(
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn,
bool isExplicit) {
dwio::common::RowNumberColumnInfo rowNumberColumnInfo;
rowNumberColumnInfo.insertPosition =
readerOutputType_->getChildIdx(rowIndexColumn->name());
rowNumberColumnInfo.name = rowIndexColumn->name();
rowNumberColumnInfo.isExplicit = isExplicit;
baseRowReaderOpts_.setRowNumberColumnInfo(std::move(rowNumberColumnInfo));
}

std::vector<TypePtr> SplitReader::adaptColumns(
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema) {
@@ -350,7 +333,8 @@ std::vector<TypePtr> SplitReader::adaptColumns(
connectorQueryCtx_->memoryPool(),
connectorQueryCtx_->sessionTimezone());
childSpec->setConstantValue(constant);
} else if (!childSpec->isExplicitRowNumber()) {
} else if (
childSpec->columnType() == common::ScanSpec::ColumnType::kRegular) {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
// Column is missing. Most likely due to schema evolution.
11 changes: 2 additions & 9 deletions velox/connectors/hive/SplitReader.h
@@ -77,8 +77,7 @@ class SplitReader {
/// would be called only once per incoming split
virtual void prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);
dwio::common::RuntimeStatistics& runtimeStats);

virtual uint64_t next(uint64_t size, VectorPtr& output);

@@ -114,9 +113,7 @@ class SplitReader {

/// Create the dwio::common::Reader object baseReader_, which will be used to
/// read the data file's metadata and schema
void createReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);
void createReader(std::shared_ptr<common::MetadataFilter> metadataFilter);

/// Check if the hiveSplit_ is empty. The split is considered empty when
/// 1) The data file is missing but the user chooses to ignore it
@@ -136,10 +133,6 @@
const RowTypePtr& fileType,
const std::shared_ptr<const velox::RowType>& tableSchema);

void setRowIndexColumn(
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn,
bool isExplicit);

void setPartitionValue(
common::ScanSpec* spec,
const std::string& partitionKey,
3 changes: 2 additions & 1 deletion velox/connectors/hive/TableHandle.h
@@ -34,7 +34,8 @@ class HiveColumnHandle : public ColumnHandle {
kSynthesized,
/// A zero-based row number of type BIGINT auto-generated by the connector.
/// Row numbers are unique within a single file only.
kRowIndex
kRowIndex,
kRowId,
};

/// NOTE: 'dataType' is the column type in target write table. 'hiveType' is
5 changes: 2 additions & 3 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
@@ -54,9 +54,8 @@

void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
createReader(std::move(metadataFilter), rowIndexColumn);
dwio::common::RuntimeStatistics& runtimeStats) {
createReader(std::move(metadataFilter));

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);