Skip to content

Commit

Permalink
Support struct column reading with different schemas (5962)
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Oct 17, 2023
1 parent c089656 commit 31dfb6d
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 29 deletions.
15 changes: 12 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,18 @@ std::vector<TypePtr> SplitReader::adaptColumns(
} else {
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
if (!fileTypeIdx.has_value()) {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema);
setNullConstantValue(childSpec, tableSchema->findChild(fieldName));
// If field name exists in the user-specified output type,
// set the column as null constant.
// Related PR: https://github.com/facebookincubator/velox/pull/6427.
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
if (outputTypeIdx.has_value()) {
setNullConstantValue(
childSpec, readerOutputType_->childAt(outputTypeIdx.value()));
} else {
// Column is missing. Most likely due to schema evolution.
VELOX_CHECK(tableSchema);
setNullConstantValue(childSpec, tableSchema->findChild(fieldName));
}
} else {
// Column no longer missing, reset constant value set on the spec.
childSpec->setConstantValue(nullptr);
Expand Down
8 changes: 3 additions & 5 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ void SelectiveStructColumnReaderBase::read(
activeRows = outputRows_;
}

auto& childSpecs = scanSpec_->children();
VELOX_CHECK(!childSpecs.empty());
auto& childSpecs = scanSpec_->stableChildren();
for (size_t i = 0; i < childSpecs.size(); ++i) {
auto& childSpec = childSpecs[i];
if (isChildConstant(*childSpec)) {
Expand Down Expand Up @@ -218,7 +217,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
fileType_->type()->kind() !=
TypeKind::MAP && // If this is the case it means this is a flat map,
// so it can't have "missing" fields.
childSpec.channel() >= fileType_->size());
!fileType_->containsChild(childSpec.fieldName()));
}

