Skip to content

Commit

Permalink
feat(cpp/acero): apache#38074 Add unit test for inner join about larg…
Browse files Browse the repository at this point in the history
…e_utf8 and large_binary
  • Loading branch information
llama90 committed Oct 10, 2023
1 parent 6d04345 commit 3894147
Showing 1 changed file with 261 additions and 0 deletions.
261 changes: 261 additions & 0 deletions cpp/src/arrow/acero/hash_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2124,5 +2124,266 @@ TEST(HashJoin, ChainedIntegerHashJoins) {
}
}

// This test case is related to GH-38147
// To verify that the issue with offset handling has been fixed, the number of matching
// records needs to be larger than the mini-batch size (1024).
constexpr uint64_t NUM_MATCH_RECORDS = 1234;
constexpr uint64_t NUM_LEFT_RECORDS = 2000;
constexpr uint64_t NUM_RIGHT_RECORDS = 1500;

std::string GenerateTimestamp() {
auto now = std::chrono::system_clock::now();
auto now_time_t = std::chrono::system_clock::to_time_t(now);
auto now_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) %
1000;

std::stringstream ss;
ss << std::put_time(std::localtime(&now_time_t), "%Y-%m-%d %H:%M:%S");
ss << '.' << std::setw(6) << std::setfill('0') << now_ms.count();
return ss.str();
}

std::string GenerateRecord(
uint64_t index, const std::string& timestamp, bool useNegativeOne,
std::function<std::vector<std::string>(uint64_t)> getExtraValues) {
std::stringstream record;
record << R"([")" << timestamp << R"(",)";
if (useNegativeOne && index >= NUM_MATCH_RECORDS) {
record << "\"Not Matched\"";
} else {
record << "\"" << index + 1 << "\"";
}
auto extraValues = getExtraValues(index);
for (const auto& value : extraValues) {
record << R"(, ")" << value << R"(")";
}
record << R"(])";
return record.str();
}

std::string GenerateLeftRecords(
uint64_t recordCount, const std::string& timestamp,
std::function<std::vector<std::string>(uint64_t)> getExtraValues) {
std::stringstream ss;
ss << R"([)";
for (uint64_t i = 0; i < recordCount; ++i) {
if (i != 0) ss << ",";
ss << GenerateRecord(i, timestamp, true, getExtraValues);
}
ss << R"(])";
return ss.str();
}

std::string GenerateRightRecords(
uint64_t recordCount, const std::string& timestamp,
std::function<std::vector<std::string>(uint64_t)> getExtraValues) {
std::stringstream ss;
ss << R"([)";
for (uint64_t i = 0; i < recordCount; ++i) {
if (i != 0) ss << ",";
ss << GenerateRecord(i, timestamp, false, getExtraValues);
}
ss << R"(])";
return ss.str();
}

std::string GenerateExpectedRecords(
uint64_t recordCount, const std::string& timestamp,
std::function<std::vector<std::string>(uint64_t)> getLeftExtraValues,
std::function<std::vector<std::string>(uint64_t)> getRightExtraValues) {
std::stringstream ss;
ss << R"([)";
for (uint64_t i = 0; i < recordCount; ++i) {
if (i >= NUM_MATCH_RECORDS) break; // Consider only matched records

if (i != 0) ss << ",";

auto leftExtraValues = getLeftExtraValues(i);
auto rightExtraValues = getRightExtraValues(i);

// Construct a matched record (inner join result)
ss << R"([")" << timestamp << R"(",)"
<< "\"" << i + 1 << "\"";
for (const auto& value : leftExtraValues) {
ss << R"(,")" << value << R"(")";
}
ss << R"(,")" << timestamp << R"(",)"
<< "\"" << i + 1 << "\"";
for (const auto& value : rightExtraValues) {
ss << R"(,")" << value << R"(")";
}
ss << R"(])";
}
ss << R"(])";
return ss.str();
}

// This test case is related to GH-38147
// In the previous code, only 1030 matching records are returned.
TEST(HashJoin, InnerJoinTestLargerThanMiniBatchSize) {
// Just as an example, returning "foo" for every record
auto leftExtraValues = [](uint64_t index) -> std::vector<std::string> {
return {"foo"};
};

// Just as an example, returning "foo" and "bar" for every record
auto rightExtraValues = [](uint64_t index) -> std::vector<std::string> {
return {"foo", "bar"};
};

std::string currTS = GenerateTimestamp();

std::string leftRecords =
GenerateLeftRecords(NUM_LEFT_RECORDS, currTS, leftExtraValues);
std::string rightRecords =
GenerateRightRecords(NUM_RIGHT_RECORDS, currTS, rightExtraValues);
std::string expectedRecords = GenerateExpectedRecords(
NUM_MATCH_RECORDS, currTS, leftExtraValues, rightExtraValues);

// Initialize left input data with the specific timestamp and large utf8 data
BatchesWithSchema input_left;
input_left.batches = {ExecBatchFromJSON(
{timestamp(TimeUnit::MICRO, "UTC"), large_utf8(), large_utf8()}, leftRecords)};
input_left.schema =
schema({field("col_1", timestamp(TimeUnit::MICRO, "UTC")),
field("col_2", large_utf8()), field("col_3", large_utf8())});

// Initialize right input data with an extra column and missing keys
BatchesWithSchema input_right;
input_right.batches = {ExecBatchFromJSON(
{timestamp(TimeUnit::MICRO, "UTC"), large_utf8(), large_utf8(), large_utf8()},
rightRecords)};
input_right.schema = schema({field("col_1", timestamp(TimeUnit::MICRO, "UTC")),
field("col_2", large_utf8()), field("col_3", large_utf8()),
field("col_4", large_utf8())});

// Hash join options for inner join
HashJoinNodeOptions join_opts{JoinType::INNER,
/*left_keys=*/{"col_1", "col_2", "col_3"},
/*right_keys=*/{"col_1", "col_2", "col_3"},
literal(true),
"_l",
"_r",
false};

// Creating join declaration
Declaration left{"source",
SourceNodeOptions{input_left.schema,
input_left.gen(/*parallel=*/true, /*slow=*/false)}};
Declaration right{
"source", SourceNodeOptions{input_right.schema,
input_right.gen(/*parallel=*/true, /*slow=*/false)}};
Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts};

