Skip to content

Commit

Permalink
add interop test
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Oct 24, 2024
1 parent 4c84446 commit f70ae79
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 53 deletions.
123 changes: 71 additions & 52 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4094,59 +4094,78 @@ TEST(TestArrowReaderAdHoc, OldDataPageV2) {
}

TEST(TestArrowReaderAdHoc, LegacyTwoLevelList) {
  // Verifies that a Parquet file whose list column uses the legacy two-level
  // list encoding (repeated group directly under the LIST annotation, no
  // intermediate "element" level) is read back with the expected Parquet
  // schema, Arrow schema, and data. Exercised twice: once on a file written
  // by the C++ writer (round trip), once on a file produced by parquet-java
  // (interoperability).

  // Shared verification routine. Consumes the reader because FileReader::Make
  // takes ownership of it.
  auto VerifyData = [](std::unique_ptr<ParquetFileReader> file_reader) {
    // Expected Parquet schema of legacy two-level encoding.
    // NOTE(review): leading whitespace inside this literal may have been
    // collapsed during extraction — confirm against PrintSchema's two-space
    // indentation before relying on an exact match.
    constexpr std::string_view kExpectedLegacyList =
        "required group field_id=-1 a (List) {\n"
        " repeated group field_id=-1 array (List) {\n"
        " repeated int32 field_id=-1 array;\n"
        " }\n"
        "}\n";

    // Expected Arrow schema and data: a non-nullable list of non-nullable
    // lists of int32, holding the single row [[1, 2], [3, 4]].
    auto arrow_inner_list =
        field("array", list(field("array", ::arrow::int32(), /*nullable=*/false)),
              /*nullable=*/false);
    auto arrow_outer_list = list(arrow_inner_list);
    auto arrow_schema =
        ::arrow::schema({field("a", arrow_outer_list, /*nullable=*/false)});
    auto expected_table = TableFromJSON(arrow_schema, {R"([[[[1,2],[3,4]]]])"});

    // Verify the Parquet schema kept the two-level structure.
    auto root_group = file_reader->metadata()->schema()->group_node();
    ASSERT_EQ(1, root_group->field_count());
    std::stringstream nodeStr;
    PrintSchema(root_group->field(0).get(), nodeStr);
    ASSERT_EQ(kExpectedLegacyList, nodeStr.str());

    // Verify the Arrow-level schema and data after conversion.
    std::unique_ptr<FileReader> reader;
    ASSERT_OK_NO_THROW(
        FileReader::Make(default_memory_pool(), std::move(file_reader), &reader));
    std::shared_ptr<Table> table;
    ASSERT_OK(reader->ReadTable(&table));
    AssertTablesEqual(*expected_table, *table);
  };

  // Round-trip test for Parquet C++ reader and writer
  {
    // Create Parquet schema of legacy two-level encoding: the repeated group
    // itself carries the LIST annotation instead of wrapping a "list.element".
    auto inner_list = GroupNode::Make("array", Repetition::REPEATED,
                                      {schema::Int32("array", Repetition::REPEATED)},
                                      LogicalType::List());
    auto outer_list =
        GroupNode::Make("a", Repetition::REQUIRED, {inner_list}, LogicalType::List());
    auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, {outer_list});

    // Create a Parquet writer to write values of the nested list.
    auto sink = CreateOutputStream();
    auto file_writer =
        ParquetFileWriter::Open(sink, std::dynamic_pointer_cast<GroupNode>(schema_node));
    auto row_group_writer = file_writer->AppendRowGroup();
    auto int_writer = dynamic_cast<Int32Writer*>(row_group_writer->NextColumn());
    ASSERT_TRUE(int_writer != nullptr);

    // Directly write a single row of nested list: [[1, 2],[3, 4]].
    // Two-level encoding has max def level 2 (both levels repeated, leaf
    // required); rep level 0 starts the row, 1 a new inner list, 2 a new leaf.
    constexpr int64_t kNumValues = 4;
    constexpr std::array<int16_t, kNumValues> kRepLevels = {0, 2, 1, 2};
    constexpr std::array<int16_t, kNumValues> kDefLevels = {2, 2, 2, 2};
    constexpr std::array<int32_t, kNumValues> kValues = {1, 2, 3, 4};
    int_writer->WriteBatch(kNumValues, kDefLevels.data(), kRepLevels.data(),
                           kValues.data());
    file_writer->Close();
    ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

    // Read the schema back and verify it kept two-level encoding of list type.
    ASSERT_NO_FATAL_FAILURE(
        VerifyData(ParquetFileReader::Open(std::make_shared<BufferReader>(buffer))));
  }

  // Interoperability test for a Parquet file generated by parquet-java
  {
    auto path = std::string(test::get_data_dir()) + "/old_list_structure.parquet";
    ASSERT_NO_FATAL_FAILURE(VerifyData(ParquetFileReader::OpenFile(path)));
  }
}

class TestArrowReaderAdHocSparkAndHvr
Expand Down

0 comments on commit f70ae79

Please sign in to comment.