GH-43495: [C++][Compute] Widen the row offset of the row table to 64-bit (#43389)

### Rationale for this change

The row table uses `uint32_t` as the row offset within the row data buffer, effectively preventing the row data from growing beyond 4GB. This is quite restrictive, and the impact is described in more detail in #43495. This PR widens the row offset from 32-bit to 64-bit to address this limitation.

#### Benefits
Currently, the row table has three major limitations:
1. The overall data size cannot exceed 4GB.
2. The size of a single row cannot exceed 4GB.
3. The number of rows cannot exceed 2^32.

This enhancement eliminates the first limitation, while the second and third are much less likely to be hit in practice. It therefore enables a significant range of use cases that are currently unsupported.

#### Overhead
Of course, this will introduce some overhead:
1. An extra 4 bytes of memory per row, because each offset grows from 32-bit to 64-bit.
2. A wider offset type requires a few more SIMD instructions in each 8-row processing iteration.

In my opinion, this overhead is justified by the benefits listed above.

### What changes are included in this PR?

Change the row offset of the row table from 32-bit to 64-bit. Related code in row comparison/encoding and the swiss join has been updated accordingly.

### Are these changes tested?

Tests are included.

### Are there any user-facing changes?

Users may see higher memory consumption when using Acero's hash join and hash aggregation. On the other hand, certain use cases that used to fail are now able to complete.

* GitHub Issue: #43495

Authored-by: Ruoxi Sun <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
zanmato1984 authored Aug 19, 2024
1 parent 1ae38d0 commit 5e68513
Showing 15 changed files with 802 additions and 343 deletions.
192 changes: 192 additions & 0 deletions cpp/src/arrow/acero/hash_join_node_test.cc
@@ -30,6 +30,7 @@
#include "arrow/compute/kernels/test_util.h"
#include "arrow/compute/light_array_internal.h"
#include "arrow/testing/extension_type.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
@@ -40,6 +41,10 @@ using testing::UnorderedElementsAreArray;

namespace arrow {

using arrow::gen::Constant;
using arrow::random::kSeedMax;
using arrow::random::RandomArrayGenerator;
using compute::and_;
using compute::call;
using compute::default_exec_context;
using compute::ExecBatchBuilder;
@@ -3253,5 +3258,192 @@ TEST(HashJoin, ManyJoins) {
ASSERT_OK_AND_ASSIGN(std::ignore, DeclarationToTable(std::move(root)));
}

namespace {

void AssertRowCountEq(Declaration source, int64_t expected) {
Declaration count{"aggregate",
{std::move(source)},
AggregateNodeOptions{/*aggregates=*/{{"count_all", "count(*)"}}}};
ASSERT_OK_AND_ASSIGN(auto batches, DeclarationToExecBatches(std::move(count)));
ASSERT_EQ(batches.batches.size(), 1);
ASSERT_EQ(batches.batches[0].values.size(), 1);
ASSERT_TRUE(batches.batches[0].values[0].is_scalar());
ASSERT_EQ(batches.batches[0].values[0].scalar()->type->id(), Type::INT64);
ASSERT_TRUE(batches.batches[0].values[0].scalar_as<Int64Scalar>().is_valid);
ASSERT_EQ(batches.batches[0].values[0].scalar_as<Int64Scalar>().value, expected);
}

} // namespace

// GH-43495: Test that both the key and the payload of the right side (the build side) are
// fixed length and larger than 4GB, and the 64-bit offset in the hash table can handle it
// correctly.
TEST(HashJoin, LARGE_MEMORY_TEST(BuildSideOver4GBFixedLength)) {
constexpr int64_t k5GB = 5ll * 1024 * 1024 * 1024;
constexpr int fixed_length = 128;
const auto type = fixed_size_binary(fixed_length);
constexpr uint8_t byte_no_match_min = static_cast<uint8_t>('A');
constexpr uint8_t byte_no_match_max = static_cast<uint8_t>('y');
constexpr uint8_t byte_match = static_cast<uint8_t>('z');
const auto value_match =
std::make_shared<FixedSizeBinaryScalar>(std::string(fixed_length, byte_match));
constexpr int16_t num_rows_per_batch_left = 128;
constexpr int16_t num_rows_per_batch_right = 4096;
const int64_t num_batches_left = 8;
const int64_t num_batches_right =
k5GB / (num_rows_per_batch_right * type->byte_width());

// Left side composed of num_batches_left identical batches of num_rows_per_batch_left
// rows of value_match-es.
BatchesWithSchema batches_left;
{
// A column with num_rows_per_batch_left value_match-es.
ASSERT_OK_AND_ASSIGN(auto column,
Constant(value_match)->Generate(num_rows_per_batch_left));

// Use the column as both the key and the payload.
ExecBatch batch({column, column}, num_rows_per_batch_left);
batches_left =
BatchesWithSchema{std::vector<ExecBatch>(num_batches_left, std::move(batch)),
schema({field("l_key", type), field("l_payload", type)})};
}

// Right side composed of num_batches_right identical batches of
// num_rows_per_batch_right rows containing only 1 value_match.
BatchesWithSchema batches_right;
{
// A column with (num_rows_per_batch_right - 1) non-value_match-es (possibly null) and
// 1 value_match.
auto non_matches = RandomArrayGenerator(kSeedMax).FixedSizeBinary(
num_rows_per_batch_right - 1, fixed_length,
/*null_probability =*/0.01, /*min_byte=*/byte_no_match_min,
/*max_byte=*/byte_no_match_max);
ASSERT_OK_AND_ASSIGN(auto match, Constant(value_match)->Generate(1));
ASSERT_OK_AND_ASSIGN(auto column, Concatenate({non_matches, match}));

// Use the column as both the key and the payload.
ExecBatch batch({column, column}, num_rows_per_batch_right);
batches_right =
BatchesWithSchema{std::vector<ExecBatch>(num_batches_right, std::move(batch)),
schema({field("r_key", type), field("r_payload", type)})};
}

Declaration left{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_left.schema),
std::move(batches_left.batches))};

Declaration right{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_right.schema),
std::move(batches_right.batches))};

