Skip to content

Commit

Permalink
Fix parquet complex type handling (#9187)
Browse files Browse the repository at this point in the history
Summary:
Fixes #7776

Parquet has notion of optional and repeated layers which is needed in arrow calls like [DefLevelsToBitmap](https://github.com/facebookincubator/velox/blob/7fc09667d5e22c684fdeff81da529b79cc974fee/velox/dwio/parquet/reader/PageReader.cpp#L573).

This info is passed using arrow:LevelInfo. We were incorrectly computing **repeatedAncestor** by ignoring optional fields which is fixed in this PR.

Parquet has 3 level structure for nested types
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
```
// List<String> (list non-null, elements nullable)
1. required group my_list (LIST) {
2.   repeated group list {
3.    optional binary element (UTF8);
  }
}
```

However when we read this and convert to **ParquetTypeWithId** in current velox parquet reader, we ignore the intermediated layer 2.  **repeated group list** (grandfather logic) in https://github.com/facebookincubator/velox/pull/9187/files#diff-64787e76c1b0ad12b5764770a94acd62054896a762ccead8f083a71a060f2f44R325.

Pull Request resolved: #9187

Reviewed By: mbasmanova

Differential Revision: D55975472

Pulled By: Yuhta

fbshipit-source-id: d0972b3134cc710645a9f50cd74a23efac830751
  • Loading branch information
jaystarshot authored and facebook-github-bot committed Apr 29, 2024
1 parent a1706c3 commit 37f4700
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 30 deletions.
44 changes: 37 additions & 7 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
auto& schema = fileMetaData_->schema;
uint32_t curSchemaIdx = schemaIdx;
auto& schemaElement = schema[curSchemaIdx];
bool isRepeated = false;
bool isOptional = false;

if (schemaElement.__isset.repetition_type) {
if (schemaElement.repetition_type !=
Expand All @@ -244,6 +246,11 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
if (schemaElement.repetition_type ==
thrift::FieldRepetitionType::REPEATED) {
maxRepeat++;
isRepeated = true;
}
if (schemaElement.repetition_type ==
thrift::FieldRepetitionType::OPTIONAL) {
isOptional = true;
}
}

Expand Down Expand Up @@ -300,7 +307,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
maxDefine,
isOptional,
isRepeated);
}

// For backward-compatibility, a group annotated with MAP_KEY_VALUE
Expand All @@ -313,6 +322,12 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
VELOX_CHECK_EQ(children.size(), 1);
const auto& child = children[0];
auto type = child->type();
isRepeated = true;
// This level will not have the "isRepeated" info in the parquet
// schema since parquet schema will have a child layer which will have
// the "repeated info" which we are ignoring here, hence we set the
// isRepeated to true eg
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
return std::make_unique<ParquetTypeWithId>(
std::move(type),
std::move(*(ParquetTypeWithId*)child.get()).moveChildren(),
Expand All @@ -323,7 +338,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat + 1,
maxDefine);
maxDefine,
isOptional,
isRepeated);
}

