From a00c40e9eb7019e1f9337da0d53729497ed6a9a0 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 23 Nov 2023 01:17:27 +0800 Subject: [PATCH] finish the path part impl --- cpp/src/arrow/dataset/dataset_writer_test.cc | 2 +- cpp/src/arrow/filesystem/mockfs.cc | 4 ++ cpp/src/parquet/arrow/path_internal.cc | 63 +++++++++++++++----- cpp/src/parquet/arrow/path_internal_test.cc | 2 + cpp/src/parquet/arrow/schema.cc | 2 + 5 files changed, 56 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc index c76e79d79b449..5b1cffa9211f4 100644 --- a/cpp/src/arrow/dataset/dataset_writer_test.cc +++ b/cpp/src/arrow/dataset/dataset_writer_test.cc @@ -96,7 +96,7 @@ class DatasetWriterTestFixture : public testing::Test { void TearDown() override { if (!test_done_with_tasks_.is_finished()) { - test_done_with_tasks_.MarkFinished(); + test_done_with_tasks_.Wait(); ASSERT_FINISHES_OK(scheduler_finished_); } } diff --git a/cpp/src/arrow/filesystem/mockfs.cc b/cpp/src/arrow/filesystem/mockfs.cc index 8eff8ecc2f887..a254ae0a288cb 100644 --- a/cpp/src/arrow/filesystem/mockfs.cc +++ b/cpp/src/arrow/filesystem/mockfs.cc @@ -206,6 +206,10 @@ class MockFSOutputStream : public io::OutputStream { return Status::OK(); } + Future<> CloseAsync() override { + return Future<>::MakeFinished(Close()); + } + Status Abort() override { if (!closed_) { // MockFSOutputStream is mainly used for debugging and testing, so diff --git a/cpp/src/parquet/arrow/path_internal.cc b/cpp/src/parquet/arrow/path_internal.cc index 2d20403eac075..2e04069829afd 100644 --- a/cpp/src/parquet/arrow/path_internal.cc +++ b/cpp/src/parquet/arrow/path_internal.cc @@ -312,7 +312,7 @@ struct NullableTerminalNode { // at least one other node). // // Type parameters: -// |RangeSelector| - A strategy for determine the the range of the child node to +// |RangeSelector| - A strategy for determine the range of the child node to // process. // this varies depending on the type of list (int32_t* offsets, int64_t* offsets of // fixed. @@ -450,6 +450,17 @@ struct FixedSizedRangeSelector { int list_size; }; +template +struct VarRangeViewSelector { + ElementRange GetRange(int64_t index) const { + return ElementRange{offsets[index], offsets[index] + sizes[index]}; + } + + // Either int32_t* or int64_t*. + const OffsetType* offsets; + const OffsetType* sizes; +}; + // An intermediate node that handles null values. class NullableNode { public: @@ -510,6 +521,8 @@ class NullableNode { using ListNode = ListPathNode>; using LargeListNode = ListPathNode>; +using ListViewNode = ListPathNode>; +using LargeListViewNode = ListPathNode>; using FixedSizeListNode = ListPathNode; // Contains static information derived from traversing the schema. @@ -517,9 +530,9 @@ struct PathInfo { // The vectors are expected to the same length info. // Note index order matters here. - using Node = - std::variant; + using Node = std::variant; std::vector path; std::shared_ptr primitive_array; @@ -579,15 +592,21 @@ Status WritePath(ElementRange root_range, PathInfo* path_info, IterationResult operator()(NullableNode& node) { return node.Run(stack_position, stack_position + 1, context); } - IterationResult operator()(ListNode& node) { - return node.Run(stack_position, stack_position + 1, context); - } IterationResult operator()(NullableTerminalNode& node) { return node.Run(*stack_position, context); } + IterationResult operator()(ListNode& node) { + return node.Run(stack_position, stack_position + 1, context); + } IterationResult operator()(FixedSizeListNode& node) { return node.Run(stack_position, stack_position + 1, context); } + IterationResult operator()(ListViewNode& node) { + return node.Run(stack_position, stack_position + 1, context); + } + IterationResult operator()(LargeListViewNode& node) { + return node.Run(stack_position, stack_position + 1, context); + } IterationResult operator()(AllPresentTerminalNode& node) { return node.Run(*stack_position, context); } @@ -651,6 +670,8 @@ struct FixupVisitor { void operator()(ListNode& node) { HandleListNode(node); } void operator()(LargeListNode& node) { HandleListNode(node); } void operator()(FixedSizeListNode& node) { HandleListNode(node); } + void operator()(ListViewNode& node) { HandleListNode(node); } + void operator()(LargeListViewNode& node) { HandleListNode(node); } // For non-list intermediate nodes. template @@ -724,19 +745,31 @@ class PathBuilder { template ::arrow::enable_if_t::value || - std::is_same<::arrow::LargeListArray, T>::value, + std::is_same<::arrow::LargeListArray, T>::value || + std::is_same<::arrow::ListViewArray, T>::value || + std::is_same<::arrow::LargeListViewArray, T>::value, Status> Visit(const T& array) { MaybeAddNullable(array); // Increment necessary due to empty lists. info_.max_def_level++; info_.max_rep_level++; - // raw_value_offsets() accounts for any slice offset. - ListPathNode> node( - VarRangeSelector{array.raw_value_offsets()}, - info_.max_rep_level, info_.max_def_level - 1); - info_.path.emplace_back(std::move(node)); - nullable_in_parent_ = array.list_type()->value_field()->nullable(); + // raw_value_offsets() and raw_value_sizes() accounts for any slice offset/size. + if constexpr (std::is_same<::arrow::ListViewArray, T>::value || + std::is_same<::arrow::LargeListViewArray, T>::value) { + ListPathNode> node( + VarRangeViewSelector{array.raw_value_offsets(), + array.raw_value_sizes()}, + info_.max_rep_level, info_.max_def_level - 1); + info_.path.emplace_back(std::move(node)); + nullable_in_parent_ = array.list_view_type()->value_field()->nullable(); + } else { + ListPathNode> node( + VarRangeSelector{array.raw_value_offsets()}, + info_.max_rep_level, info_.max_def_level - 1); + info_.path.emplace_back(std::move(node)); + nullable_in_parent_ = array.list_type()->value_field()->nullable(); + } return VisitInline(*array.values()); } @@ -830,8 +863,6 @@ class PathBuilder { // Types not yet supported in Parquet. NOT_IMPLEMENTED_VISIT(Union) NOT_IMPLEMENTED_VISIT(RunEndEncoded); - NOT_IMPLEMENTED_VISIT(ListView); - NOT_IMPLEMENTED_VISIT(LargeListView); #undef NOT_IMPLEMENTED_VISIT std::vector& paths() { return paths_; } diff --git a/cpp/src/parquet/arrow/path_internal_test.cc b/cpp/src/parquet/arrow/path_internal_test.cc index fb9c404247f3b..7d866adae4e68 100644 --- a/cpp/src/parquet/arrow/path_internal_test.cc +++ b/cpp/src/parquet/arrow/path_internal_test.cc @@ -643,4 +643,6 @@ TEST_F(MultipathLevelBuilderTest, TestPrimitiveNonNullable) { EXPECT_THAT(results_[0].post_list_elements[0].end, Eq(4)); } +// TODO(mwish): testing ListView and LargeListView. + } // namespace parquet::arrow diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index f5484f131eb07..45e4da44efed3 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -78,6 +78,7 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, const WriterProperties& properties, const ArrowWriterProperties& arrow_properties, NodePtr* out); +// TODO(mwish): handle LIST_VIEW Status ListToNode(const std::shared_ptr<::arrow::BaseListType>& type, const std::string& name, bool nullable, int field_id, const WriterProperties& properties, @@ -839,6 +840,7 @@ Status GetOriginSchema(const std::shared_ptr& metadata, Result ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred); +// TODO(mwish): handle {LARGE_}LIST_VIEW here. std::function(FieldVector)> GetNestedFactory( const ArrowType& origin_type, const ArrowType& inferred_type) { switch (inferred_type.id()) {