Skip to content

Commit

Permalink
finish the path part impl
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Nov 22, 2023
1 parent 8cc71ab commit a00c40e
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DatasetWriterTestFixture : public testing::Test {

void TearDown() override {
if (!test_done_with_tasks_.is_finished()) {
test_done_with_tasks_.MarkFinished();
test_done_with_tasks_.Wait();
ASSERT_FINISHES_OK(scheduler_finished_);
}
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/filesystem/mockfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ class MockFSOutputStream : public io::OutputStream {
return Status::OK();
}

Future<> CloseAsync() override {
return Future<>::MakeFinished(Close());
}

Status Abort() override {
if (!closed_) {
// MockFSOutputStream is mainly used for debugging and testing, so
Expand Down
63 changes: 47 additions & 16 deletions cpp/src/parquet/arrow/path_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ struct NullableTerminalNode {
// at least one other node).
//
// Type parameters:
// |RangeSelector| - A strategy for determine the the range of the child node to
// |RangeSelector| - A strategy for determine the range of the child node to
// process.
// this varies depending on the type of list (int32_t* offsets, int64_t* offsets of
// fixed.
Expand Down Expand Up @@ -450,6 +450,17 @@ struct FixedSizedRangeSelector {
int list_size;
};

template <typename OffsetType>
struct VarRangeViewSelector {
ElementRange GetRange(int64_t index) const {
return ElementRange{offsets[index], offsets[index] + sizes[index]};
}

// Either int32_t* or int64_t*.
const OffsetType* offsets;
const OffsetType* sizes;
};

// An intermediate node that handles null values.
class NullableNode {
public:
Expand Down Expand Up @@ -510,16 +521,18 @@ class NullableNode {

using ListNode = ListPathNode<VarRangeSelector<int32_t>>;
using LargeListNode = ListPathNode<VarRangeSelector<int64_t>>;
using ListViewNode = ListPathNode<VarRangeViewSelector<int32_t>>;
using LargeListViewNode = ListPathNode<VarRangeViewSelector<int64_t>>;
using FixedSizeListNode = ListPathNode<FixedSizedRangeSelector>;

// Contains static information derived from traversing the schema.
struct PathInfo {
// The vectors are expected to the same length info.

// Note index order matters here.
using Node =
std::variant<NullableTerminalNode, ListNode, LargeListNode, FixedSizeListNode,
NullableNode, AllPresentTerminalNode, AllNullsTerminalNode>;
using Node = std::variant<NullableTerminalNode, ListNode, LargeListNode, ListViewNode,
LargeListViewNode, FixedSizeListNode, NullableNode,
AllPresentTerminalNode, AllNullsTerminalNode>;

std::vector<Node> path;
std::shared_ptr<Array> primitive_array;
Expand Down Expand Up @@ -579,15 +592,21 @@ Status WritePath(ElementRange root_range, PathInfo* path_info,
IterationResult operator()(NullableNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(ListNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(NullableTerminalNode& node) {
return node.Run(*stack_position, context);
}
IterationResult operator()(ListNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(FixedSizeListNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(ListViewNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(LargeListViewNode& node) {
return node.Run(stack_position, stack_position + 1, context);
}
IterationResult operator()(AllPresentTerminalNode& node) {
return node.Run(*stack_position, context);
}
Expand Down Expand Up @@ -651,6 +670,8 @@ struct FixupVisitor {
void operator()(ListNode& node) { HandleListNode(node); }
void operator()(LargeListNode& node) { HandleListNode(node); }
void operator()(FixedSizeListNode& node) { HandleListNode(node); }
void operator()(ListViewNode& node) { HandleListNode(node); }
void operator()(LargeListViewNode& node) { HandleListNode(node); }

// For non-list intermediate nodes.
template <typename T>
Expand Down Expand Up @@ -724,19 +745,31 @@ class PathBuilder {

template <typename T>
::arrow::enable_if_t<std::is_same<::arrow::ListArray, T>::value ||
std::is_same<::arrow::LargeListArray, T>::value,
std::is_same<::arrow::LargeListArray, T>::value ||
std::is_same<::arrow::ListViewArray, T>::value ||
std::is_same<::arrow::LargeListViewArray, T>::value,
Status>
Visit(const T& array) {
MaybeAddNullable(array);
// Increment necessary due to empty lists.
info_.max_def_level++;
info_.max_rep_level++;
// raw_value_offsets() accounts for any slice offset.
ListPathNode<VarRangeSelector<typename T::offset_type>> node(
VarRangeSelector<typename T::offset_type>{array.raw_value_offsets()},
info_.max_rep_level, info_.max_def_level - 1);
info_.path.emplace_back(std::move(node));
nullable_in_parent_ = array.list_type()->value_field()->nullable();
// raw_value_offsets() and raw_value_sizes() accounts for any slice offset/size.
if constexpr (std::is_same<::arrow::ListViewArray, T>::value ||
std::is_same<::arrow::LargeListViewArray, T>::value) {
ListPathNode<VarRangeViewSelector<typename T::offset_type>> node(
VarRangeViewSelector<typename T::offset_type>{array.raw_value_offsets(),
array.raw_value_sizes()},
info_.max_rep_level, info_.max_def_level - 1);
info_.path.emplace_back(std::move(node));
nullable_in_parent_ = array.list_view_type()->value_field()->nullable();
} else {
ListPathNode<VarRangeSelector<typename T::offset_type>> node(
VarRangeSelector<typename T::offset_type>{array.raw_value_offsets()},
info_.max_rep_level, info_.max_def_level - 1);
info_.path.emplace_back(std::move(node));
nullable_in_parent_ = array.list_type()->value_field()->nullable();
}
return VisitInline(*array.values());
}

Expand Down Expand Up @@ -830,8 +863,6 @@ class PathBuilder {
// Types not yet supported in Parquet.
NOT_IMPLEMENTED_VISIT(Union)
NOT_IMPLEMENTED_VISIT(RunEndEncoded);
NOT_IMPLEMENTED_VISIT(ListView);
NOT_IMPLEMENTED_VISIT(LargeListView);

#undef NOT_IMPLEMENTED_VISIT
std::vector<PathInfo>& paths() { return paths_; }
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/arrow/path_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -643,4 +643,6 @@ TEST_F(MultipathLevelBuilderTest, TestPrimitiveNonNullable) {
EXPECT_THAT(results_[0].post_list_elements[0].end, Eq(4));
}

// TODO(mwish): testing ListView and LargeListView.

} // namespace parquet::arrow
2 changes: 2 additions & 0 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
const WriterProperties& properties,
const ArrowWriterProperties& arrow_properties, NodePtr* out);

// TODO(mwish): handle LIST_VIEW
Status ListToNode(const std::shared_ptr<::arrow::BaseListType>& type,
const std::string& name, bool nullable, int field_id,
const WriterProperties& properties,
Expand Down Expand Up @@ -839,6 +840,7 @@ Status GetOriginSchema(const std::shared_ptr<const KeyValueMetadata>& metadata,

Result<bool> ApplyOriginalMetadata(const Field& origin_field, SchemaField* inferred);

// TODO(mwish): handle {LARGE_}LIST_VIEW here.
std::function<std::shared_ptr<::arrow::DataType>(FieldVector)> GetNestedFactory(
const ArrowType& origin_type, const ArrowType& inferred_type) {
switch (inferred_type.id()) {
Expand Down

0 comments on commit a00c40e

Please sign in to comment.