GH-41813: [C++] Fix avx2 gather offset larger than 2GB in CompareColumnsToRows #42188

Merged: 24 commits, Jun 25, 2024
Changes from 20 commits
41 changes: 36 additions & 5 deletions cpp/src/arrow/compute/row/compare_internal_avx2.cc
@@ -236,6 +236,8 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
irow_right =
_mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
}
// TODO: Need to test if this gather is OK when irow_right is larger than
// 0x80000000u.
__m256i offset_right =
_mm256_i32gather_epi32((const int*)offsets_right, irow_right, 4);
offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));

Contributor Author: I'll test in the future.
Member: When you say "in the future", is it in this PR or another one?
Contributor Author: Oh sorry, I meant in another PR.
@@ -251,6 +253,35 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
}
}

namespace {

/// Intrinsics `_mm256_i32gather_epi32/64` treat the `vindex` as a signed integer, and we
/// are using `uint32_t` to represent the offset, in the range [0, 4G), within the row
/// table. When the offset is larger than `0x80000000` (2GB), those intrinsics will treat
/// it as a negative offset and gather the data from an undesired address. To avoid this
/// issue, we normalize the addresses by translating `base` `0x80000000` higher, and
/// `offset` `0x80000000` lower. This way, the offset is always in the range [-2G, 2G)
/// and those intrinsics are safe.

Member: Can you use regular comments (//)? This isn't a docstring so shouldn't use the docstring-specific prefix (///)
Contributor Author: Yeah, will do.
Contributor Author: Addressed.

constexpr auto two_gb = 0x80000000ull;

Member: Can we make sure we use an explicit width type here? I'm not even sure what it is expected to be for correctness of the code using this constant (uint32_t or uint64_t?)
Contributor Author: Both uint32_t and uint64_t are OK. It only has to be unsigned and wide enough for 0x80000000. I'm declaring it uint64_t (the ull suffix) just to promote all the arithmetic to 64 bits so we needn't worry about potential underflow. The two subsequent usages are:

  1. Being added to the pointer base after being divided by a specific sizeof(). The division is unsigned, so the addition addresses the base "forward", as expected.
  2. Being loaded into a signed __m256i register via an implicit static cast (after being divided by scale).

I'll update to make it, and the usages, more type- and width-explicit.
Contributor Author: Addressed.


template <int scale>
inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset) {
  auto normalized_base = base + two_gb / sizeof(int);
  __m256i normalized_offset =
      _mm256_sub_epi32(offset, _mm256_set1_epi32(two_gb / scale));
  return _mm256_i32gather_epi32(normalized_base, normalized_offset, scale);
}

Member: Two things:

  1. if we're using unsigned arithmetic below, the scale type should probably be unsigned for readability and sanity?
  2. naming convention: can we make this kScale?

Contributor Author (zanmato1984, Jun 24, 2024):

  1. The type of the third formal parameter of _mm256_set1_epi32/64 is int, so I'm just using int too. Yeah, that's probably good.
  2. Yeah, will do.

Contributor Author: Addressed.

template <int scale>
inline __m256i UnsignedOffsetSafeGather64(arrow::util::int64_for_gather_t const* base,
                                          __m128i offset) {
  auto normalized_base = base + two_gb / sizeof(arrow::util::int64_for_gather_t);
  __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(two_gb / scale));
  return _mm256_i32gather_epi64(normalized_base, normalized_offset, scale);
}

Member: What is the use of int64_for_gather_t exactly?

} // namespace

template <int column_width>
inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* right_base,
__m256i irow_left, __m256i offset_right,
@@ -281,7 +312,7 @@ inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* r
ARROW_DCHECK(false);
}