default:
Expand Down Expand Up @@ -354,7 +371,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
maxDefine,
isOptional,
isRepeated);
} else if (
schema[parentSchemaIdx].converted_type ==
thrift::ConvertedType::MAP ||
Expand All @@ -374,7 +393,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
maxDefine,
isOptional,
isRepeated);
}
} else {
// Row type
Expand All @@ -389,7 +410,9 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine);
maxDefine,
isOptional,
isRepeated);
}
}
} else { // leaf node
Expand All @@ -415,6 +438,8 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
logicalType_,
maxRepeat,
maxDefine,
isOptional,
isRepeated,
precision,
scale,
type_length);
Expand All @@ -430,12 +455,14 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
std::move(children),
curSchemaIdx,
maxSchemaElementIdx,
columnIdx++,
columnIdx - 1, // was already incremented for leafTypePtr
std::move(name),
std::nullopt,
std::nullopt,
maxRepeat,
maxDefine - 1);
maxDefine - 1,
isOptional,
isRepeated);
}
return leafTypePtr;
}
Expand Down Expand Up @@ -631,6 +658,9 @@ int64_t ReaderBase::rowGroupUncompressedSize(
int32_t rowGroupIndex,
const dwio::common::TypeWithId& type) const {
if (type.column() != ParquetTypeWithId::kNonLeaf) {
VELOX_CHECK_LT(rowGroupIndex, fileMetaData_->row_groups.size());
VELOX_CHECK_LT(
type.column(), fileMetaData_->row_groups[rowGroupIndex].columns.size());
return fileMetaData_->row_groups[rowGroupIndex]
.columns[type.column()]
.meta_data.total_uncompressed_size;
Expand Down
19 changes: 11 additions & 8 deletions velox/dwio/parquet/reader/ParquetTypeWithId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ ParquetTypeWithId::moveChildren() && {
auto logicalType = parquetChild->logicalType_;
auto maxRepeat = parquetChild->maxRepeat_;
auto maxDefine = parquetChild->maxDefine_;
auto isOptional = parquetChild->isOptional_;
auto isRepeated = parquetChild->isRepeated_;
auto precision = parquetChild->precision_;
auto scale = parquetChild->scale_;
auto typeLength = parquetChild->typeLength_;
Expand All @@ -62,6 +64,8 @@ ParquetTypeWithId::moveChildren() && {
std::move(logicalType),
maxRepeat,
maxDefine,
isOptional,
isRepeated,
precision,
scale,
typeLength));
Expand All @@ -86,15 +90,14 @@ bool ParquetTypeWithId::hasNonRepeatedLeaf() const {
}

LevelMode ParquetTypeWithId::makeLevelInfo(LevelInfo& info) const {
int16_t repeatedAncestor = 0;
for (auto parent = parquetParent(); parent;
parent = parent->parquetParent()) {
if (parent->type()->kind() == TypeKind::ARRAY ||
parent->type()->kind() == TypeKind::MAP) {
repeatedAncestor = parent->maxDefine_;
break;
int repeatedAncestor = maxDefine_;
auto node = this;
do {
if (node->isOptional_) {
repeatedAncestor--;
}
}
node = node->parquetParent();
} while (node && !node->isRepeated_);
bool isList = type()->kind() == TypeKind::ARRAY;
bool isStruct = type()->kind() == TypeKind::ROW;
bool isMap = type()->kind() == TypeKind::MAP;
Expand Down
6 changes: 6 additions & 0 deletions velox/dwio/parquet/reader/ParquetTypeWithId.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
std::optional<thrift::LogicalType> logicalType,
uint32_t maxRepeat,
uint32_t maxDefine,
bool isOptional,
bool isRepeated,
int32_t precision = 0,
int32_t scale = 0,
int32_t typeLength = 0)
Expand All @@ -54,6 +56,8 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
logicalType_(std::move(logicalType)),
maxRepeat_(maxRepeat),
maxDefine_(maxDefine),
isOptional_(isOptional),
isRepeated_(isRepeated),
precision_(precision),
scale_(scale),
typeLength_(typeLength) {}
Expand Down Expand Up @@ -81,6 +85,8 @@ class ParquetTypeWithId : public dwio::common::TypeWithId {
const std::optional<thrift::LogicalType> logicalType_;
const uint32_t maxRepeat_;
const uint32_t maxDefine_;
const bool isOptional_;
const bool isRepeated_;
const int32_t precision_;
const int32_t scale_;
const int32_t typeLength_;
Expand Down
Binary file not shown.
56 changes: 41 additions & 15 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
createDuckDbTable({data});
}

void loadDataWithRowType(const std::string& filePath, RowVectorPtr data) {
splits_ = {makeSplit(filePath)};
auto pool = facebook::velox::memory::memoryManager()->addLeafPool();
dwio::common::ReaderOptions readerOpts{pool.get()};
auto reader = std::make_unique<ParquetReader>(
std::make_unique<facebook::velox::dwio::common::BufferedInput>(
std::make_shared<LocalReadFile>(filePath),
readerOpts.getMemoryPool()),
readerOpts);
rowType_ = reader->rowType();
createDuckDbTable({data});
}

std::string getExampleFilePath(const std::string& fileName) {
return facebook::velox::test::getDataFilePath(
"velox/dwio/parquet/tests/reader", "../examples/" + fileName);
Expand Down Expand Up @@ -303,9 +316,8 @@ TEST_F(ParquetTableScanTest, singleRowStruct) {
}

// Core dump and incorrect result are fixed.
TEST_F(ParquetTableScanTest, DISABLED_array) {
auto vector = makeArrayVector<int32_t>({{1, 2, 3}});

TEST_F(ParquetTableScanTest, array) {
auto vector = makeArrayVector<int32_t>({});
loadData(
getExampleFilePath("old_repeated_int.parquet"),
ROW({"repeatedInt"}, {ARRAY(INTEGER())}),
Expand All @@ -316,12 +328,11 @@ TEST_F(ParquetTableScanTest, DISABLED_array) {
}));

assertSelectWithFilter(
{"repeatedInt"}, {}, "", "SELECT repeatedInt FROM tmp");
{"repeatedInt"}, {}, "", "SELECT UNNEST(array[array[1,2,3]])");
}

// Optional array with required elements.
// Incorrect result.
TEST_F(ParquetTableScanTest, DISABLED_optArrayReqEle) {
TEST_F(ParquetTableScanTest, optArrayReqEle) {
auto vector = makeArrayVector<StringView>({});

loadData(
Expand All @@ -341,8 +352,7 @@ TEST_F(ParquetTableScanTest, DISABLED_optArrayReqEle) {
}

// Required array with required elements.
// Core dump is fixed, but the result is incorrect.
TEST_F(ParquetTableScanTest, DISABLED_reqArrayReqEle) {
TEST_F(ParquetTableScanTest, reqArrayReqEle) {
auto vector = makeArrayVector<StringView>({});

loadData(
Expand All @@ -362,8 +372,7 @@ TEST_F(ParquetTableScanTest, DISABLED_reqArrayReqEle) {
}

// Required array with optional elements.
// Incorrect result.
TEST_F(ParquetTableScanTest, DISABLED_reqArrayOptEle) {
TEST_F(ParquetTableScanTest, reqArrayOptEle) {
auto vector = makeArrayVector<StringView>({});

loadData(
Expand All @@ -382,22 +391,39 @@ TEST_F(ParquetTableScanTest, DISABLED_reqArrayOptEle) {
"SELECT UNNEST(array[array['a', null], array[], array[null, 'b']])");
}

TEST_F(ParquetTableScanTest, arrayOfArrayTest) {
auto vector = makeArrayVector<StringView>({});

loadDataWithRowType(
getExampleFilePath("array_of_array1.parquet"),
makeRowVector(
{"_1"},
{
vector,
}));

assertSelectWithFilter(
{"_1"},
{},
"",
"SELECT UNNEST(array[null, array[array['g', 'h'], null]])");
}

// Required array with legacy format.
// Incorrect result.
TEST_F(ParquetTableScanTest, DISABLED_reqArrayLegacy) {
TEST_F(ParquetTableScanTest, reqArrayLegacy) {
auto vector = makeArrayVector<StringView>({});

loadData(
getExampleFilePath("array_3.parquet"),
ROW({"_1"}, {ARRAY(VARCHAR())}),
ROW({"element"}, {ARRAY(VARCHAR())}),
makeRowVector(
{"_1"},
{"element"},
{
vector,
}));

assertSelectWithFilter(
{"_1"},
{"element"},
{},
"",
"SELECT UNNEST(array[array['a', 'b'], array[], array['c', 'd']])");
Expand Down

0 comments on commit 37f4700

Please sign in to comment.