
GH-41813: [C++] Fix avx2 gather offset larger than 2GB in CompareColumnsToRows #42188

Merged Jun 25, 2024 · 24 commits · showing changes from 22 commits
cpp/src/arrow/compute/row/compare_internal_avx2.cc (41 additions, 5 deletions)

@@ -236,6 +236,8 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
irow_right =
_mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
}
// TODO: Need to test if this gather is OK when irow_right is larger than
// 0x80000000u.

Review thread:

Contributor Author: I'll test in the future.

Member: When you say "in the future", is it in this PR or another one?

Contributor Author: Oh sorry, I meant in another PR.

__m256i offset_right =
_mm256_i32gather_epi32((const int*)offsets_right, irow_right, 4);
offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));
@@ -251,6 +253,40 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
}
}

namespace {

// Intrinsics `_mm256_i32gather_epi32/64` treat `vindex` as a signed integer, and we
// are using `uint32_t` to represent the offset, in the range of [0, 4G), within the
// row table. When the offset is larger than `0x80000000` (2GB), those intrinsics
// treat it as a negative offset and gather the data from an undesired address. To
// avoid this issue, we normalize the addresses by translating `base` `0x80000000`
// higher, and `offset` `0x80000000` lower. This way, the offset is always in the
// range of [-2G, 2G) and those intrinsics are safe.

constexpr uint64_t kTwoGB = 0x80000000ull;

template <uint32_t kScale>
inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset) {
  int const* normalized_base = base + kTwoGB / sizeof(int);
  __m256i normalized_offset =
      _mm256_sub_epi32(offset, _mm256_set1_epi32(static_cast<int>(kTwoGB / kScale)));
  return _mm256_i32gather_epi32(normalized_base, normalized_offset,
                                static_cast<int>(kScale));
}

template <uint32_t kScale>
inline __m256i UnsignedOffsetSafeGather64(arrow::util::int64_for_gather_t const* base,
                                          __m128i offset) {
  arrow::util::int64_for_gather_t const* normalized_base =
      base + kTwoGB / sizeof(arrow::util::int64_for_gather_t);
  __m128i normalized_offset =
      _mm_sub_epi32(offset, _mm_set1_epi32(static_cast<int>(kTwoGB / kScale)));
  return _mm256_i32gather_epi64(normalized_base, normalized_offset,
                                static_cast<int>(kScale));
}

Review thread:

Member: What is the use of int64_for_gather_t exactly?
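
For background, here is one portability wrinkle such an alias can address. This sketch is an inference from the intrinsic signatures, not the author's answer, and the `int64_for_gather_t` defined below is a hypothetical stand-in rather than Arrow's actual definition:

#include <immintrin.h>
#include <cstdint>

// On LP64 Linux, int64_t is `long`, while GCC declares the intrinsic as
// _mm256_i32gather_epi64(long long const*, __m128i, const int). `long*` and
// `long long*` are distinct pointer types, so `const int64_t*` does not
// convert implicitly; a matching element alias keeps the cast in one place.
using int64_for_gather_t = long long const;  // hypothetical stand-in

__m256i Gather4x64(const int64_t* data, __m128i idx) {
  auto base = reinterpret_cast<int64_for_gather_t*>(data);
  return _mm256_i32gather_epi64(base, idx, /*scale=*/8);
}
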
Review thread:

Collaborator: I have a question about the instructions. Why is the vindex parameter type of _mm256_i32gather_epi32 __m256i, while the vindex type of _mm256_i32gather_epi64 is __m128i? This may not be related to the PR, I just want to understand it 🫡

Contributor Author: Both intrinsics gather "several" integers based on a base address and "several" 32-bit offsets (vindex), and store the results into a 256-bit register. The difference is: _mm256_i32gather_epi32 gathers 8 32-bit integers (8 * 32 = 256) at a time, so 8 32-bit indices are used, hence the 256-bit vindex. Whereas _mm256_i32gather_epi64 gathers 4 64-bit integers at a time, so 4 32-bit indices are used, hence the 128-bit vindex.

Collaborator (@ZhangHuiGui, Jun 25, 2024): I see. Thanks!
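
To make the lane math concrete, here is a minimal standalone sketch (mine, not from the PR) that uses both intrinsics; the data and indices are made up, and it needs an AVX2 build (e.g. g++ -mavx2):

#include <immintrin.h>
#include <cstdint>
#include <cstdio>

int main() {
  alignas(32) int32_t data32[8] = {10, 11, 12, 13, 14, 15, 16, 17};
  alignas(32) long long data64[4] = {100, 101, 102, 103};

  // 8 result lanes of 32 bits need 8 32-bit indices: vindex is __m256i.
  __m256i idx8 = _mm256_setr_epi32(7, 6, 5, 4, 3, 2, 1, 0);
  __m256i g32 = _mm256_i32gather_epi32(data32, idx8, /*scale=*/4);

  // 4 result lanes of 64 bits need only 4 32-bit indices: vindex is __m128i.
  __m128i idx4 = _mm_setr_epi32(3, 2, 1, 0);
  __m256i g64 = _mm256_i32gather_epi64(data64, idx4, /*scale=*/8);

  alignas(32) int32_t out32[8];
  alignas(32) long long out64[4];
  _mm256_store_si256(reinterpret_cast<__m256i*>(out32), g32);
  _mm256_store_si256(reinterpret_cast<__m256i*>(out64), g64);
  printf("%d %lld\n", out32[0], out64[0]);  // prints "17 103"
  return 0;
}
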

} // namespace

template <int column_width>
inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* right_base,
__m256i irow_left, __m256i offset_right,
@@ -281,7 +317,7 @@ inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* r
ARROW_DCHECK(false);
}

- __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+ __m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right);
if (column_width != sizeof(uint32_t)) {
constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff;
right = _mm256_and_si256(right, _mm256_set1_epi32(mask));
@@ -330,7 +366,7 @@ inline uint64_t Compare8_avx2(const uint8_t* left_base, const uint8_t* right_bas
ARROW_DCHECK(false);
}

- __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+ __m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right);
if (column_width != sizeof(uint32_t)) {
constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff;
right = _mm256_and_si256(right, _mm256_set1_epi32(mask));
@@ -367,9 +403,9 @@ inline uint64_t Compare8_64bit_avx2(const uint8_t* left_base, const uint8_t* rig
auto right_base_i64 =
reinterpret_cast<const arrow::util::int64_for_gather_t*>(right_base);
__m256i right_lo =
-    _mm256_i32gather_epi64(right_base_i64, _mm256_castsi256_si128(offset_right), 1);
-__m256i right_hi = _mm256_i32gather_epi64(right_base_i64,
-                                          _mm256_extracti128_si256(offset_right, 1), 1);
+    UnsignedOffsetSafeGather64<1>(right_base_i64, _mm256_castsi256_si128(offset_right));
+__m256i right_hi = UnsignedOffsetSafeGather64<1>(
+    right_base_i64, _mm256_extracti128_si256(offset_right, 1));
uint32_t result_lo = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo));
uint32_t result_hi = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_hi, right_hi));
return result_lo | (static_cast<uint64_t>(result_hi) << 32);
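
The comment block added in compare_internal_avx2.cc above compresses a subtle addressing argument. Here is a scalar worked sketch of the same identity (mine, not part of the PR; the base and offset values are made up):

#include <cassert>
#include <cstdint>

int main() {
  constexpr int64_t kTwoGB = 0x80000000LL;
  constexpr int64_t kScale = 1;            // byte offsets, as at the PR's call sites
  const int64_t base = 0x7f0000000000LL;   // hypothetical base address
  const uint32_t offset = 0x9abcdef0u;     // >= 2GB, so the sign bit is set

  // The gather intrinsics sign-extend each 32-bit index lane, so the raw
  // offset would be misread as negative:
  assert(static_cast<int32_t>(offset) < 0);
  const int64_t wrong = base + static_cast<int32_t>(offset) * kScale;
  const int64_t want = base + static_cast<int64_t>(offset) * kScale;
  assert(wrong != want);

  // The PR's normalization: move the base up by 2GB and each offset down by
  // 2GB / kScale. The effective address is unchanged, and the adjusted index
  // now fits in a signed 32-bit lane for any offset in [0, 4GB).
  const int32_t norm = static_cast<int32_t>(offset - kTwoGB / kScale);
  const int64_t fixed = (base + kTwoGB) + norm * kScale;
  assert(fixed == want);
  return 0;
}
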
cpp/src/arrow/compute/row/compare_test.cc (128 additions, 0 deletions)

@@ -18,8 +18,10 @@
#include <numeric>

#include "arrow/compute/row/compare_internal.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/bitmap_ops.h"

namespace arrow {
namespace compute {
@@ -164,5 +166,131 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) {
}
}

#ifndef ARROW_VALGRIND
// Compare columns to rows at offsets over 2GB within a row table.
// Certain AVX2 instructions may behave unexpectedly, causing trouble like GH-41813.
TEST(KeyCompare, CompareColumnsToRowsLarge) {
Review thread:

Member: What is the runtime of this test? Perhaps we need to disable it on Valgrind builds.

Contributor Author: What do you mean by "runtime"? I can't think of a reason why Valgrind would complain (at least ASAN didn't).

Member: Sorry, I meant "run time" or execution time :-)

Contributor Author: Ah, got it! It takes about 20s with ASAN enabled. Perhaps it will be fine with Valgrind too?

Member: I should take a quick look.

Member: Ok, it takes 70s locally under Valgrind. That's a bit high for a single test, I would rather disable it under Valgrind.

Contributor Author: OK. Updated to disable the test under Valgrind. Thanks for running it locally!

if constexpr (sizeof(void*) == 4) {
GTEST_SKIP() << "Test only works on 64-bit platforms";
}

// The idea of this case is to create a row table using several fixed length columns and
// one var length column (so the rows are var length and have an offset buffer), with
// the overall data size exceeding 2GB. Then compare each row with itself.
constexpr int64_t two_gb = 2ll * 1024ll * 1024ll * 1024ll;
// The compare function requires the row id of the left column to be uint16_t, hence the
// number of rows.
constexpr int64_t num_rows = std::numeric_limits<uint16_t>::max() + 1;
const std::vector<std::shared_ptr<DataType>> fixed_length_types{uint64(), uint32()};
// The var length column should be a little smaller than 2GB to work around the capacity
// limitation in the var length builder.
constexpr int32_t var_length = two_gb / num_rows - 1;
auto row_size = std::accumulate(fixed_length_types.begin(), fixed_length_types.end(),
static_cast<int64_t>(var_length),
[](int64_t acc, const std::shared_ptr<DataType>& type) {
return acc + type->byte_width();
});
// The overall size should be larger than 2GB.
ASSERT_GT(row_size * num_rows, two_gb);
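// (Arithmetic check, not in the PR: num_rows = 65536, var_length =
// 2147483648 / 65536 - 1 = 32767, row_size = 32767 + 8 + 4 = 32779, and
// 32779 * 65536 = 2148204544 > 2147483648.)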

MemoryPool* pool = default_memory_pool();
TempVectorStack stack;
ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows)));

std::vector<Datum> columns;
{
// Several fixed length arrays containing random content.
for (const auto& type : fixed_length_types) {
ASSERT_OK_AND_ASSIGN(auto column, ::arrow::gen::Random(type)->Generate(num_rows));
columns.push_back(std::move(column));
}
// A var length array containing 'X' repeated var_length times.
ASSERT_OK_AND_ASSIGN(auto column_var_length,
::arrow::gen::Constant(
std::make_shared<BinaryScalar>(std::string(var_length, 'X')))
->Generate(num_rows));
columns.push_back(std::move(column_var_length));
}
ExecBatch batch(std::move(columns), num_rows);

std::vector<KeyColumnMetadata> column_metadatas;
ASSERT_OK(ColumnMetadatasFromExecBatch(batch, &column_metadatas));
std::vector<KeyColumnArray> column_arrays;
ASSERT_OK(ColumnArraysFromExecBatch(batch, &column_arrays));

// The row table (right side).
RowTableMetadata table_metadata_right;
table_metadata_right.FromColumnMetadataVector(column_metadatas, sizeof(uint64_t),
sizeof(uint64_t));
RowTableImpl row_table;
ASSERT_OK(row_table.Init(pool, table_metadata_right));
std::vector<uint16_t> row_ids_right(num_rows);
std::iota(row_ids_right.begin(), row_ids_right.end(), 0);
RowTableEncoder row_encoder;
row_encoder.Init(column_metadatas, sizeof(uint64_t), sizeof(uint64_t));
row_encoder.PrepareEncodeSelected(0, num_rows, column_arrays);
ASSERT_OK(row_encoder.EncodeSelected(&row_table, static_cast<uint32_t>(num_rows),
row_ids_right.data()));

// The row table must contain an offset buffer.
ASSERT_NE(row_table.offsets(), NULLPTR);
// The whole point of this test.
ASSERT_GT(row_table.offsets()[num_rows - 1], two_gb);

// The left rows.
std::vector<uint32_t> row_ids_left(num_rows);
std::iota(row_ids_left.begin(), row_ids_left.end(), 0);

LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack};

{
// No selection, output no match row ids.
uint32_t num_rows_no_match;
std::vector<uint16_t> row_ids_out(num_rows);
KeyCompare::CompareColumnsToRows(num_rows, /*sel_left_maybe_null=*/NULLPTR,
row_ids_left.data(), &ctx, &num_rows_no_match,
row_ids_out.data(), column_arrays, row_table,
/*are_cols_in_encoding_order=*/true, NULLPTR);
ASSERT_EQ(num_rows_no_match, 0);
}

{
// With selection, output no match row ids.
uint32_t num_rows_no_match;
std::vector<uint16_t> row_ids_out(num_rows);
std::vector<uint16_t> selection_left(num_rows);
std::iota(selection_left.begin(), selection_left.end(), 0);
KeyCompare::CompareColumnsToRows(num_rows, selection_left.data(), row_ids_left.data(),
&ctx, &num_rows_no_match, row_ids_out.data(),
column_arrays, row_table,
/*are_cols_in_encoding_order=*/true, NULLPTR);
ASSERT_EQ(num_rows_no_match, 0);
}

{
// No selection, output match bit vector.
std::vector<uint8_t> match_bitvector(BytesForBits(num_rows));
KeyCompare::CompareColumnsToRows(
num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_left.data(), &ctx, NULLPTR,
NULLPTR, column_arrays, row_table, /*are_cols_in_encoding_order=*/true,
match_bitvector.data());
ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows),
num_rows);
}

{
// With selection, output match bit vector.
std::vector<uint8_t> match_bitvector(BytesForBits(num_rows));
std::vector<uint16_t> selection_left(num_rows);
std::iota(selection_left.begin(), selection_left.end(), 0);
KeyCompare::CompareColumnsToRows(
num_rows, NULLPTR, row_ids_left.data(), &ctx, NULLPTR, NULLPTR, column_arrays,
row_table, /*are_cols_in_encoding_order=*/true, match_bitvector.data());
ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows),
num_rows);
}
}
#endif // ARROW_VALGRIND

} // namespace compute
} // namespace arrow