-  __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+  __m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right);
if (column_width != sizeof(uint32_t)) {
constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff;
right = _mm256_and_si256(right, _mm256_set1_epi32(mask));
@@ -330,7 +361,7 @@ inline uint64_t Compare8_avx2(const uint8_t* left_base, const uint8_t* right_bas
ARROW_DCHECK(false);
}

-  __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+  __m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right);
if (column_width != sizeof(uint32_t)) {
constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff;
right = _mm256_and_si256(right, _mm256_set1_epi32(mask));
@@ -367,9 +398,9 @@ inline uint64_t Compare8_64bit_avx2(const uint8_t* left_base, const uint8_t* rig
auto right_base_i64 =
reinterpret_cast<const arrow::util::int64_for_gather_t*>(right_base);
   __m256i right_lo =
-      _mm256_i32gather_epi64(right_base_i64, _mm256_castsi256_si128(offset_right), 1);
-  __m256i right_hi = _mm256_i32gather_epi64(right_base_i64,
-                                            _mm256_extracti128_si256(offset_right, 1), 1);
+      UnsignedOffsetSafeGather64<1>(right_base_i64, _mm256_castsi256_si128(offset_right));
+  __m256i right_hi = UnsignedOffsetSafeGather64<1>(
+      right_base_i64, _mm256_extracti128_si256(offset_right, 1));
uint32_t result_lo = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo));
uint32_t result_hi = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_hi, right_hi));
return result_lo | (static_cast<uint64_t>(result_hi) << 32);
125 changes: 125 additions & 0 deletions cpp/src/arrow/compute/row/compare_test.cc
@@ -18,8 +18,10 @@
#include <numeric>

#include "arrow/compute/row/compare_internal.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/bitmap_ops.h"

namespace arrow {
namespace compute {
@@ -164,5 +166,128 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) {
}
}

// Compare columns to rows at offsets over 2GB within a row table.
// Certain AVX2 instructions may behave unexpectedly causing troubles like GH-41813.
TEST(KeyCompare, CompareColumnsToRowsLarge) {
Member: What is the runtime of this test? Perhaps we need to disable it on Valgrind builds.
Contributor Author: What do you mean by "runtime"? I can't think of a reason why Valgrind would complain (at least ASAN didn't).
Member: Sorry, I meant "run time" or execution time :-)
Contributor Author: Ah, got it! It takes about 20s with ASAN enabled. Perhaps it will be fine with Valgrind too?
Member: I should take a quick look.
Member: OK, it takes 70s locally under Valgrind. That's a bit high for a single test; I would rather disable it under Valgrind.
Contributor Author: OK. Updated to disable the test under Valgrind. Thanks for running it locally!
if constexpr (sizeof(void*) == 4) {
GTEST_SKIP() << "Test only works on 64-bit platforms";
}

// The idea of this case is to create a row table using several fixed length columns and
// one var length column (so the row is hence var length and has offset buffer), with
// the overall data size exceeding 2GB. Then compare each row with itself.
constexpr int64_t two_gb = 2ll * 1024ll * 1024ll * 1024ll;
// The compare function requires the row id of the left column to be uint16_t, hence the
// number of rows.
constexpr int64_t num_rows = std::numeric_limits<uint16_t>::max() + 1;
const std::vector<std::shared_ptr<DataType>> fixed_length_types{uint64(), uint32()};
// The var length column should be a little smaller than 2GB to WAR the capacity
// limitation in the var length builder.
constexpr int32_t var_length = two_gb / num_rows - 1;

Member: "WAR"?
Contributor Author: Sorry, I meant "workaround". Will update.
Contributor Author: Addressed.
auto row_size = std::accumulate(fixed_length_types.begin(), fixed_length_types.end(),
static_cast<int64_t>(var_length),
[](int64_t acc, const std::shared_ptr<DataType>& type) {
return acc + type->byte_width();
});
// The overall size should be larger than 2GB.
ASSERT_GT(row_size * num_rows, two_gb);

MemoryPool* pool = default_memory_pool();
TempVectorStack stack;
ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows)));

std::vector<Datum> columns;
{
// Several fixed length arrays containing random content.
for (const auto& type : fixed_length_types) {
ASSERT_OK_AND_ASSIGN(auto column, ::arrow::gen::Random(type)->Generate(num_rows));
columns.push_back(std::move(column));
}
// A var length array containing 'X' repeated var_length times.
ASSERT_OK_AND_ASSIGN(auto column_var_length,
::arrow::gen::Constant(
std::make_shared<BinaryScalar>(std::string(var_length, 'X')))
->Generate(num_rows));
columns.push_back(std::move(column_var_length));
}
ExecBatch batch(std::move(columns), num_rows);