namespace {
Expand Down Expand Up @@ -298,7 +297,6 @@ void setNullField(vector_size_t size, VectorPtr& field) {
void SelectiveStructColumnReaderBase::getValues(
RowSet rows,
VectorPtr* result) {
VELOX_CHECK(!scanSpec_->children().empty());
VELOX_CHECK(
*result != nullptr,
"SelectiveStructColumnReaderBase expects a non-null result");
Expand Down Expand Up @@ -335,7 +333,7 @@ void SelectiveStructColumnReaderBase::getValues(
resultRow->clearNulls(0, rows.size());
}
bool lazyPrepared = false;
for (auto& childSpec : scanSpec_->children()) {
for (auto& childSpec : scanSpec_->stableChildren()) {
if (!childSpec->projectOut()) {
continue;
}
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/common/TypeWithId.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {

const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;

bool containsChild(const std::string& name) const {
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
return type_->as<velox::TypeKind::ROW>().containsChild(name);
}

const std::shared_ptr<const TypeWithId>& childByName(
const std::string& name) const {
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
Expand Down
9 changes: 5 additions & 4 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec) {
common::ScanSpec& scanSpec,
memory::MemoryPool& pool) {
auto colName = scanSpec.fieldName();

switch (dataType->type()->kind()) {
Expand All @@ -59,19 +60,19 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(

case TypeKind::ROW:
return std::make_unique<StructColumnReader>(
requestedType, dataType, params, scanSpec);
requestedType, dataType, params, scanSpec, pool);

case TypeKind::VARBINARY:
case TypeKind::VARCHAR:
return std::make_unique<StringColumnReader>(dataType, params, scanSpec);

case TypeKind::ARRAY:
return std::make_unique<ListColumnReader>(
requestedType, dataType, params, scanSpec);
requestedType, dataType, params, scanSpec, pool);

case TypeKind::MAP:
return std::make_unique<MapColumnReader>(
requestedType, dataType, params, scanSpec);
requestedType, dataType, params, scanSpec, pool);

case TypeKind::BOOLEAN:
return std::make_unique<BooleanColumnReader>(
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/ParquetColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ParquetColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);
};
} // namespace facebook::velox::parquet
40 changes: 38 additions & 2 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ class ReaderBase {
/// the data still exists in the buffered inputs.
bool isRowGroupBuffered(int32_t rowGroupIndex) const;

static std::shared_ptr<const dwio::common::TypeWithId> createTypeWithId(
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
const RowTypePtr& rowTypePtr,
bool fileColumnNamesReadAsLowerCase);

private:
// Reads and parses file footer.
void loadFileMetaData();
Expand Down Expand Up @@ -563,6 +568,33 @@ std::shared_ptr<const RowType> ReaderBase::createRowType(
std::move(childNames), std::move(childTypes));
}

std::shared_ptr<const dwio::common::TypeWithId> ReaderBase::createTypeWithId(
const std::shared_ptr<const dwio::common::TypeWithId>& inputType,
const RowTypePtr& rowTypePtr,
bool fileColumnNamesReadAsLowerCase) {
if (!fileColumnNamesReadAsLowerCase) {
return inputType;
}
std::vector<std::string> names;
names.reserve(rowTypePtr->names().size());
std::vector<TypePtr> types = rowTypePtr->children();
for (const auto& name : rowTypePtr->names()) {
std::string childName = name;
folly::toLowerAscii(childName);
names.emplace_back(childName);
}
auto convertedType =
TypeFactory<TypeKind::ROW>::create(std::move(names), std::move(types));

auto children = inputType->getChildren();
return std::make_shared<const dwio::common::TypeWithId>(
convertedType,
std::move(children),
inputType->id(),
inputType->maxId(),
inputType->column());
}

void ReaderBase::scheduleRowGroups(
const std::vector<uint32_t>& rowGroupIds,
int32_t currentGroup,
Expand Down Expand Up @@ -639,10 +671,14 @@ ParquetRowReader::ParquetRowReader(
ParquetParams params(pool_, columnReaderStats_, readerBase_->fileMetaData());
auto columnSelector = options_.getSelector();
columnReader_ = ParquetColumnReader::build(
columnSelector->getSchemaWithId(),
ReaderBase::createTypeWithId(
columnSelector->getSchemaWithId(),
asRowType(options_.getSelector()->getSchemaWithId()->type()),
readerBase_->isFileColumnNamesReadAsLowerCase()),
readerBase_->schemaWithId(), // Id is schema id
params,
*options_.getScanSpec());
*options_.getScanSpec(),
pool_);

filterRowGroups();
if (!rowGroupIds_.empty()) {
Expand Down
23 changes: 18 additions & 5 deletions velox/dwio/parquet/reader/RepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ PageReader* FOLLY_NULLABLE readLeafRepDefs(
return nullptr;
}
auto pageReader = reader->formatData().as<ParquetData>().reader();
if (pageReader == nullptr) {
return nullptr;
}
pageReader->decodeRepDefs(numTop);
return pageReader;
}
Expand Down Expand Up @@ -111,7 +114,8 @@ MapColumnReader::MapColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: dwio::common::SelectiveMapColumnReader(
requestedType,
dataType,
Expand All @@ -121,9 +125,17 @@ MapColumnReader::MapColumnReader(
auto& keyChildType = requestedType->childAt(0);
auto& elementChildType = requestedType->childAt(1);
keyReader_ = ParquetColumnReader::build(
keyChildType, fileType_->childAt(0), params, *scanSpec.children()[0]);
keyChildType,
fileType_->childAt(0),
params,
*scanSpec.children()[0],
pool);
elementReader_ = ParquetColumnReader::build(
elementChildType, fileType_->childAt(1), params, *scanSpec.children()[1]);
elementChildType,
fileType_->childAt(1),
params,
*scanSpec.children()[1],
pool);
reinterpret_cast<const ParquetTypeWithId*>(dataType.get())
->makeLevelInfo(levelInfo_);
children_ = {keyReader_.get(), elementReader_.get()};
Expand Down Expand Up @@ -221,15 +233,16 @@ ListColumnReader::ListColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: dwio::common::SelectiveListColumnReader(
requestedType,
dataType,
params,
scanSpec) {
auto& childType = requestedType->childAt(0);
child_ = ParquetColumnReader::build(
childType, fileType_->childAt(0), params, *scanSpec.children()[0]);
childType, fileType_->childAt(0), params, *scanSpec.children()[0], pool);
reinterpret_cast<const ParquetTypeWithId*>(dataType.get())
->makeLevelInfo(levelInfo_);
children_ = {child_.get()};
Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/parquet/reader/RepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);

void prepareRead(
vector_size_t offset,
Expand Down Expand Up @@ -115,7 +116,8 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);

void prepareRead(
vector_size_t offset,
Expand Down
40 changes: 34 additions & 6 deletions velox/dwio/parquet/reader/StructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,46 @@ StructColumnReader::StructColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec)
common::ScanSpec& scanSpec,
memory::MemoryPool& pool)
: SelectiveStructColumnReader(requestedType, dataType, params, scanSpec) {
auto& childSpecs = scanSpec_->stableChildren();
std::vector<int> missingFields;
for (auto i = 0; i < childSpecs.size(); ++i) {
auto childSpec = childSpecs[i];
if (childSpecs[i]->isConstant()) {
continue;
}
auto childDataType = fileType_->childByName(childSpec->fieldName());
auto childRequestedType =
requestedType_->childByName(childSpec->fieldName());
const auto& fieldName = childSpecs[i]->fieldName();
if (!fileType_->containsChild(fieldName)) {
missingFields.emplace_back(i);
continue;
}
auto childDataType = fileType_->childByName(fieldName);
auto childRequestedType = requestedType_->childByName(fieldName);
addChild(ParquetColumnReader::build(
childRequestedType, childDataType, params, *childSpec));
childRequestedType, childDataType, params, *childSpec, pool));
childSpecs[i]->setSubscript(children_.size() - 1);
}

if (missingFields.size() > 0) {
// Set the struct as null if all the children fields in the output type are
// missing and the number of child fields is more than one.
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
scanSpec_->setConstantValue(
BaseVector::createNullConstant(requestedType_->type(), 1, &pool));
} else {
// Set null constant for the missing child field of output type.
for (int channel : missingFields) {
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
requestedType_->childByName(childSpecs[channel]->fieldName())
->type(),
1,
&pool));
}
}
}

auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
if (type->parent()) {
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
Expand All @@ -47,7 +72,10 @@ StructColumnReader::StructColumnReader(
// this and the child.
auto child = childForRepDefs_;
for (;;) {
assert(child);
if (child == nullptr) {
levelMode_ = LevelMode::kNulls;
break;
}
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
child->fileType().type()->kind() == TypeKind::MAP) {
levelMode_ = LevelMode::kStructOverLists;
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/parquet/reader/StructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
ParquetParams& params,
common::ScanSpec& scanSpec);
common::ScanSpec& scanSpec,
memory::MemoryPool& pool);

void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
override;
Expand Down
Binary file not shown.
Loading

0 comments on commit 31dfb6d

Please sign in to comment.