From df98801383ce0a770ef1fa9b1056faa4c976d8cf Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Fri, 18 Oct 2024 13:38:56 +0200 Subject: [PATCH] Fix linting, improved comments --- cpp/src/arrow/acero/options.h | 5 +++- cpp/src/arrow/acero/source_node.cc | 3 ++- cpp/src/arrow/dataset/file_base.cc | 12 ++++++--- cpp/src/arrow/dataset/file_test.cc | 13 +++++---- cpp/src/arrow/dataset/scanner.cc | 4 ++- cpp/src/arrow/dataset/write_node_test.cc | 34 +++++++++++++++--------- 6 files changed, 46 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h index 223c333bbcf0c..2eb7df0085155 100644 --- a/cpp/src/arrow/acero/options.h +++ b/cpp/src/arrow/acero/options.h @@ -95,12 +95,15 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions { SourceNodeOptions(std::shared_ptr output_schema, std::function>()> generator, Ordering ordering = Ordering::Unordered()) - : output_schema(std::move(output_schema)), generator(std::move(generator)), ordering(std::move(ordering)) {} + : output_schema(std::move(output_schema)), + generator(std::move(generator)), + ordering(std::move(ordering)) {} /// \brief the schema for batches that will be generated by this source std::shared_ptr output_schema; /// \brief an asynchronous stream of batches ending with std::nullopt std::function>()> generator; + /// \brief the order of the data, defaults to Ordering::Unordered Ordering ordering; }; diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index d4f0bb3c43f26..2d3e2a1da1735 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -106,7 +106,8 @@ struct SourceNode : ExecNode, public TracedNode { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode")); const auto& source_options = checked_cast(options); return plan->EmplaceNode(plan, source_options.output_schema, - source_options.generator, source_options.ordering); + source_options.generator, + source_options.ordering); } const char* kind_name() const override { return "SourceNode"; } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 931cb1ed1e35c..055d8c745fd55 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -475,7 +475,8 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio bool implicit_ordering = write_node_options.write_options.preserve_order; acero::Declaration plan = acero::Declaration::Sequence({ - {"scan", ScanNodeOptions{dataset, scanner->options(), /*require_sequenced_output=*/false, implicit_ordering}}, + {"scan", ScanNodeOptions{dataset, scanner->options(), + /*require_sequenced_output=*/false, implicit_ordering}}, {"filter", acero::FilterNodeOptions{scanner->options()->filter}}, {"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}}, {"write", std::move(write_node_options)}, @@ -541,10 +542,13 @@ Result MakeWriteNode(acero::ExecPlan* plan, ARROW_ASSIGN_OR_RAISE( auto node, - // sequence the exec batches to preserve order + // to preserve order explicitly sequence the exec batches + // this requires exec batch index to be set upstream (e.g. by SourceNode) acero::MakeExecNode("consuming_sink", plan, std::move(inputs), - acero::ConsumingSinkNodeOptions{std::move(consumer), {}, - /*sequence_output=*/write_options.preserve_order})); + acero::ConsumingSinkNodeOptions{ + std::move(consumer), + {}, + /*sequence_output=*/write_options.preserve_order})); return node; } diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index 8ee45b2246b8b..af40ce52d2cf0 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -390,16 +390,19 @@ TEST_F(TestFileSystemDataset, WritePersistOrder) { while (batch != nullptr) { for (int row = 0; row < batch->num_rows(); ++row) { auto scalar = batch->column(0)->GetScalar(row).ValueOrDie(); - auto numeric_scalar = std::static_pointer_cast>(scalar); + auto numeric_scalar = + std::static_pointer_cast>(scalar); int32_t value = numeric_scalar->value; - if (value <= prev) { out_of_order = true; } + if (value <= prev) { + out_of_order = true; + } prev = value; } ABORT_NOT_OK(reader.ReadNext(&batch)); } - // TODO: this currently fails because out-of-order batches cannot be reproduced with this test - // how can we guarantee that table written with preserve_order = false is out of order? - // Other than SimpleWriteNodeTest.SequenceOutput in write_node_test.cc + // TODO: this currently fails because out-of-order batches cannot be reproduced with + // this test how can we guarantee that table written with preserve_order = false is + // out of order? Other than SimpleWriteNodeTest.SequenceOutput in write_node_test.cc ASSERT_EQ(out_of_order, !preserve_order); } } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 442e32c0121ff..f57992140910c 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -1069,7 +1069,9 @@ Result MakeScanNode(acero::ExecPlan* plan, } Ordering ordering = Ordering::Unordered(); - if (implicit_ordering) { ordering = Ordering::Implicit(); } + if (implicit_ordering) { + ordering = Ordering::Implicit(); + } return acero::MakeExecNode( "source", plan, {}, diff --git a/cpp/src/arrow/dataset/write_node_test.cc b/cpp/src/arrow/dataset/write_node_test.cc index 48f209e8d6bff..ff95d26d42e05 100644 --- a/cpp/src/arrow/dataset/write_node_test.cc +++ b/cpp/src/arrow/dataset/write_node_test.cc @@ -18,8 +18,8 @@ #include #include -#include #include +#include #include "arrow/acero/exec_plan.h" #include "arrow/acero/options.h" @@ -186,7 +186,8 @@ TEST_F(SimpleWriteNodeTest, SequenceOutput) { acero::RegisterTestNodes(); // Create an input table - std::shared_ptr table = gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch, kNumBatches); + std::shared_ptr
table = + gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch, kNumBatches); auto dataset = std::make_shared(table); auto scan_options = std::make_shared(); scan_options->use_threads = true; @@ -196,25 +197,29 @@ TEST_F(SimpleWriteNodeTest, SequenceOutput) { EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); auto exprs = scan_options->projection.call()->arguments; auto names = checked_cast( - scan_options->projection.call()->options.get() - )->field_names; + scan_options->projection.call()->options.get()) + ->field_names; auto fs = std::make_shared(fs::kNoTime); dataset::WriteNodeOptions write_options(fs_write_options_); write_options.write_options.file_write_options = format->DefaultWriteOptions(); write_options.write_options.base_dir = "root"; - write_options.write_options.partitioning = std::make_shared(schema({})); + write_options.write_options.partitioning = + std::make_shared(schema({})); write_options.write_options.basename_template = "{i}.feather"; write_options.write_options.filesystem = fs; write_options.write_options.preserve_order = preserve_order; - // test plan of FileSystemDataset::Write with a jitter node that guarantees exec batches are out of order + // test plan of FileSystemDataset::Write with a jitter node that guarantees exec + // batches are out of order acero::Declaration plan = acero::Declaration::Sequence({ - {"scan", ScanNodeOptions{dataset, scanner->options(), /*require_sequenced_output=*/false, /*implicit_ordering=*/true}}, - {"filter", acero::FilterNodeOptions{scanner->options()->filter}}, - {"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}}, - {"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)}, - {"write", write_options}, + {"scan", + ScanNodeOptions{dataset, scanner->options(), /*require_sequenced_output=*/false, + /*implicit_ordering=*/true}}, + {"filter", acero::FilterNodeOptions{scanner->options()->filter}}, + {"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}}, + {"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)}, + {"write", write_options}, }); ASSERT_OK(DeclarationToStatus(plan)); @@ -235,9 +240,12 @@ TEST_F(SimpleWriteNodeTest, SequenceOutput) { while (batch != nullptr) { for (int row = 0; row < batch->num_rows(); ++row) { auto scalar = batch->column(0)->GetScalar(row).ValueOrDie(); - auto numeric_scalar = std::static_pointer_cast>(scalar); + auto numeric_scalar = + std::static_pointer_cast>(scalar); int32_t value = numeric_scalar->value; - if (value <= prev) { out_of_order = true; } + if (value <= prev) { + out_of_order = true; + } prev = value; } ABORT_NOT_OK(reader.ReadNext(&batch));