// Executing and getting the actual result
ASSERT_OK_AND_ASSIGN(auto actual, DeclarationToExecBatches(std::move(join), false));

// Print actual schema and batch for verification
std::cout << "Actual schema: " << actual.schema->ToString() << std::endl;
for (const auto& batch : actual.batches) {
std::cout << "Batch: " << batch.ToString() << std::endl;
}

// Initialize expected data after inner join with the specific criteria
BatchesWithSchema expected;
expected.batches = {ExecBatchFromJSON(
{timestamp(TimeUnit::MICRO, "UTC"), large_utf8(), large_utf8(),
timestamp(TimeUnit::MICRO, "UTC"), large_utf8(), large_utf8(), large_utf8()},
expectedRecords)};
expected.schema =
schema({field("col_1_l", timestamp(TimeUnit::MICRO, "UTC")),
field("col_2_l", large_utf8()), field("col_3_l", large_utf8()),
field("col_1_r", timestamp(TimeUnit::MICRO, "UTC")),
field("col_2_r", large_utf8()), field("col_3_r", large_utf8()),
field("col_4", large_utf8())});

// Assertions to check the equality and validity of the expected and actual results
AssertExecBatchesEqualIgnoringOrder(expected.schema, expected.batches, actual.batches);
AssertSchemaEqual(expected.schema, actual.schema);
}

// This test case is related to GH-38147
// In the previous code, only 1026 matching records are returned.
TEST(HashJoin, InnerJoinTestLargerThanMiniBatchSizeBinary) {
auto leftExtraValues = [](uint64_t index) -> std::vector<std::string> {
return {"foo"};
};

auto rightExtraValues = [](uint64_t index) -> std::vector<std::string> {
return {"foo", "bar"};
};

std::string currTS = GenerateTimestamp();

std::string leftRecords =
GenerateLeftRecords(NUM_LEFT_RECORDS, currTS, leftExtraValues);
std::string rightRecords =
GenerateRightRecords(NUM_RIGHT_RECORDS, currTS, rightExtraValues);
std::string expectedRecords = GenerateExpectedRecords(
NUM_MATCH_RECORDS, currTS, leftExtraValues, rightExtraValues);

// Initialize left input data with the specific timestamp and large binary data
BatchesWithSchema input_left;
input_left.batches = {ExecBatchFromJSON(
{timestamp(TimeUnit::MICRO, "UTC"), large_binary(), large_binary()}, leftRecords)};
input_left.schema =
schema({field("col_1", timestamp(TimeUnit::MICRO, "UTC")),
field("col_2", large_binary()), field("col_3", large_binary())});

// Initialize right input data with an extra column and binary data
BatchesWithSchema input_right;
input_right.batches = {ExecBatchFromJSON(
{timestamp(TimeUnit::MICRO, "UTC"), large_binary(), large_binary(), large_binary()},
rightRecords)};
input_right.schema = schema(
{field("col_1", timestamp(TimeUnit::MICRO, "UTC")), field("col_2", large_binary()),
field("col_3", large_binary()), field("col_4", large_binary())});

// Hash join options for inner join
HashJoinNodeOptions join_opts{JoinType::INNER,
/*left_keys=*/{"col_1", "col_2", "col_3"},
/*right_keys=*/{"col_1", "col_2", "col_3"},
literal(true),
"_l",
"_r",
false};

// Creating join declaration
Declaration left{"source",
SourceNodeOptions{input_left.schema,
input_left.gen(/*parallel=*/true, /*slow=*/false)}};
Declaration right{
"source", SourceNodeOptions{input_right.schema,
input_right.gen(/*parallel=*/true, /*slow=*/false)}};
Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts};

// Executing and getting the actual result
ASSERT_OK_AND_ASSIGN(auto actual, DeclarationToExecBatches(std::move(join), false));

// Print actual schema and batch for verification
std::cout << "Actual schema: " << actual.schema->ToString() << std::endl;
for (const auto& batch : actual.batches) {
std::cout << "Batch: " << batch.ToString() << std::endl;
}

// Initialize expected data after inner join with the specific criteria
BatchesWithSchema expected;
expected.batches = {ExecBatchFromJSON(
{timestamp(TimeUnit::MICRO, "UTC"), large_binary(), large_binary(),
timestamp(TimeUnit::MICRO, "UTC"), large_binary(), large_binary(), large_binary()},
expectedRecords)};
expected.schema =
schema({field("col_1_l", timestamp(TimeUnit::MICRO, "UTC")),
field("col_2_l", large_binary()), field("col_3_l", large_binary()),
field("col_1_r", timestamp(TimeUnit::MICRO, "UTC")),
field("col_2_r", large_binary()), field("col_3_r", large_binary()),
field("col_4", large_binary())});

// Assertions to check the equality and validity of the expected and actual results
AssertExecBatchesEqualIgnoringOrder(expected.schema, expected.batches, actual.batches);
AssertSchemaEqual(expected.schema, actual.schema);
}

} // namespace acero
} // namespace arrow

0 comments on commit 3894147

Please sign in to comment.