std::vector<KeyColumnMetadata> column_metadatas;
ASSERT_OK(ColumnMetadatasFromExecBatch(batch, &column_metadatas));
std::vector<KeyColumnArray> column_arrays;
ASSERT_OK(ColumnArraysFromExecBatch(batch, &column_arrays));

// The row table (right side).
RowTableMetadata table_metadata_right;
table_metadata_right.FromColumnMetadataVector(column_metadatas, sizeof(uint64_t),
sizeof(uint64_t));
RowTableImpl row_table;
ASSERT_OK(row_table.Init(pool, table_metadata_right));
std::vector<uint16_t> row_ids_right(num_rows);
std::iota(row_ids_right.begin(), row_ids_right.end(), 0);
RowTableEncoder row_encoder;
row_encoder.Init(column_metadatas, sizeof(uint64_t), sizeof(uint64_t));
row_encoder.PrepareEncodeSelected(0, num_rows, column_arrays);
ASSERT_OK(row_encoder.EncodeSelected(&row_table, static_cast<uint32_t>(num_rows),
row_ids_right.data()));

ASSERT_TRUE(row_table.offsets());
Member: I'm not sure what that's supposed to check (offsets being "true"?). Do we want to make the test a bit more self-documenting, or perhaps add a comment?
Contributor Author: This is asserting that the address of row_table.offsets() is not null, like if (some_pointer). Perhaps I can refine it to ASSERT_NE(row_table.offsets(), NULLPTR). And the point of this check is to make sure the row_table constructed has an internal offset buffer, i.e., it contains var length columns.
Member: Yes, the ASSERT_NE suggestion would make this more easily understandable, thanks!
Contributor Author: Addressed.

// The whole point of this test.
ASSERT_GT(row_table.offsets()[num_rows - 1], two_gb);

// The left rows.
std::vector<uint32_t> row_ids_left(num_rows);
std::iota(row_ids_left.begin(), row_ids_left.end(), 0);

LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack};

{
// No selection, output no match row ids.
uint32_t num_rows_no_match;
std::vector<uint16_t> row_ids_out(num_rows);
KeyCompare::CompareColumnsToRows(num_rows, /*sel_left_maybe_null=*/NULLPTR,
row_ids_left.data(), &ctx, &num_rows_no_match,
row_ids_out.data(), column_arrays, row_table,
/*are_cols_in_encoding_order=*/true, NULLPTR);
ASSERT_EQ(num_rows_no_match, 0);
}

{
// With selection, output no match row ids.
uint32_t num_rows_no_match;
std::vector<uint16_t> row_ids_out(num_rows);
std::vector<uint16_t> selection_left(num_rows);
std::iota(selection_left.begin(), selection_left.end(), 0);
KeyCompare::CompareColumnsToRows(num_rows, selection_left.data(), row_ids_left.data(),
&ctx, &num_rows_no_match, row_ids_out.data(),
column_arrays, row_table,
/*are_cols_in_encoding_order=*/true, NULLPTR);
ASSERT_EQ(num_rows_no_match, 0);
}

{
// No selection, output match bit vector.
std::vector<uint8_t> match_bitvector(BytesForBits(num_rows));
KeyCompare::CompareColumnsToRows(
num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_left.data(), &ctx, NULLPTR,
NULLPTR, column_arrays, row_table, /*are_cols_in_encoding_order=*/true,
match_bitvector.data());
ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows),
num_rows);
}

{
// With selection, output match bit vector.
std::vector<uint8_t> match_bitvector(BytesForBits(num_rows));
std::vector<uint16_t> selection_left(num_rows);
std::iota(selection_left.begin(), selection_left.end(), 0);
KeyCompare::CompareColumnsToRows(
num_rows, NULLPTR, row_ids_left.data(), &ctx, NULLPTR, NULLPTR, column_arrays,
row_table, /*are_cols_in_encoding_order=*/true, match_bitvector.data());
ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows),
num_rows);
}
}

} // namespace compute
} // namespace arrow