From 8528e00df2e777b6bd709e4a0a2762d587f999d9 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Thu, 13 Jun 2024 17:19:55 +0800 Subject: [PATCH 01/24] Repro WIP --- cpp/CMakePresets.json | 13 ++++++ cpp/src/arrow/dataset/dataset_test.cc | 58 +++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index cb4cdfc03ac82..3f0dd45dd050d 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -258,6 +258,19 @@ "displayName": "Debug build with tests and more optional components", "cacheVariables": {} }, + { + "name": "fix-41813", + "inherits": [ + "base-debug", + "features-main" + ], + "displayName": "Fix 41813", + "cacheVariables": { + "ARROW_JEMALLOC": "OFF", + "ARROW_MIMALLOC": "OFF", + "ARROW_USE_ASAN": "ON" + } + }, { "name": "ninja-debug-cuda", "inherits": [ diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index eb3fd0e304750..c2f32cf196711 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -21,6 +21,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/discovery.h" +#include "arrow/dataset/file_parquet.h" #include "arrow/dataset/partition.h" #include "arrow/dataset/test_util_internal.h" #include "arrow/filesystem/mockfs.h" @@ -801,5 +802,62 @@ TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) { *ArrayFromJSON(partition_field->type(), R"(["one"])")); } +namespace fs = arrow::fs; +namespace ds = arrow::dataset; +namespace cp = arrow::compute; + +arrow::Result> GetFileSystemFromUri( + const std::string& uri, std::string* path) { + return fs::FileSystemFromUri(uri, path); +} + +arrow::Result> GetDatasetFromDirectory( + std::shared_ptr fs, std::shared_ptr format, + std::string dir) { + // Find all files under `path` + fs::FileSelector s; + s.base_dir = dir; + s.recursive = true; + + ds::FileSystemFactoryOptions options; + // The factory will try to build a child dataset. + ARROW_ASSIGN_OR_RAISE(auto factory, + ds::FileSystemDatasetFactory::Make(fs, s, format, options)); + + // Try to infer a common schema for all files. + ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect({})); + // Caller can optionally decide another schema as long as it is compatible + // with the previous one, e.g. `factory->Finish(compatible_schema)`. + ARROW_ASSIGN_OR_RAISE(auto child, factory->Finish()); + + ds::DatasetVector children{1, child}; + auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children)); + + return dataset; +} + +arrow::Result> GetScannerFromDataset( + std::shared_ptr dataset) { + ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan()); + + ARROW_RETURN_NOT_OK(scanner_builder->UseThreads(true)); + + return scanner_builder->Finish(); +} + +TEST(GH41813, GH41813) { + std::string uri = + "file:///Users/zanmato/Downloads/arrow_segfault_reproducer_2/data/reduced_attempt3"; + std::string path; + auto format = std::make_shared(); + ASSERT_OK_AND_ASSIGN(auto fs, GetFileSystemFromUri(uri, &path)); + ASSERT_OK_AND_ASSIGN(auto dataset, GetDatasetFromDirectory(fs, format, path)); + + ASSERT_OK_AND_ASSIGN(auto scanner, GetScannerFromDataset(dataset)); + + ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable()); + std::cout << "Table size: " << table->num_rows() << "\n"; +} + } // namespace dataset } // namespace arrow From c01227c7d597c4b2077114543d2f83a7ef0ed40d Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Fri, 14 Jun 2024 01:38:22 +0800 Subject: [PATCH 02/24] Repro done --- cpp/src/arrow/dataset/dataset_test.cc | 32 ++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index c2f32cf196711..6f4a86c0dc57c 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -802,9 +802,10 @@ TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) { *ArrayFromJSON(partition_field->type(), R"(["one"])")); } -namespace fs = arrow::fs; -namespace ds = arrow::dataset; +namespace ac = arrow::acero; namespace cp = arrow::compute; +namespace ds = arrow::dataset; +namespace fs = arrow::fs; arrow::Result> GetFileSystemFromUri( const std::string& uri, std::string* path) { @@ -820,6 +821,7 @@ arrow::Result> GetDatasetFromDirectory( s.recursive = true; ds::FileSystemFactoryOptions options; + options.partitioning = DirectoryPartitioning::MakeFactory({"year", "month"}); // The factory will try to build a child dataset. ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(fs, s, format, options)); @@ -845,6 +847,16 @@ arrow::Result> GetScannerFromDataset( return scanner_builder->Finish(); } +arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) { + // collect sink_reader into a Table + std::shared_ptr response_table; + ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan))); + + std::cout << "Results : " << response_table->ToString() << std::endl; + + return arrow::Status::OK(); +} + TEST(GH41813, GH41813) { std::string uri = "file:///Users/zanmato/Downloads/arrow_segfault_reproducer_2/data/reduced_attempt3"; @@ -852,11 +864,19 @@ TEST(GH41813, GH41813) { auto format = std::make_shared(); ASSERT_OK_AND_ASSIGN(auto fs, GetFileSystemFromUri(uri, &path)); ASSERT_OK_AND_ASSIGN(auto dataset, GetDatasetFromDirectory(fs, format, path)); - ASSERT_OK_AND_ASSIGN(auto scanner, GetScannerFromDataset(dataset)); - - ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable()); - std::cout << "Table size: " << table->num_rows() << "\n"; + auto scan_options = std::make_shared(); + scan_options->projection = cp::project({}, {}); // create empty projection + auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, scan_options}; + ac::Declaration scan{"scan", std::move(scan_node_options)}; + + auto count_options = std::make_shared(cp::CountOptions::ONLY_VALID); + auto aggregate_options = ac::AggregateNodeOptions{ + /*aggregates=*/{{"hash_count", count_options, "date", "count(date)"}}, + /*keys=*/{"year", "month", "cid"}}; + ac::Declaration aggregate{"aggregate", {std::move(scan)}, std::move(aggregate_options)}; + + ASSERT_OK(ExecutePlanAndCollectAsTable(std::move(aggregate))); } } // namespace dataset From 42781f4f38258426ca25d12ae9f6f09e27a26d3d Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 17 Jun 2024 10:52:31 +0800 Subject: [PATCH 03/24] UT --- .../compute/row/compare_internal_avx2.cc | 26 +++++- cpp/src/arrow/compute/row/compare_test.cc | 89 +++++++++++++++++++ 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index 18f656a2e458d..82991a8f1d162 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include "arrow/compute/row/compare_internal.h" #include "arrow/compute/util.h" @@ -281,7 +282,11 @@ inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* r ARROW_DCHECK(false); } - __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1); + // const int* normalized_right_base = (const int*)(right_base + 0x80000000ull); + // __m256i normalized_offset_right = + // _mm256_sub_epi32(offset_right, _mm256_set1_epi32(0x80000000)); + // __m256i right = _mm256_i32gather_epi32(normalized_right_base, normalized_offset_right, 1); + __m256i right = _mm256_i32gather_epi32(right_base, offset_right, 1); if (column_width != sizeof(uint32_t)) { constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff; right = _mm256_and_si256(right, _mm256_set1_epi32(mask)); @@ -330,7 +335,11 @@ inline uint64_t Compare8_avx2(const uint8_t* left_base, const uint8_t* right_bas ARROW_DCHECK(false); } - __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1); + // const int* normalized_right_base = (const int*)(right_base + 0x80000000ull); + // __m256i normalized_offset_right = + // _mm256_sub_epi32(offset_right, _mm256_set1_epi32(0x80000000)); + // __m256i right = _mm256_i32gather_epi32(normalized_right_base, normalized_offset_right, 1); + __m256i right = _mm256_i32gather_epi32(right_base, offset_right, 1); if (column_width != sizeof(uint32_t)) { constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff; right = _mm256_and_si256(right, _mm256_set1_epi32(mask)); @@ -670,5 +679,18 @@ uint32_t KeyCompare::CompareVarBinaryColumnToRow_avx2( return num_rows_to_compare; } +void RossiTest() { + size_t size = 0x100000000ull + 2 * sizeof(uint32_t); + uint32_t* data = new uint32_t[size / sizeof(uint32_t)]; + data[0] = 0xDEADBEEF; + data[0x100000000ull / sizeof(uint32_t) + 1] = 0xFEEBDAED; + __m256i offset = _mm256_setr_epi32(-4, 0, 0, 0, 0, 0, 0, 0); + __m256i content = _mm256_i32gather_epi32(data + 1, offset, 1); + std::cout << "Content: " << std::hex << _mm256_extract_epi32(content, 0) << std::endl; + int32_t i_2g = 0x80000000; + int32_t i_over_2g = 0x800000AB; + std::cout << std::hex << i_over_2g - i_2g << std::endl; +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 4044049b10863..a389f529ba82f 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -17,6 +17,7 @@ #include +#include "arrow/array/builder_binary.h" #include "arrow/compute/row/compare_internal.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" @@ -164,5 +165,93 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) { } } +// Specialized case for GH-41813. +TEST(KeyCompare, CompareColumnsToRowsLarge) { + if constexpr (sizeof(void*) == 4) { + GTEST_SKIP() << "Test only works on 64-bit platforms"; + } + + constexpr auto fsb_length = 128 * 1024 * 1024; + + constexpr auto num_rows_base = 18; + MemoryPool* pool = default_memory_pool(); + TempVectorStack stack; + ASSERT_OK( + stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows_base))); + + // An array containing 17 null rows and one 'X...' row. + std::shared_ptr column_fsb; + { + FixedSizeBinaryBuilder builder(fixed_size_binary(fsb_length), pool); + ASSERT_OK(builder.Reserve(num_rows_base)); + std::string x(fsb_length, 'X'), y(fsb_length, 'Y'); + for (int i = 0; i < num_rows_base - 1; ++i) { + ASSERT_OK(builder.Append(x.data())); + } + ASSERT_OK(builder.Append(y.data())); + ASSERT_OK(builder.Finish(&column_fsb)); + } + std::shared_ptr column_binary; + { + BinaryBuilder builder(binary(), pool); + ASSERT_OK(builder.AppendNulls(num_rows_base)); + ASSERT_OK(builder.Finish(&column_binary)); + } + ExecBatch batch_base({column_fsb, column_binary}, num_rows_base); + + std::vector column_metadatas_base; + ASSERT_OK(ColumnMetadatasFromExecBatch(batch_base, &column_metadatas_base)); + std::vector column_arrays_base; + ASSERT_OK(ColumnArraysFromExecBatch(batch_base, &column_arrays_base)); + + RowTableMetadata table_metadata_right; + table_metadata_right.FromColumnMetadataVector(column_metadatas_base, sizeof(uint64_t), + sizeof(uint64_t)); + + RowTableImpl row_table; + ASSERT_OK(row_table.Init(pool, table_metadata_right)); + + // Encode row table with 18 rows, so that the last row is placed at over 2GB offset. + constexpr auto num_rows_right = num_rows_base; + RowTableEncoder row_encoder; + row_encoder.Init(column_metadatas_base, sizeof(uint64_t), sizeof(uint64_t)); + row_encoder.PrepareEncodeSelected(0, num_rows_right, column_arrays_base); + std::array row_ids_right; + std::iota(row_ids_right.begin(), row_ids_right.end(), 0); + // for (int i = 0; i < num_rows_right - 1; ++i) { + // row_ids_right[i] = 0; + // } + // row_ids_right[num_rows_right - 1] = 1; + ASSERT_OK(row_encoder.EncodeSelected(&row_table, num_rows_right, row_ids_right.data())); + + ASSERT_GT(row_table.offsets()[num_rows_right - 1], 0x80000000u); + + constexpr auto num_rows_left = 16; + std::vector row_ids_left(num_rows_left, num_rows_base - 1); + + LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack}; + + { + uint32_t num_rows_no_match; + std::vector row_ids_out(num_rows_left); + KeyCompare::CompareColumnsToRows(num_rows_left, NULLPTR, row_ids_left.data(), &ctx, + &num_rows_no_match, row_ids_out.data(), + column_arrays_base, row_table, true, NULLPTR); + ASSERT_EQ(num_rows_no_match, 0); + ASSERT_EQ(row_ids_out[0], 0); + } + + // { + // std::vector match_bitvector(BytesForBits(num_rows)); + // KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, + // NULLPTR, NULLPTR, column_arrays_left, row_table, + // true, match_bitvector.data()); + // for (int i = 0; i < num_rows; ++i) { + // SCOPED_TRACE(i); + // ASSERT_EQ(arrow::bit_util::GetBit(match_bitvector.data(), i), i != 6); + // } + // } +} + } // namespace compute } // namespace arrow From 135f2950bf7c2fa63d6c72a5bcb35e3b26a4bcbd Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 17 Jun 2024 20:40:38 +0800 Subject: [PATCH 04/24] Finish UT --- cpp/src/arrow/compute/row/compare_test.cc | 121 +++++++++++----------- 1 file changed, 59 insertions(+), 62 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index a389f529ba82f..94e35f8af261d 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -17,10 +17,11 @@ #include -#include "arrow/array/builder_binary.h" #include "arrow/compute/row/compare_internal.h" +#include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" +#include "arrow/util/bitmap_ops.h" namespace arrow { namespace compute { @@ -165,92 +166,88 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) { } } -// Specialized case for GH-41813. +// Compare columns to rows at offsets over 2GB within a row table. +// Certain AVX2 instructions may behave unexpectedly causing troubles like GH-41813. TEST(KeyCompare, CompareColumnsToRowsLarge) { if constexpr (sizeof(void*) == 4) { GTEST_SKIP() << "Test only works on 64-bit platforms"; } - constexpr auto fsb_length = 128 * 1024 * 1024; + // The idea of this case is to create a row table using one fixed length column and one + // var length column (so the row is hence var length and has offset buffer), with the + // overall data size exceeding 2GB. Then compare each row with itself. + constexpr int64_t two_gb = 2ll * 1024ll * 1024ll * 1024ll; + // The compare function requires the row id of the left column to be uint16_t, hence the + // number of rows. + constexpr int64_t num_rows = std::numeric_limits::max() + 1; + // The var length column should be a little smaller than 2GB to WAR the capacity + // limitation in the builder. + constexpr int32_t var_length = two_gb / num_rows - 1; + const int32_t fixed_length = uint64()->byte_width(); + // The overall size should be larger than 2GB. + ASSERT_GT((var_length + fixed_length) * num_rows, two_gb); - constexpr auto num_rows_base = 18; MemoryPool* pool = default_memory_pool(); TempVectorStack stack; - ASSERT_OK( - stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows_base))); - - // An array containing 17 null rows and one 'X...' row. - std::shared_ptr column_fsb; - { - FixedSizeBinaryBuilder builder(fixed_size_binary(fsb_length), pool); - ASSERT_OK(builder.Reserve(num_rows_base)); - std::string x(fsb_length, 'X'), y(fsb_length, 'Y'); - for (int i = 0; i < num_rows_base - 1; ++i) { - ASSERT_OK(builder.Append(x.data())); - } - ASSERT_OK(builder.Append(y.data())); - ASSERT_OK(builder.Finish(&column_fsb)); - } - std::shared_ptr column_binary; - { - BinaryBuilder builder(binary(), pool); - ASSERT_OK(builder.AppendNulls(num_rows_base)); - ASSERT_OK(builder.Finish(&column_binary)); - } - ExecBatch batch_base({column_fsb, column_binary}, num_rows_base); - - std::vector column_metadatas_base; - ASSERT_OK(ColumnMetadatasFromExecBatch(batch_base, &column_metadatas_base)); - std::vector column_arrays_base; - ASSERT_OK(ColumnArraysFromExecBatch(batch_base, &column_arrays_base)); + ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows))); + // A fixed length array containing random numbers. + ASSERT_OK_AND_ASSIGN(auto column_fixed_length, + ::arrow::gen::Random(uint64())->Generate(num_rows)); + // A var length array containing 'X' repeated var_length times. + ASSERT_OK_AND_ASSIGN( + auto column_var_length, + ::arrow::gen::Constant(std::make_shared(std::string(var_length, 'X'))) + ->Generate(num_rows)); + ExecBatch batch({column_fixed_length, column_var_length}, num_rows); + + std::vector column_metadatas; + ASSERT_OK(ColumnMetadatasFromExecBatch(batch, &column_metadatas)); + std::vector column_arrays; + ASSERT_OK(ColumnArraysFromExecBatch(batch, &column_arrays)); + + // The row table (right side). RowTableMetadata table_metadata_right; - table_metadata_right.FromColumnMetadataVector(column_metadatas_base, sizeof(uint64_t), + table_metadata_right.FromColumnMetadataVector(column_metadatas, sizeof(uint64_t), sizeof(uint64_t)); - RowTableImpl row_table; ASSERT_OK(row_table.Init(pool, table_metadata_right)); - - // Encode row table with 18 rows, so that the last row is placed at over 2GB offset. - constexpr auto num_rows_right = num_rows_base; - RowTableEncoder row_encoder; - row_encoder.Init(column_metadatas_base, sizeof(uint64_t), sizeof(uint64_t)); - row_encoder.PrepareEncodeSelected(0, num_rows_right, column_arrays_base); - std::array row_ids_right; + std::vector row_ids_right(num_rows); std::iota(row_ids_right.begin(), row_ids_right.end(), 0); - // for (int i = 0; i < num_rows_right - 1; ++i) { - // row_ids_right[i] = 0; - // } - // row_ids_right[num_rows_right - 1] = 1; - ASSERT_OK(row_encoder.EncodeSelected(&row_table, num_rows_right, row_ids_right.data())); + RowTableEncoder row_encoder; + row_encoder.Init(column_metadatas, sizeof(uint64_t), sizeof(uint64_t)); + row_encoder.PrepareEncodeSelected(0, num_rows, column_arrays); + ASSERT_OK(row_encoder.EncodeSelected(&row_table, static_cast(num_rows), + row_ids_right.data())); - ASSERT_GT(row_table.offsets()[num_rows_right - 1], 0x80000000u); + ASSERT_TRUE(row_table.offsets()); + // The whole point of this test. + ASSERT_GT(row_table.offsets()[num_rows - 1], two_gb); - constexpr auto num_rows_left = 16; - std::vector row_ids_left(num_rows_left, num_rows_base - 1); + // The left rows. + std::vector row_ids_left(num_rows); + std::iota(row_ids_left.begin(), row_ids_left.end(), 0); LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack}; { uint32_t num_rows_no_match; - std::vector row_ids_out(num_rows_left); - KeyCompare::CompareColumnsToRows(num_rows_left, NULLPTR, row_ids_left.data(), &ctx, + std::vector row_ids_out(num_rows); + KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, &num_rows_no_match, row_ids_out.data(), - column_arrays_base, row_table, true, NULLPTR); + column_arrays, row_table, true, NULLPTR); ASSERT_EQ(num_rows_no_match, 0); - ASSERT_EQ(row_ids_out[0], 0); } - // { - // std::vector match_bitvector(BytesForBits(num_rows)); - // KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, - // NULLPTR, NULLPTR, column_arrays_left, row_table, - // true, match_bitvector.data()); - // for (int i = 0; i < num_rows; ++i) { - // SCOPED_TRACE(i); - // ASSERT_EQ(arrow::bit_util::GetBit(match_bitvector.data(), i), i != 6); - // } - // } + { + std::vector match_bitvector(BytesForBits(num_rows)); + KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, + NULLPTR, NULLPTR, column_arrays, row_table, true, + match_bitvector.data()); + + ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows), + num_rows); + } } } // namespace compute From 2b43288ca31cfb9fc783f9781779949bdbc13f2e Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 17 Jun 2024 21:04:30 +0800 Subject: [PATCH 05/24] Change fix length to uint32 --- cpp/src/arrow/compute/row/compare_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 94e35f8af261d..bd0ef2007e27f 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -183,7 +183,7 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // The var length column should be a little smaller than 2GB to WAR the capacity // limitation in the builder. constexpr int32_t var_length = two_gb / num_rows - 1; - const int32_t fixed_length = uint64()->byte_width(); + const int32_t fixed_length = uint32()->byte_width(); // The overall size should be larger than 2GB. ASSERT_GT((var_length + fixed_length) * num_rows, two_gb); @@ -193,7 +193,7 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // A fixed length array containing random numbers. ASSERT_OK_AND_ASSIGN(auto column_fixed_length, - ::arrow::gen::Random(uint64())->Generate(num_rows)); + ::arrow::gen::Random(uint32())->Generate(num_rows)); // A var length array containing 'X' repeated var_length times. ASSERT_OK_AND_ASSIGN( auto column_var_length, From 771ad49ddba358622ac4ab8209782e3d7f111ba9 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 17 Jun 2024 21:41:52 +0800 Subject: [PATCH 06/24] Fix --- cpp/src/arrow/compute/row/compare_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index bd0ef2007e27f..f43b7a7b477ff 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -181,7 +181,7 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // number of rows. constexpr int64_t num_rows = std::numeric_limits::max() + 1; // The var length column should be a little smaller than 2GB to WAR the capacity - // limitation in the builder. + // limitation in the var length builder. constexpr int32_t var_length = two_gb / num_rows - 1; const int32_t fixed_length = uint32()->byte_width(); // The overall size should be larger than 2GB. From d998c45af49602d54cf8c3d8f0ac5d6bba9596be Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 18 Jun 2024 01:10:44 +0800 Subject: [PATCH 07/24] Done --- .../compute/row/compare_internal_avx2.cc | 36 ++++++---- cpp/src/arrow/compute/row/compare_test.cc | 66 +++++++++++++++---- 2 files changed, 75 insertions(+), 27 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index 82991a8f1d162..cbe7ce98cae7f 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -252,6 +252,24 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2( } } +namespace { + +inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset, + const int scale = 1) { + auto normalized_base = base + 0x80000000ull / sizeof(int); + __m256i normalized_offset = _mm256_sub_epi32(offset, _mm256_set1_epi32(0x80000000)); + return _mm256_i32gather_epi32(normalized_base, normalized_offset, 1); +} + +inline __m256i UnsignedOffsetSafeGather64(long long const* base, __m128i offset, + const int scale = 1) { + auto normalized_base = base + 0x80000000ull / sizeof(long long); + __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(0x80000000)); + return _mm256_i32gather_epi64(normalized_base, normalized_offset, 1); +} + +} // namespace + template inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* right_base, __m256i irow_left, __m256i offset_right, @@ -282,11 +300,7 @@ inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* r ARROW_DCHECK(false); } - // const int* normalized_right_base = (const int*)(right_base + 0x80000000ull); - // __m256i normalized_offset_right = - // _mm256_sub_epi32(offset_right, _mm256_set1_epi32(0x80000000)); - // __m256i right = _mm256_i32gather_epi32(normalized_right_base, normalized_offset_right, 1); - __m256i right = _mm256_i32gather_epi32(right_base, offset_right, 1); + __m256i right = UnsignedOffsetSafeGather32((int const*)right_base, offset_right, 1); if (column_width != sizeof(uint32_t)) { constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff; right = _mm256_and_si256(right, _mm256_set1_epi32(mask)); @@ -335,11 +349,7 @@ inline uint64_t Compare8_avx2(const uint8_t* left_base, const uint8_t* right_bas ARROW_DCHECK(false); } - // const int* normalized_right_base = (const int*)(right_base + 0x80000000ull); - // __m256i normalized_offset_right = - // _mm256_sub_epi32(offset_right, _mm256_set1_epi32(0x80000000)); - // __m256i right = _mm256_i32gather_epi32(normalized_right_base, normalized_offset_right, 1); - __m256i right = _mm256_i32gather_epi32(right_base, offset_right, 1); + __m256i right = UnsignedOffsetSafeGather32((int const*)right_base, offset_right, 1); if (column_width != sizeof(uint32_t)) { constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff; right = _mm256_and_si256(right, _mm256_set1_epi32(mask)); @@ -376,9 +386,9 @@ inline uint64_t Compare8_64bit_avx2(const uint8_t* left_base, const uint8_t* rig auto right_base_i64 = reinterpret_cast(right_base); __m256i right_lo = - _mm256_i32gather_epi64(right_base_i64, _mm256_castsi256_si128(offset_right), 1); - __m256i right_hi = _mm256_i32gather_epi64(right_base_i64, - _mm256_extracti128_si256(offset_right, 1), 1); + UnsignedOffsetSafeGather64(right_base_i64, _mm256_castsi256_si128(offset_right), 1); + __m256i right_hi = UnsignedOffsetSafeGather64( + right_base_i64, _mm256_extracti128_si256(offset_right, 1), 1); uint32_t result_lo = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo)); uint32_t result_hi = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_hi, right_hi)); return result_lo | (static_cast(result_hi) << 32); diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index f43b7a7b477ff..ca6768eb37145 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -173,33 +173,44 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { GTEST_SKIP() << "Test only works on 64-bit platforms"; } - // The idea of this case is to create a row table using one fixed length column and one + // The idea of this case is to create a row table using several fixed length columns one // var length column (so the row is hence var length and has offset buffer), with the // overall data size exceeding 2GB. Then compare each row with itself. constexpr int64_t two_gb = 2ll * 1024ll * 1024ll * 1024ll; // The compare function requires the row id of the left column to be uint16_t, hence the // number of rows. constexpr int64_t num_rows = std::numeric_limits::max() + 1; + const std::vector> fixed_length_types{uint64(), uint32()}; // The var length column should be a little smaller than 2GB to WAR the capacity // limitation in the var length builder. constexpr int32_t var_length = two_gb / num_rows - 1; - const int32_t fixed_length = uint32()->byte_width(); + auto row_size = + std::accumulate(fixed_length_types.begin(), fixed_length_types.end(), var_length, + [](int64_t acc, const std::shared_ptr& type) { + return acc + type->byte_width(); + }); // The overall size should be larger than 2GB. - ASSERT_GT((var_length + fixed_length) * num_rows, two_gb); + ASSERT_GT(row_size * num_rows, two_gb); MemoryPool* pool = default_memory_pool(); TempVectorStack stack; ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows))); - // A fixed length array containing random numbers. - ASSERT_OK_AND_ASSIGN(auto column_fixed_length, - ::arrow::gen::Random(uint32())->Generate(num_rows)); - // A var length array containing 'X' repeated var_length times. - ASSERT_OK_AND_ASSIGN( - auto column_var_length, - ::arrow::gen::Constant(std::make_shared(std::string(var_length, 'X'))) - ->Generate(num_rows)); - ExecBatch batch({column_fixed_length, column_var_length}, num_rows); + std::vector columns; + { + // Several fixed length arrays containing random content. + for (const auto& type : fixed_length_types) { + ASSERT_OK_AND_ASSIGN(auto column, ::arrow::gen::Random(type)->Generate(num_rows)); + columns.push_back(std::move(column)); + } + // A var length array containing 'X' repeated var_length times. + ASSERT_OK_AND_ASSIGN(auto column_var_length, + ::arrow::gen::Constant( + std::make_shared(std::string(var_length, 'X'))) + ->Generate(num_rows)); + columns.push_back(std::move(column_var_length)); + } + ExecBatch batch(std::move(columns), num_rows); std::vector column_metadatas; ASSERT_OK(ColumnMetadatasFromExecBatch(batch, &column_metadatas)); @@ -231,16 +242,43 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack}; { + // No selection, output no match row ids. uint32_t num_rows_no_match; std::vector row_ids_out(num_rows); - KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, - &num_rows_no_match, row_ids_out.data(), + KeyCompare::CompareColumnsToRows( + num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_left.data(), &ctx, + &num_rows_no_match, row_ids_out.data(), column_arrays, row_table, true, NULLPTR); + ASSERT_EQ(num_rows_no_match, 0); + } + + { + // With selection, output no match row ids. + uint32_t num_rows_no_match; + std::vector row_ids_out(num_rows); + std::vector selection_left(num_rows); + std::iota(selection_left.begin(), selection_left.end(), 0); + KeyCompare::CompareColumnsToRows(num_rows, selection_left.data(), row_ids_left.data(), + &ctx, &num_rows_no_match, row_ids_out.data(), column_arrays, row_table, true, NULLPTR); ASSERT_EQ(num_rows_no_match, 0); } { + // No selection, output match bit vector. + std::vector match_bitvector(BytesForBits(num_rows)); + KeyCompare::CompareColumnsToRows( + num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_left.data(), &ctx, NULLPTR, + NULLPTR, column_arrays, row_table, true, match_bitvector.data()); + + ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows), + num_rows); + } + + { + // With selection, output match bit vector. std::vector match_bitvector(BytesForBits(num_rows)); + std::vector selection_left(num_rows); + std::iota(selection_left.begin(), selection_left.end(), 0); KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, NULLPTR, NULLPTR, column_arrays, row_table, true, match_bitvector.data()); From dcb1306c4569e35bd07bb460c016a62a2c5334f2 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 18 Jun 2024 01:13:21 +0800 Subject: [PATCH 08/24] Revert --- cpp/CMakePresets.json | 13 ---- .../compute/row/compare_internal_avx2.cc | 14 ---- cpp/src/arrow/dataset/dataset_test.cc | 78 ------------------- 3 files changed, 105 deletions(-) diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index 3f0dd45dd050d..cb4cdfc03ac82 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -258,19 +258,6 @@ "displayName": "Debug build with tests and more optional components", "cacheVariables": {} }, - { - "name": "fix-41813", - "inherits": [ - "base-debug", - "features-main" - ], - "displayName": "Fix 41813", - "cacheVariables": { - "ARROW_JEMALLOC": "OFF", - "ARROW_MIMALLOC": "OFF", - "ARROW_USE_ASAN": "ON" - } - }, { "name": "ninja-debug-cuda", "inherits": [ diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index cbe7ce98cae7f..2abefc45f2098 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -16,7 +16,6 @@ // under the License. #include -#include #include "arrow/compute/row/compare_internal.h" #include "arrow/compute/util.h" @@ -689,18 +688,5 @@ uint32_t KeyCompare::CompareVarBinaryColumnToRow_avx2( return num_rows_to_compare; } -void RossiTest() { - size_t size = 0x100000000ull + 2 * sizeof(uint32_t); - uint32_t* data = new uint32_t[size / sizeof(uint32_t)]; - data[0] = 0xDEADBEEF; - data[0x100000000ull / sizeof(uint32_t) + 1] = 0xFEEBDAED; - __m256i offset = _mm256_setr_epi32(-4, 0, 0, 0, 0, 0, 0, 0); - __m256i content = _mm256_i32gather_epi32(data + 1, offset, 1); - std::cout << "Content: " << std::hex << _mm256_extract_epi32(content, 0) << std::endl; - int32_t i_2g = 0x80000000; - int32_t i_over_2g = 0x800000AB; - std::cout << std::hex << i_over_2g - i_2g << std::endl; -} - } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index 6f4a86c0dc57c..eb3fd0e304750 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -21,7 +21,6 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/discovery.h" -#include "arrow/dataset/file_parquet.h" #include "arrow/dataset/partition.h" #include "arrow/dataset/test_util_internal.h" #include "arrow/filesystem/mockfs.h" @@ -802,82 +801,5 @@ TEST(TestDictPartitionColumn, SelectPartitionColumnFilterPhysicalColumn) { *ArrayFromJSON(partition_field->type(), R"(["one"])")); } -namespace ac = arrow::acero; -namespace cp = arrow::compute; -namespace ds = arrow::dataset; -namespace fs = arrow::fs; - -arrow::Result> GetFileSystemFromUri( - const std::string& uri, std::string* path) { - return fs::FileSystemFromUri(uri, path); -} - -arrow::Result> GetDatasetFromDirectory( - std::shared_ptr fs, std::shared_ptr format, - std::string dir) { - // Find all files under `path` - fs::FileSelector s; - s.base_dir = dir; - s.recursive = true; - - ds::FileSystemFactoryOptions options; - options.partitioning = DirectoryPartitioning::MakeFactory({"year", "month"}); - // The factory will try to build a child dataset. - ARROW_ASSIGN_OR_RAISE(auto factory, - ds::FileSystemDatasetFactory::Make(fs, s, format, options)); - - // Try to infer a common schema for all files. - ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect({})); - // Caller can optionally decide another schema as long as it is compatible - // with the previous one, e.g. `factory->Finish(compatible_schema)`. - ARROW_ASSIGN_OR_RAISE(auto child, factory->Finish()); - - ds::DatasetVector children{1, child}; - auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children)); - - return dataset; -} - -arrow::Result> GetScannerFromDataset( - std::shared_ptr dataset) { - ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan()); - - ARROW_RETURN_NOT_OK(scanner_builder->UseThreads(true)); - - return scanner_builder->Finish(); -} - -arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) { - // collect sink_reader into a Table - std::shared_ptr response_table; - ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan))); - - std::cout << "Results : " << response_table->ToString() << std::endl; - - return arrow::Status::OK(); -} - -TEST(GH41813, GH41813) { - std::string uri = - "file:///Users/zanmato/Downloads/arrow_segfault_reproducer_2/data/reduced_attempt3"; - std::string path; - auto format = std::make_shared(); - ASSERT_OK_AND_ASSIGN(auto fs, GetFileSystemFromUri(uri, &path)); - ASSERT_OK_AND_ASSIGN(auto dataset, GetDatasetFromDirectory(fs, format, path)); - ASSERT_OK_AND_ASSIGN(auto scanner, GetScannerFromDataset(dataset)); - auto scan_options = std::make_shared(); - scan_options->projection = cp::project({}, {}); // create empty projection - auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, scan_options}; - ac::Declaration scan{"scan", std::move(scan_node_options)}; - - auto count_options = std::make_shared(cp::CountOptions::ONLY_VALID); - auto aggregate_options = ac::AggregateNodeOptions{ - /*aggregates=*/{{"hash_count", count_options, "date", "count(date)"}}, - /*keys=*/{"year", "month", "cid"}}; - ac::Declaration aggregate{"aggregate", {std::move(scan)}, std::move(aggregate_options)}; - - ASSERT_OK(ExecutePlanAndCollectAsTable(std::move(aggregate))); -} - } // namespace dataset } // namespace arrow From 91b72fe831c2d9e8d6a81b9d451299c85bf8ad14 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 18 Jun 2024 01:43:35 +0800 Subject: [PATCH 09/24] Add comment --- cpp/src/arrow/compute/row/compare_internal_avx2.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index 2abefc45f2098..1b291dd60ce28 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -253,6 +253,14 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2( namespace { +/// Intrinsics `_mm256_i32gather_epi32/64` treat the `vindex` as signed integer, and we +/// are using `uint32_t` to represent the offset, in range of [0, 4G), within the row +/// table. When the offset is larger than `0x80000000` (2GB), those intrinsics will treat +/// it as negative offset and gather the data from undesired address. To avoid this issue, +/// we normalize the addresses by translating `base` `0x80000000` higher, and `offset` +/// `0x80000000` lower. This way, the offset is always in range of [-2G, 2G) and those +/// intrinsics are safe. + inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset, const int scale = 1) { auto normalized_base = base + 0x80000000ull / sizeof(int); From 474cd5695e45ddfb06fb403008aa9354c1c42e31 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 18 Jun 2024 01:46:03 +0800 Subject: [PATCH 10/24] Format --- cpp/src/arrow/compute/row/compare_test.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index ca6768eb37145..08c92d054cd9f 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -269,7 +269,6 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { KeyCompare::CompareColumnsToRows( num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_left.data(), &ctx, NULLPTR, NULLPTR, column_arrays, row_table, true, match_bitvector.data()); - ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows), num_rows); } @@ -282,7 +281,6 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, NULLPTR, NULLPTR, column_arrays, row_table, true, match_bitvector.data()); - ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows), num_rows); } From 53f6d738e8158fdfd75dc63e991ce1cfd74e363e Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 18 Jun 2024 01:57:29 +0800 Subject: [PATCH 11/24] Fix lint --- cpp/src/arrow/compute/row/compare_internal_avx2.cc | 6 +++--- cpp/src/arrow/compute/row/compare_test.cc | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index 1b291dd60ce28..627ee2e8fa071 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -268,9 +268,9 @@ inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset, return _mm256_i32gather_epi32(normalized_base, normalized_offset, 1); } -inline __m256i UnsignedOffsetSafeGather64(long long const* base, __m128i offset, - const int scale = 1) { - auto normalized_base = base + 0x80000000ull / sizeof(long long); +inline __m256i UnsignedOffsetSafeGather64(arrow::util::int64_for_gather_t const* base, + __m128i offset, const int scale = 1) { + auto normalized_base = base + 0x80000000ull / sizeof(arrow::util::int64_for_gather_t); __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(0x80000000)); return _mm256_i32gather_epi64(normalized_base, normalized_offset, 1); } diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 08c92d054cd9f..bbc81fc9bb25c 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -173,9 +173,9 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { GTEST_SKIP() << "Test only works on 64-bit platforms"; } - // The idea of this case is to create a row table using several fixed length columns one - // var length column (so the row is hence var length and has offset buffer), with the - // overall data size exceeding 2GB. Then compare each row with itself. + // The idea of this case is to create a row table using several fixed length columns and + // one var length column (so the row is hence var length and has offset buffer), with + // the overall data size exceeding 2GB. Then compare each row with itself. constexpr int64_t two_gb = 2ll * 1024ll * 1024ll * 1024ll; // The compare function requires the row id of the left column to be uint16_t, hence the // number of rows. From 42669619148a9101b49346e810805bc93a561faf Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 18 Jun 2024 01:59:47 +0800 Subject: [PATCH 12/24] Add todo --- cpp/src/arrow/compute/row/compare_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index bbc81fc9bb25c..f23e39deeb66f 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -180,6 +180,7 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // The compare function requires the row id of the left column to be uint16_t, hence the // number of rows. constexpr int64_t num_rows = std::numeric_limits::max() + 1; + // TODO: This test will fail if we switch the order between uint64 and uint32. const std::vector> fixed_length_types{uint64(), uint32()}; // The var length column should be a little smaller than 2GB to WAR the capacity // limitation in the var length builder. From 7d56722f8bed1ffad705c665865203ffb97df954 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 18 Jun 2024 08:40:20 +0800 Subject: [PATCH 13/24] Fix warning on windows --- cpp/src/arrow/compute/row/compare_test.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index f23e39deeb66f..afde495388997 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -185,11 +185,11 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // The var length column should be a little smaller than 2GB to WAR the capacity // limitation in the var length builder. constexpr int32_t var_length = two_gb / num_rows - 1; - auto row_size = - std::accumulate(fixed_length_types.begin(), fixed_length_types.end(), var_length, - [](int64_t acc, const std::shared_ptr& type) { - return acc + type->byte_width(); - }); + auto row_size = std::accumulate(fixed_length_types.begin(), fixed_length_types.end(), + static_cast(var_length), + [](int64_t acc, const std::shared_ptr& type) { + return acc + type->byte_width(); + }); // The overall size should be larger than 2GB. ASSERT_GT(row_size * num_rows, two_gb); From 04526843c9fceff2172d3d05057842f8fa7f41a8 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 24 Jun 2024 20:28:13 +0800 Subject: [PATCH 14/24] Fix scale for avx2 safe gather --- .../compute/row/compare_internal_avx2.cc | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index 627ee2e8fa071..966aa5a34f7be 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -261,17 +261,19 @@ namespace { /// `0x80000000` lower. This way, the offset is always in range of [-2G, 2G) and those /// intrinsics are safe. -inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset, - const int scale = 1) { - auto normalized_base = base + 0x80000000ull / sizeof(int); - __m256i normalized_offset = _mm256_sub_epi32(offset, _mm256_set1_epi32(0x80000000)); + +constexpr auto two_gb = 0x80000000ull; + +inline __m256i UnsignedOffsetSafeGather32Scale1(int const* base, __m256i offset) { + auto normalized_base = base + two_gb / sizeof(int); + __m256i normalized_offset = _mm256_sub_epi32(offset, _mm256_set1_epi32(two_gb)); return _mm256_i32gather_epi32(normalized_base, normalized_offset, 1); } -inline __m256i UnsignedOffsetSafeGather64(arrow::util::int64_for_gather_t const* base, - __m128i offset, const int scale = 1) { - auto normalized_base = base + 0x80000000ull / sizeof(arrow::util::int64_for_gather_t); - __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(0x80000000)); +inline __m256i UnsignedOffsetSafeGather64Scale1(arrow::util::int64_for_gather_t const* base, + __m128i offset) { + auto normalized_base = base + two_gb / sizeof(arrow::util::int64_for_gather_t); + __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(two_gb)); return _mm256_i32gather_epi64(normalized_base, normalized_offset, 1); } @@ -307,7 +309,7 @@ inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* r ARROW_DCHECK(false); } - __m256i right = UnsignedOffsetSafeGather32((int const*)right_base, offset_right, 1); + __m256i right = UnsignedOffsetSafeGather32Scale1((int const*)right_base, offset_right); if (column_width != sizeof(uint32_t)) { constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff; right = _mm256_and_si256(right, _mm256_set1_epi32(mask)); @@ -356,7 +358,7 @@ inline uint64_t Compare8_avx2(const uint8_t* left_base, const uint8_t* right_bas ARROW_DCHECK(false); } - __m256i right = UnsignedOffsetSafeGather32((int const*)right_base, offset_right, 1); + __m256i right = UnsignedOffsetSafeGather32Scale1((int const*)right_base, offset_right); if (column_width != sizeof(uint32_t)) { constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff; right = _mm256_and_si256(right, _mm256_set1_epi32(mask)); @@ -393,9 +395,9 @@ inline uint64_t Compare8_64bit_avx2(const uint8_t* left_base, const uint8_t* rig auto right_base_i64 = reinterpret_cast(right_base); __m256i right_lo = - UnsignedOffsetSafeGather64(right_base_i64, _mm256_castsi256_si128(offset_right), 1); - __m256i right_hi = UnsignedOffsetSafeGather64( - right_base_i64, _mm256_extracti128_si256(offset_right, 1), 1); + UnsignedOffsetSafeGather64Scale1(right_base_i64, _mm256_castsi256_si128(offset_right)); + __m256i right_hi = UnsignedOffsetSafeGather64Scale1( + right_base_i64, _mm256_extracti128_si256(offset_right, 1)); uint32_t result_lo = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo)); uint32_t result_hi = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_hi, right_hi)); return result_lo | (static_cast(result_hi) << 32); From 0fc496d5b0563b8c20d1b892b9f504eee2a971df Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 24 Jun 2024 20:28:58 +0800 Subject: [PATCH 15/24] No more todo - its a mis-use --- cpp/src/arrow/compute/row/compare_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index afde495388997..1b7e0ef62eb08 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -180,7 +180,6 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // The compare function requires the row id of the left column to be uint16_t, hence the // number of rows. constexpr int64_t num_rows = std::numeric_limits::max() + 1; - // TODO: This test will fail if we switch the order between uint64 and uint32. const std::vector> fixed_length_types{uint64(), uint32()}; // The var length column should be a little smaller than 2GB to WAR the capacity // limitation in the var length builder. From 0ea8674525b937d0fa6e0d418646e6bef65f20ed Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 24 Jun 2024 20:32:10 +0800 Subject: [PATCH 16/24] Format --- cpp/src/arrow/compute/row/compare_internal_avx2.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index 966aa5a34f7be..14c6bc44ef396 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -261,7 +261,6 @@ namespace { /// `0x80000000` lower. This way, the offset is always in range of [-2G, 2G) and those /// intrinsics are safe. - constexpr auto two_gb = 0x80000000ull; inline __m256i UnsignedOffsetSafeGather32Scale1(int const* base, __m256i offset) { @@ -270,8 +269,8 @@ inline __m256i UnsignedOffsetSafeGather32Scale1(int const* base, __m256i offset) return _mm256_i32gather_epi32(normalized_base, normalized_offset, 1); } -inline __m256i UnsignedOffsetSafeGather64Scale1(arrow::util::int64_for_gather_t const* base, - __m128i offset) { +inline __m256i UnsignedOffsetSafeGather64Scale1( + arrow::util::int64_for_gather_t const* base, __m128i offset) { auto normalized_base = base + two_gb / sizeof(arrow::util::int64_for_gather_t); __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(two_gb)); return _mm256_i32gather_epi64(normalized_base, normalized_offset, 1); @@ -394,8 +393,8 @@ inline uint64_t Compare8_64bit_avx2(const uint8_t* left_base, const uint8_t* rig } auto right_base_i64 = reinterpret_cast(right_base); - __m256i right_lo = - UnsignedOffsetSafeGather64Scale1(right_base_i64, _mm256_castsi256_si128(offset_right)); + __m256i right_lo = UnsignedOffsetSafeGather64Scale1( + right_base_i64, _mm256_castsi256_si128(offset_right)); __m256i right_hi = UnsignedOffsetSafeGather64Scale1( right_base_i64, _mm256_extracti128_si256(offset_right, 1)); uint32_t result_lo = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo)); From f9c88bbd598dce20ba5c634e4b7ad671a743fd46 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 24 Jun 2024 20:48:19 +0800 Subject: [PATCH 17/24] Add a new todo --- cpp/src/arrow/compute/row/compare_internal_avx2.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index 14c6bc44ef396..d8f431981ac7e 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -236,6 +236,8 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2( irow_right = _mm256_loadu_si256(reinterpret_cast(left_to_right_map) + i); } + // TODO: Need to test if this gather is OK when irow_right is larger than + // 0x80000000u. __m256i offset_right = _mm256_i32gather_epi32((const int*)offsets_right, irow_right, 4); offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row)); From e8a407c9e8b630037f4c10cecada01008a6c84a5 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 24 Jun 2024 21:02:47 +0800 Subject: [PATCH 18/24] Support more scales for safe gather --- .../compute/row/compare_internal_avx2.cc | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index d8f431981ac7e..02ffc7cf300bf 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -265,17 +265,19 @@ namespace { constexpr auto two_gb = 0x80000000ull; -inline __m256i UnsignedOffsetSafeGather32Scale1(int const* base, __m256i offset) { +template +inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset) { auto normalized_base = base + two_gb / sizeof(int); - __m256i normalized_offset = _mm256_sub_epi32(offset, _mm256_set1_epi32(two_gb)); - return _mm256_i32gather_epi32(normalized_base, normalized_offset, 1); + __m256i normalized_offset = _mm256_sub_epi32(offset, _mm256_set1_epi32(two_gb / scale)); + return _mm256_i32gather_epi32(normalized_base, normalized_offset, scale); } -inline __m256i UnsignedOffsetSafeGather64Scale1( - arrow::util::int64_for_gather_t const* base, __m128i offset) { +template +inline __m256i UnsignedOffsetSafeGather64(arrow::util::int64_for_gather_t const* base, + __m128i offset) { auto normalized_base = base + two_gb / sizeof(arrow::util::int64_for_gather_t); - __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(two_gb)); - return _mm256_i32gather_epi64(normalized_base, normalized_offset, 1); + __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(two_gb / scale)); + return _mm256_i32gather_epi64(normalized_base, normalized_offset, scale); } } // namespace @@ -310,7 +312,7 @@ inline uint64_t CompareSelected8_avx2(const uint8_t* left_base, const uint8_t* r ARROW_DCHECK(false); } - __m256i right = UnsignedOffsetSafeGather32Scale1((int const*)right_base, offset_right); + __m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right); if (column_width != sizeof(uint32_t)) { constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff; right = _mm256_and_si256(right, _mm256_set1_epi32(mask)); @@ -359,7 +361,7 @@ inline uint64_t Compare8_avx2(const uint8_t* left_base, const uint8_t* right_bas ARROW_DCHECK(false); } - __m256i right = UnsignedOffsetSafeGather32Scale1((int const*)right_base, offset_right); + __m256i right = UnsignedOffsetSafeGather32<1>((int const*)right_base, offset_right); if (column_width != sizeof(uint32_t)) { constexpr uint32_t mask = column_width == 0 || column_width == 1 ? 0xff : 0xffff; right = _mm256_and_si256(right, _mm256_set1_epi32(mask)); From a72dc253fccba32b8687bc596b44534057b11922 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 24 Jun 2024 21:02:59 +0800 Subject: [PATCH 19/24] Comment --- cpp/src/arrow/compute/row/compare_test.cc | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 1b7e0ef62eb08..690d505b8fd19 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -245,9 +245,10 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // No selection, output no match row ids. uint32_t num_rows_no_match; std::vector row_ids_out(num_rows); - KeyCompare::CompareColumnsToRows( - num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_left.data(), &ctx, - &num_rows_no_match, row_ids_out.data(), column_arrays, row_table, true, NULLPTR); + KeyCompare::CompareColumnsToRows(num_rows, /*sel_left_maybe_null=*/NULLPTR, + row_ids_left.data(), &ctx, &num_rows_no_match, + row_ids_out.data(), column_arrays, row_table, + /*are_cols_in_encoding_order=*/true, NULLPTR); ASSERT_EQ(num_rows_no_match, 0); } @@ -259,7 +260,8 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { std::iota(selection_left.begin(), selection_left.end(), 0); KeyCompare::CompareColumnsToRows(num_rows, selection_left.data(), row_ids_left.data(), &ctx, &num_rows_no_match, row_ids_out.data(), - column_arrays, row_table, true, NULLPTR); + column_arrays, row_table, + /*are_cols_in_encoding_order=*/true, NULLPTR); ASSERT_EQ(num_rows_no_match, 0); } @@ -268,7 +270,8 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { std::vector match_bitvector(BytesForBits(num_rows)); KeyCompare::CompareColumnsToRows( num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_left.data(), &ctx, NULLPTR, - NULLPTR, column_arrays, row_table, true, match_bitvector.data()); + NULLPTR, column_arrays, row_table, /*are_cols_in_encoding_order=*/true, + match_bitvector.data()); ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows), num_rows); } @@ -278,9 +281,9 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { std::vector match_bitvector(BytesForBits(num_rows)); std::vector selection_left(num_rows); std::iota(selection_left.begin(), selection_left.end(), 0); - KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, - NULLPTR, NULLPTR, column_arrays, row_table, true, - match_bitvector.data()); + KeyCompare::CompareColumnsToRows( + num_rows, NULLPTR, row_ids_left.data(), &ctx, NULLPTR, NULLPTR, column_arrays, + row_table, /*are_cols_in_encoding_order=*/true, match_bitvector.data()); ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows), num_rows); } From 6b085c7e46f4c0fa7aa5ac7c2da62598d2893f18 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 24 Jun 2024 21:04:42 +0800 Subject: [PATCH 20/24] Fix --- cpp/src/arrow/compute/row/compare_internal_avx2.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index 02ffc7cf300bf..d96292d6a089f 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -397,9 +397,9 @@ inline uint64_t Compare8_64bit_avx2(const uint8_t* left_base, const uint8_t* rig } auto right_base_i64 = reinterpret_cast(right_base); - __m256i right_lo = UnsignedOffsetSafeGather64Scale1( - right_base_i64, _mm256_castsi256_si128(offset_right)); - __m256i right_hi = UnsignedOffsetSafeGather64Scale1( + __m256i right_lo = + UnsignedOffsetSafeGather64<1>(right_base_i64, _mm256_castsi256_si128(offset_right)); + __m256i right_hi = UnsignedOffsetSafeGather64<1>( right_base_i64, _mm256_extracti128_si256(offset_right, 1)); uint32_t result_lo = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo)); uint32_t result_hi = _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_hi, right_hi)); From 7ed9f0fe85280d7cc94333e7c56f97c6fcb57cac Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 24 Jun 2024 23:57:13 +0800 Subject: [PATCH 21/24] Address comments --- .../compute/row/compare_internal_avx2.cc | 37 +++++++++++-------- cpp/src/arrow/compute/row/compare_test.cc | 5 ++- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_internal_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc index d96292d6a089f..ec511aa03a6d0 100644 --- a/cpp/src/arrow/compute/row/compare_internal_avx2.cc +++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc @@ -255,29 +255,34 @@ uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2( namespace { -/// Intrinsics `_mm256_i32gather_epi32/64` treat the `vindex` as signed integer, and we -/// are using `uint32_t` to represent the offset, in range of [0, 4G), within the row -/// table. When the offset is larger than `0x80000000` (2GB), those intrinsics will treat -/// it as negative offset and gather the data from undesired address. To avoid this issue, -/// we normalize the addresses by translating `base` `0x80000000` higher, and `offset` -/// `0x80000000` lower. This way, the offset is always in range of [-2G, 2G) and those -/// intrinsics are safe. +// Intrinsics `_mm256_i32gather_epi32/64` treat the `vindex` as signed integer, and we +// are using `uint32_t` to represent the offset, in range of [0, 4G), within the row +// table. When the offset is larger than `0x80000000` (2GB), those intrinsics will treat +// it as negative offset and gather the data from undesired address. To avoid this issue, +// we normalize the addresses by translating `base` `0x80000000` higher, and `offset` +// `0x80000000` lower. This way, the offset is always in range of [-2G, 2G) and those +// intrinsics are safe. -constexpr auto two_gb = 0x80000000ull; +constexpr uint64_t kTwoGB = 0x80000000ull; -template +template inline __m256i UnsignedOffsetSafeGather32(int const* base, __m256i offset) { - auto normalized_base = base + two_gb / sizeof(int); - __m256i normalized_offset = _mm256_sub_epi32(offset, _mm256_set1_epi32(two_gb / scale)); - return _mm256_i32gather_epi32(normalized_base, normalized_offset, scale); + int const* normalized_base = base + kTwoGB / sizeof(int); + __m256i normalized_offset = + _mm256_sub_epi32(offset, _mm256_set1_epi32(static_cast(kTwoGB / kScale))); + return _mm256_i32gather_epi32(normalized_base, normalized_offset, + static_cast(kScale)); } -template +template inline __m256i UnsignedOffsetSafeGather64(arrow::util::int64_for_gather_t const* base, __m128i offset) { - auto normalized_base = base + two_gb / sizeof(arrow::util::int64_for_gather_t); - __m128i normalized_offset = _mm_sub_epi32(offset, _mm_set1_epi32(two_gb / scale)); - return _mm256_i32gather_epi64(normalized_base, normalized_offset, scale); + arrow::util::int64_for_gather_t const* normalized_base = + base + kTwoGB / sizeof(arrow::util::int64_for_gather_t); + __m128i normalized_offset = + _mm_sub_epi32(offset, _mm_set1_epi32(static_cast(kTwoGB / kScale))); + return _mm256_i32gather_epi64(normalized_base, normalized_offset, + static_cast(kScale)); } } // namespace diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 690d505b8fd19..a6fc6c044148c 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -181,7 +181,7 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // number of rows. constexpr int64_t num_rows = std::numeric_limits::max() + 1; const std::vector> fixed_length_types{uint64(), uint32()}; - // The var length column should be a little smaller than 2GB to WAR the capacity + // The var length column should be a little smaller than 2GB to workaround the capacity // limitation in the var length builder. constexpr int32_t var_length = two_gb / num_rows - 1; auto row_size = std::accumulate(fixed_length_types.begin(), fixed_length_types.end(), @@ -231,7 +231,8 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { ASSERT_OK(row_encoder.EncodeSelected(&row_table, static_cast(num_rows), row_ids_right.data())); - ASSERT_TRUE(row_table.offsets()); + // The row table must contain an offset buffer. + ASSERT_NE(row_table.offsets(), NULLPTR); // The whole point of this test. ASSERT_GT(row_table.offsets()[num_rows - 1], two_gb); From 9862d03fdfdbffa3e9b9975054d41b2503dcde21 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 25 Jun 2024 00:30:36 +0800 Subject: [PATCH 22/24] Disable test for valgrind --- cpp/src/arrow/compute/row/compare_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index a6fc6c044148c..62bd7dd9a4711 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -166,6 +166,7 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) { } } +#ifndef ARROW_VALGRIND // Compare columns to rows at offsets over 2GB within a row table. // Certain AVX2 instructions may behave unexpectedly causing troubles like GH-41813. TEST(KeyCompare, CompareColumnsToRowsLarge) { @@ -289,6 +290,7 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { num_rows); } } +#endif // ARROW_VALGRIND } // namespace compute } // namespace arrow From 3dcdd9f7d0af48498709062a1eb5744af160e828 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 25 Jun 2024 14:39:41 +0800 Subject: [PATCH 23/24] Pure readability refine --- cpp/src/arrow/compute/row/compare_test.cc | 124 ++++++++++++---------- 1 file changed, 67 insertions(+), 57 deletions(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 62bd7dd9a4711..18d5cc3c6696c 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -194,53 +194,59 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { ASSERT_GT(row_size * num_rows, two_gb); MemoryPool* pool = default_memory_pool(); - TempVectorStack stack; - ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows))); - std::vector columns; + // The left side columns. + std::vector columns_left; + ExecBatch batch_left; { + std::vector values; + // Several fixed length arrays containing random content. for (const auto& type : fixed_length_types) { - ASSERT_OK_AND_ASSIGN(auto column, ::arrow::gen::Random(type)->Generate(num_rows)); - columns.push_back(std::move(column)); + ASSERT_OK_AND_ASSIGN(auto value, ::arrow::gen::Random(type)->Generate(num_rows)); + values.push_back(std::move(value)); } // A var length array containing 'X' repeated var_length times. - ASSERT_OK_AND_ASSIGN(auto column_var_length, + ASSERT_OK_AND_ASSIGN(auto value_var_length, ::arrow::gen::Constant( std::make_shared(std::string(var_length, 'X'))) ->Generate(num_rows)); - columns.push_back(std::move(column_var_length)); - } - ExecBatch batch(std::move(columns), num_rows); - - std::vector column_metadatas; - ASSERT_OK(ColumnMetadatasFromExecBatch(batch, &column_metadatas)); - std::vector column_arrays; - ASSERT_OK(ColumnArraysFromExecBatch(batch, &column_arrays)); + values.push_back(std::move(value_var_length)); - // The row table (right side). - RowTableMetadata table_metadata_right; - table_metadata_right.FromColumnMetadataVector(column_metadatas, sizeof(uint64_t), - sizeof(uint64_t)); - RowTableImpl row_table; - ASSERT_OK(row_table.Init(pool, table_metadata_right)); - std::vector row_ids_right(num_rows); - std::iota(row_ids_right.begin(), row_ids_right.end(), 0); - RowTableEncoder row_encoder; - row_encoder.Init(column_metadatas, sizeof(uint64_t), sizeof(uint64_t)); - row_encoder.PrepareEncodeSelected(0, num_rows, column_arrays); - ASSERT_OK(row_encoder.EncodeSelected(&row_table, static_cast(num_rows), - row_ids_right.data())); + batch_left = ExecBatch(std::move(values), num_rows); + ASSERT_OK(ColumnArraysFromExecBatch(batch_left, &columns_left)); + } - // The row table must contain an offset buffer. - ASSERT_NE(row_table.offsets(), NULLPTR); - // The whole point of this test. - ASSERT_GT(row_table.offsets()[num_rows - 1], two_gb); + // The right side row table. + RowTableImpl row_table_right; + { + // Encode the row table with the left columns. + std::vector column_metadatas; + ASSERT_OK(ColumnMetadatasFromExecBatch(batch_left, &column_metadatas)); + RowTableMetadata table_metadata; + table_metadata.FromColumnMetadataVector(column_metadatas, sizeof(uint64_t), + sizeof(uint64_t)); + ASSERT_OK(row_table_right.Init(pool, table_metadata)); + std::vector row_ids(num_rows); + std::iota(row_ids.begin(), row_ids.end(), 0); + RowTableEncoder row_encoder; + row_encoder.Init(column_metadatas, sizeof(uint64_t), sizeof(uint64_t)); + row_encoder.PrepareEncodeSelected(0, num_rows, columns_left); + ASSERT_OK(row_encoder.EncodeSelected( + &row_table_right, static_cast(num_rows), row_ids.data())); + + // The row table must contain an offset buffer. + ASSERT_NE(row_table_right.offsets(), NULLPTR); + // The whole point of this test. + ASSERT_GT(row_table_right.offsets()[num_rows - 1], two_gb); + } - // The left rows. - std::vector row_ids_left(num_rows); - std::iota(row_ids_left.begin(), row_ids_left.end(), 0); + // The rows to compare. + std::vector row_ids_to_compare(num_rows); + std::iota(row_ids_to_compare.begin(), row_ids_to_compare.end(), 0); + TempVectorStack stack; + ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows))); LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack}; { @@ -248,22 +254,10 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { uint32_t num_rows_no_match; std::vector row_ids_out(num_rows); KeyCompare::CompareColumnsToRows(num_rows, /*sel_left_maybe_null=*/NULLPTR, - row_ids_left.data(), &ctx, &num_rows_no_match, - row_ids_out.data(), column_arrays, row_table, - /*are_cols_in_encoding_order=*/true, NULLPTR); - ASSERT_EQ(num_rows_no_match, 0); - } - - { - // With selection, output no match row ids. - uint32_t num_rows_no_match; - std::vector row_ids_out(num_rows); - std::vector selection_left(num_rows); - std::iota(selection_left.begin(), selection_left.end(), 0); - KeyCompare::CompareColumnsToRows(num_rows, selection_left.data(), row_ids_left.data(), - &ctx, &num_rows_no_match, row_ids_out.data(), - column_arrays, row_table, - /*are_cols_in_encoding_order=*/true, NULLPTR); + row_ids_to_compare.data(), &ctx, &num_rows_no_match, + row_ids_out.data(), columns_left, row_table_right, + /*are_cols_in_encoding_order=*/true, + /*out_match_bitvector_maybe_null=*/NULLPTR); ASSERT_EQ(num_rows_no_match, 0); } @@ -271,21 +265,37 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // No selection, output match bit vector. std::vector match_bitvector(BytesForBits(num_rows)); KeyCompare::CompareColumnsToRows( - num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_left.data(), &ctx, NULLPTR, - NULLPTR, column_arrays, row_table, /*are_cols_in_encoding_order=*/true, - match_bitvector.data()); + num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_to_compare.data(), &ctx, + /*out_num_rows=*/NULLPTR, /*out_sel_left_maybe_same=*/NULLPTR, columns_left, + row_table_right, + /*are_cols_in_encoding_order=*/true, match_bitvector.data()); ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows), num_rows); } + std::vector selection_left(num_rows); + std::iota(selection_left.begin(), selection_left.end(), 0); + + { + // With selection, output no match row ids. + uint32_t num_rows_no_match; + std::vector row_ids_out(num_rows); + KeyCompare::CompareColumnsToRows(num_rows, selection_left.data(), + row_ids_to_compare.data(), &ctx, &num_rows_no_match, + row_ids_out.data(), columns_left, row_table_right, + /*are_cols_in_encoding_order=*/true, + /*out_match_bitvector_maybe_null=*/NULLPTR); + ASSERT_EQ(num_rows_no_match, 0); + } + { // With selection, output match bit vector. std::vector match_bitvector(BytesForBits(num_rows)); - std::vector selection_left(num_rows); - std::iota(selection_left.begin(), selection_left.end(), 0); KeyCompare::CompareColumnsToRows( - num_rows, NULLPTR, row_ids_left.data(), &ctx, NULLPTR, NULLPTR, column_arrays, - row_table, /*are_cols_in_encoding_order=*/true, match_bitvector.data()); + num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_to_compare.data(), &ctx, + /*out_num_rows=*/NULLPTR, /*out_sel_left_maybe_same=*/NULLPTR, columns_left, + row_table_right, + /*are_cols_in_encoding_order=*/true, match_bitvector.data()); ASSERT_EQ(arrow::internal::CountSetBits(match_bitvector.data(), 0, num_rows), num_rows); } From 956869fa0180e50f0c6ee9c102edc7794b0faeb1 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 25 Jun 2024 14:49:09 +0800 Subject: [PATCH 24/24] Fix a case that should have used selection but did not --- cpp/src/arrow/compute/row/compare_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 18d5cc3c6696c..662862075c245 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -292,7 +292,7 @@ TEST(KeyCompare, CompareColumnsToRowsLarge) { // With selection, output match bit vector. std::vector match_bitvector(BytesForBits(num_rows)); KeyCompare::CompareColumnsToRows( - num_rows, /*sel_left_maybe_null=*/NULLPTR, row_ids_to_compare.data(), &ctx, + num_rows, selection_left.data(), row_ids_to_compare.data(), &ctx, /*out_num_rows=*/NULLPTR, /*out_sel_left_maybe_same=*/NULLPTR, columns_left, row_table_right, /*are_cols_in_encoding_order=*/true, match_bitvector.data());