Fix linting, improved comments
EnricoMi committed Oct 18, 2024
1 parent 22faea4 commit df98801
Showing 6 changed files with 46 additions and 25 deletions.
5 changes: 4 additions & 1 deletion cpp/src/arrow/acero/options.h
@@ -95,12 +95,15 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
SourceNodeOptions(std::shared_ptr<Schema> output_schema,
std::function<Future<std::optional<ExecBatch>>()> generator,
Ordering ordering = Ordering::Unordered())
- : output_schema(std::move(output_schema)), generator(std::move(generator)), ordering(std::move(ordering)) {}
+ : output_schema(std::move(output_schema)),
+ generator(std::move(generator)),
+ ordering(std::move(ordering)) {}

/// \brief the schema for batches that will be generated by this source
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;
+ /// \brief the order of the data, defaults to Ordering::Unordered
Ordering ordering;
};

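For orientation, a minimal sketch (not part of this commit) of how a caller might pass the ordering argument documented above; the MakeOrderedSource helper and namespace aliases are illustrative, while SourceNodeOptions, Ordering, and Declaration are the Arrow APIs shown in the diff.

// Sketch only: declare a "source" node whose batches carry an implicit order
// (by batch index), so downstream nodes may sequence on it.
#include <functional>
#include <memory>
#include <optional>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"

namespace acero = arrow::acero;
namespace compute = arrow::compute;

acero::Declaration MakeOrderedSource(
    std::shared_ptr<arrow::Schema> schema,
    std::function<arrow::Future<std::optional<compute::ExecBatch>>()> generator) {
  // Ordering::Unordered() stays the default; Ordering::Implicit() marks the
  // stream as ordered by batch index rather than by a sort key.
  return acero::Declaration(
      "source", acero::SourceNodeOptions(std::move(schema), std::move(generator),
                                         compute::Ordering::Implicit()));
}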
3 changes: 2 additions & 1 deletion cpp/src/arrow/acero/source_node.cc
@@ -106,7 +106,8 @@ struct SourceNode : ExecNode, public TracedNode {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode"));
const auto& source_options = checked_cast<const SourceNodeOptions&>(options);
return plan->EmplaceNode<SourceNode>(plan, source_options.output_schema,
- source_options.generator, source_options.ordering);
+ source_options.generator,
+ source_options.ordering);
}

const char* kind_name() const override { return "SourceNode"; }
12 changes: 8 additions & 4 deletions cpp/src/arrow/dataset/file_base.cc
@@ -475,7 +475,8 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
bool implicit_ordering = write_node_options.write_options.preserve_order;

acero::Declaration plan = acero::Declaration::Sequence({
{"scan", ScanNodeOptions{dataset, scanner->options(), /*require_sequenced_output=*/false, implicit_ordering}},
{"scan", ScanNodeOptions{dataset, scanner->options(),
/*require_sequenced_output=*/false, implicit_ordering}},
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"write", std::move(write_node_options)},
@@ -541,10 +542,13 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,

ARROW_ASSIGN_OR_RAISE(
auto node,
- // sequence the exec batches to preserve order
+ // to preserve order explicitly sequence the exec batches
+ // this requires exec batch index to be set upstream (e.g. by SourceNode)
acero::MakeExecNode("consuming_sink", plan, std::move(inputs),
- acero::ConsumingSinkNodeOptions{std::move(consumer), {},
- /*sequence_output=*/write_options.preserve_order}));
+ acero::ConsumingSinkNodeOptions{
+ std::move(consumer),
+ {},
+ /*sequence_output=*/write_options.preserve_order}));

return node;
}
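For context, a hedged sketch (not from this commit) of how the preserve_order flag wired through above might be set when writing a dataset; the WriteInScanOrder helper and the output path and basename are illustrative, while the FileSystemDatasetWriteOptions fields are those shown in the diff.

// Sketch only: write a scanned dataset while keeping batches in scan order.
#include <memory>
#include <utility>

#include "arrow/dataset/file_base.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/scanner.h"
#include "arrow/filesystem/filesystem.h"

namespace ds = arrow::dataset;

arrow::Status WriteInScanOrder(std::shared_ptr<ds::Scanner> scanner,
                               std::shared_ptr<arrow::fs::FileSystem> filesystem,
                               std::shared_ptr<ds::FileFormat> format) {
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = std::move(filesystem);
  write_options.base_dir = "out";                         // illustrative
  write_options.basename_template = "part-{i}.feather";   // illustrative
  write_options.partitioning =
      std::make_shared<ds::HivePartitioning>(arrow::schema({}));
  // The flag this change plumbs through: the scan source is declared with an
  // implicit ordering and the consuming sink sequences its output.
  write_options.preserve_order = true;
  return ds::FileSystemDataset::Write(write_options, std::move(scanner));
}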
13 changes: 8 additions & 5 deletions cpp/src/arrow/dataset/file_test.cc
@@ -390,16 +390,19 @@ TEST_F(TestFileSystemDataset, WritePersistOrder) {
while (batch != nullptr) {
for (int row = 0; row < batch->num_rows(); ++row) {
auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
- auto numeric_scalar = std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
+ auto numeric_scalar =
+ std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
int32_t value = numeric_scalar->value;
- if (value <= prev) { out_of_order = true; }
+ if (value <= prev) {
+ out_of_order = true;
+ }
prev = value;
}
ABORT_NOT_OK(reader.ReadNext(&batch));
}
- // TODO: this currently fails because out-of-order batches cannot be reproduced with this test
- // how can we guarantee that table written with preserve_order = false is out of order?
- // Other than SimpleWriteNodeTest.SequenceOutput in write_node_test.cc
+ // TODO: this currently fails because out-of-order batches cannot be reproduced with
+ // this test how can we guarantee that table written with preserve_order = false is
+ // out of order? Other than SimpleWriteNodeTest.SequenceOutput in write_node_test.cc
ASSERT_EQ(out_of_order, !preserve_order);
}
}
4 changes: 3 additions & 1 deletion cpp/src/arrow/dataset/scanner.cc
@@ -1069,7 +1069,9 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
}

Ordering ordering = Ordering::Unordered();
- if (implicit_ordering) { ordering = Ordering::Implicit(); }
+ if (implicit_ordering) {
+ ordering = Ordering::Implicit();
+ }

return acero::MakeExecNode(
"source", plan, {},
34 changes: 21 additions & 13 deletions cpp/src/arrow/dataset/write_node_test.cc
@@ -18,8 +18,8 @@
#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

- #include <memory>
#include <arrow/compute/api_scalar.h>
+ #include <memory>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
@@ -186,7 +186,8 @@ TEST_F(SimpleWriteNodeTest, SequenceOutput) {
acero::RegisterTestNodes();

// Create an input table
- std::shared_ptr<Table> table = gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch, kNumBatches);
+ std::shared_ptr<Table> table =
+ gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch, kNumBatches);
auto dataset = std::make_shared<InMemoryDataset>(table);
auto scan_options = std::make_shared<ScanOptions>();
scan_options->use_threads = true;
@@ -196,25 +197,29 @@
EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
auto exprs = scan_options->projection.call()->arguments;
auto names = checked_cast<const compute::MakeStructOptions*>(
- scan_options->projection.call()->options.get()
- )->field_names;
+ scan_options->projection.call()->options.get())
+ ->field_names;

auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
dataset::WriteNodeOptions write_options(fs_write_options_);
write_options.write_options.file_write_options = format->DefaultWriteOptions();
write_options.write_options.base_dir = "root";
- write_options.write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.write_options.partitioning =
+ std::make_shared<HivePartitioning>(schema({}));
write_options.write_options.basename_template = "{i}.feather";
write_options.write_options.filesystem = fs;
write_options.write_options.preserve_order = preserve_order;

- // test plan of FileSystemDataset::Write with a jitter node that guarantees exec batches are out of order
+ // test plan of FileSystemDataset::Write with a jitter node that guarantees exec
+ // batches are out of order
acero::Declaration plan = acero::Declaration::Sequence({
{"scan", ScanNodeOptions{dataset, scanner->options(), /*require_sequenced_output=*/false, /*implicit_ordering=*/true}},
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)},
{"write", write_options},
{"scan",
ScanNodeOptions{dataset, scanner->options(), /*require_sequenced_output=*/false,
/*implicit_ordering=*/true}},
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)},
{"write", write_options},
});

ASSERT_OK(DeclarationToStatus(plan));
@@ -235,9 +240,12 @@
while (batch != nullptr) {
for (int row = 0; row < batch->num_rows(); ++row) {
auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
- auto numeric_scalar = std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
+ auto numeric_scalar =
+ std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
int32_t value = numeric_scalar->value;
- if (value <= prev) { out_of_order = true; }
+ if (value <= prev) {
+ out_of_order = true;
+ }
prev = value;
}
ABORT_NOT_OK(reader.ReadNext(&batch));