HashJoinNodeOptions join_opts(JoinType::INNER, /*left_keys=*/{"l_key"},
/*right_keys=*/{"r_key"});
Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts};

ASSERT_OK_AND_ASSIGN(auto batches_result, DeclarationToExecBatches(std::move(join)));
Declaration result{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_result.schema),
std::move(batches_result.batches))};

// The row count of hash join should be (number of value_match-es in left side) *
// (number of value_match-es in right side).
AssertRowCountEq(result,
num_batches_left * num_rows_per_batch_left * num_batches_right);

// All rows should be value_match-es.
auto predicate = and_({equal(field_ref("l_key"), literal(value_match)),
equal(field_ref("l_payload"), literal(value_match)),
equal(field_ref("r_key"), literal(value_match)),
equal(field_ref("r_payload"), literal(value_match))});
Declaration filter{"filter", {result}, FilterNodeOptions{std::move(predicate)}};
AssertRowCountEq(std::move(filter),
num_batches_left * num_rows_per_batch_left * num_batches_right);
}

// GH-43495: Test that both the key and the payload of the right side (the build side) are
// var length and larger than 4GB, and the 64-bit offset in the hash table can handle it
// correctly.
TEST(HashJoin, LARGE_MEMORY_TEST(BuildSideOver4GBVarLength)) {
constexpr int64_t k5GB = 5ll * 1024 * 1024 * 1024;
const auto type = utf8();
constexpr int value_no_match_length_min = 128;
constexpr int value_no_match_length_max = 129;
constexpr int value_match_length = 130;
const auto value_match =
std::make_shared<StringScalar>(std::string(value_match_length, 'X'));
constexpr int16_t num_rows_per_batch_left = 128;
constexpr int16_t num_rows_per_batch_right = 4096;
const int64_t num_batches_left = 8;
const int64_t num_batches_right =
k5GB / (num_rows_per_batch_right * value_no_match_length_min);

// Left side composed of num_batches_left identical batches of num_rows_per_batch_left
// rows of value_match-es.
BatchesWithSchema batches_left;
{
// A column with num_rows_per_batch_left value_match-es.
ASSERT_OK_AND_ASSIGN(auto column,
Constant(value_match)->Generate(num_rows_per_batch_left));

// Use the column as both the key and the payload.
ExecBatch batch({column, column}, num_rows_per_batch_left);
batches_left =
BatchesWithSchema{std::vector<ExecBatch>(num_batches_left, std::move(batch)),
schema({field("l_key", type), field("l_payload", type)})};
}

// Right side composed of num_batches_right identical batches of
// num_rows_per_batch_right rows containing only 1 value_match.
BatchesWithSchema batches_right;
{
// A column with (num_rows_per_batch_right - 1) non-value_match-es (possibly null) and
// 1 value_match.
auto non_matches =
RandomArrayGenerator(kSeedMax).String(num_rows_per_batch_right - 1,
/*min_length=*/value_no_match_length_min,
/*max_length=*/value_no_match_length_max,
/*null_probability =*/0.01);
ASSERT_OK_AND_ASSIGN(auto match, Constant(value_match)->Generate(1));
ASSERT_OK_AND_ASSIGN(auto column, Concatenate({non_matches, match}));

// Use the column as both the key and the payload.
ExecBatch batch({column, column}, num_rows_per_batch_right);
batches_right =
BatchesWithSchema{std::vector<ExecBatch>(num_batches_right, std::move(batch)),
schema({field("r_key", type), field("r_payload", type)})};
}

Declaration left{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_left.schema),
std::move(batches_left.batches))};

Declaration right{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_right.schema),
std::move(batches_right.batches))};

HashJoinNodeOptions join_opts(JoinType::INNER, /*left_keys=*/{"l_key"},
/*right_keys=*/{"r_key"});
Declaration join{"hashjoin", {std::move(left), std::move(right)}, join_opts};

ASSERT_OK_AND_ASSIGN(auto batches_result, DeclarationToExecBatches(std::move(join)));
Declaration result{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(batches_result.schema),
std::move(batches_result.batches))};

// The row count of hash join should be (number of value_match-es in left side) *
// (number of value_match-es in right side).
AssertRowCountEq(result,
num_batches_left * num_rows_per_batch_left * num_batches_right);

// All rows should be value_match-es.
auto predicate = and_({equal(field_ref("l_key"), literal(value_match)),
equal(field_ref("l_payload"), literal(value_match)),
equal(field_ref("r_key"), literal(value_match)),
equal(field_ref("r_payload"), literal(value_match))});
Declaration filter{"filter", {result}, FilterNodeOptions{std::move(predicate)}};
AssertRowCountEq(std::move(filter),
num_batches_left * num_rows_per_batch_left * num_batches_right);
}

} // namespace acero
} // namespace arrow
26 changes: 11 additions & 15 deletions cpp/src/arrow/acero/swiss_join.cc
@@ -122,7 +122,7 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
if (!is_fixed_length_column) {
int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id);
const uint8_t* row_ptr_base = rows.data(2);
const uint32_t* row_offsets = rows.offsets();
const RowTableImpl::offset_type* row_offsets = rows.offsets();
uint32_t field_offset_within_row, field_length;

if (varbinary_column_id == 0) {
@@ -173,7 +173,7 @@ void RowArrayAccessor::Visit(const RowTableImpl& rows, int column_id, int num_ro
// Case 4: This is a fixed length column in a varying length row
//
const uint8_t* row_ptr_base = rows.data(2) + field_offset_within_row;
const uint32_t* row_offsets = rows.offsets();
const RowTableImpl::offset_type* row_offsets = rows.offsets();
for (int i = 0; i < num_rows; ++i) {
uint32_t row_id = row_ids[i];
const uint8_t* row_ptr = row_ptr_base + row_offsets[row_id];
@@ -473,17 +473,10 @@ Status RowArrayMerge::PrepareForMerge(RowArray* target,
(*first_target_row_id)[sources.size()] = num_rows;
}

if (num_bytes > std::numeric_limits<uint32_t>::max()) {
return Status::Invalid(
"There are more than 2^32 bytes of key data. Acero cannot "
"process a join of this magnitude");
}

// Allocate target memory
//
target->rows_.Clean();
RETURN_NOT_OK(target->rows_.AppendEmpty(static_cast<uint32_t>(num_rows),
static_cast<uint32_t>(num_bytes)));
RETURN_NOT_OK(target->rows_.AppendEmpty(static_cast<uint32_t>(num_rows), num_bytes));

// In case of varying length rows,
// initialize the first row offset for each range of rows corresponding to a
@@ -565,15 +558,15 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
int64_t first_target_row_offset,
const int64_t* source_rows_permutation) {
int64_t num_source_rows = source.length();
uint32_t* target_offsets = target->mutable_offsets();
const uint32_t* source_offsets = source.offsets();
RowTableImpl::offset_type* target_offsets = target->mutable_offsets();
const RowTableImpl::offset_type* source_offsets = source.offsets();

// Permutation of source rows is optional.
//
if (!source_rows_permutation) {
int64_t target_row_offset = first_target_row_offset;
for (int64_t i = 0; i < num_source_rows; ++i) {
target_offsets[first_target_row_id + i] = static_cast<uint32_t>(target_row_offset);
target_offsets[first_target_row_id + i] = target_row_offset;
target_row_offset += source_offsets[i + 1] - source_offsets[i];
}
// We purposefully skip outputting of N+1 offset, to allow concurrent
@@ -593,7 +586,10 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
int64_t source_row_id = source_rows_permutation[i];
const uint64_t* source_row_ptr = reinterpret_cast<const uint64_t*>(
source.data(2) + source_offsets[source_row_id]);
uint32_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id];
int64_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id];
// Though the row offset is 64-bit, the length of a single row must be 32-bit as
// required by current row table implementation.
DCHECK_LE(length, std::numeric_limits<uint32_t>::max());

// Rows should be 64-bit aligned.
// In that case we can copy them using a sequence of 64-bit read/writes.
@@ -604,7 +600,7 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl&
*target_row_ptr++ = *source_row_ptr++;
}

target_offsets[first_target_row_id + i] = static_cast<uint32_t>(target_row_offset);
target_offsets[first_target_row_id + i] = target_row_offset;
target_row_offset += length;
}
}
