Skip to content

Commit

Permalink
Test writing multi-threaded
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Dec 13, 2024
1 parent 5a35359 commit 294c131
Showing 1 changed file with 85 additions and 31 deletions.
116 changes: 85 additions & 31 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,67 @@ class DatasetEncryptionTestBase : public testing::TestWithParam<std::tuple<Encry
std::move(parquet_encryption_config);

// Write dataset.
auto concurrently = std::get<1>(GetParam());
auto dataset = std::make_shared<InMemoryDataset>(table_);
EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
// ideally, we would have UseThreads(concurrently) here, but that is not working
// unless GH-26818 (https://github.com/apache/arrow/issues/26818) is fixed
ARROW_EXPECT_OK(scanner_builder->UseThreads(false));
EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = parquet_file_write_options;
write_options.filesystem = file_system_;
write_options.base_dir = kBaseDir;
write_options.partitioning = partitioning_;
write_options.basename_template = "part{i}.parquet";
ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
if (concurrently) {
// have a notable number of threads to exhibit multi-threading issues
ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(16));
std::vector<Future<>> threads;

// write dataset above multiple times concurrently to see that is thread-safe.
for (size_t i = 1; i <= 100; ++i) {
FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = parquet_file_write_options;
write_options.filesystem = file_system_;
write_options.base_dir = "thread-" + std::to_string(i);
write_options.partitioning = partitioning_;
write_options.basename_template = "part{i}.parquet";
threads.push_back(
DeferNotOk(pool->Submit(FileSystemDataset::Write, write_options, scanner))
);
}
pool->WaitForIdle();

// assert all jobs succeeded
for (auto& thread : threads) {
thread.Wait();
ASSERT_TRUE(thread.state() == FutureState::SUCCESS);
}
} else {
FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = parquet_file_write_options;
write_options.filesystem = file_system_;
write_options.base_dir = kBaseDir;
write_options.partitioning = partitioning_;
write_options.basename_template = "part{i}.parquet";
ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
}
}

virtual void PrepareTableAndPartitioning() = 0;

Result<std::shared_ptr<Dataset>> CreateDataset(std::string_view base_dir, const std::shared_ptr<ParquetFileFormat> &file_format) {
  // Recursively enumerate every file below the requested base directory.
  fs::FileSelector file_selector;
  file_selector.base_dir = base_dir;
  file_selector.recursive = true;

  // Configure discovery so partition values are parsed relative to base_dir
  // using the test's partitioning scheme.
  FileSystemFactoryOptions options;
  options.partitioning = partitioning_;
  options.partition_base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory,
      FileSystemDatasetFactory::Make(file_system_, file_selector, file_format, options));

  // Materialize a Dataset from the discovered files.
  return factory->Finish();
}

void TestScanDataset() {
// Create decryption properties.
auto decryption_config =
Expand All @@ -152,23 +198,13 @@ class DatasetEncryptionTestBase : public testing::TestWithParam<std::tuple<Encry
auto file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

// Get FileInfo objects for all files under the base directory
fs::FileSelector selector;
selector.base_dir = kBaseDir;
selector.recursive = true;

FileSystemFactoryOptions factory_options;
factory_options.partitioning = partitioning_;
factory_options.partition_base_dir = kBaseDir;
ASSERT_OK_AND_ASSIGN(auto dataset_factory,
FileSystemDatasetFactory::Make(file_system_, selector,
file_format, factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
ASSERT_OK_AND_ASSIGN(auto expected_table, table_->CombineChunks());

auto concurrently = std::get<1>(GetParam());
if (concurrently) {
// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, CreateDataset("thread-1", file_format));

// have a notable number of threads to exhibit multi-threading issues
ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(16));
std::vector<Future<std::shared_ptr<Table>>> threads;
Expand All @@ -183,13 +219,23 @@ class DatasetEncryptionTestBase : public testing::TestWithParam<std::tuple<Encry
// assert correctness of jobs
for (auto& thread : threads) {
ASSERT_OK_AND_ASSIGN(auto read_table, thread.result());
AssertTablesEqual(*read_table, *table_);
AssertTablesEqual(*read_table, *expected_table);
}

// finally check datasets written by all other threads are as expected
for (size_t i = 2; i <= 100; ++i) {
ASSERT_OK_AND_ASSIGN(dataset, CreateDataset("thread-" + std::to_string(i), file_format));
ASSERT_OK_AND_ASSIGN(auto read_table, DatasetEncryptionTestBase::read(dataset));
AssertTablesEqual(*read_table, *expected_table);
}
} else {
// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, CreateDataset(kBaseDir, file_format));

// Reuse the dataset above to scan it twice to make sure decryption works correctly.
for (size_t i = 0; i < 2; ++i) {
ASSERT_OK_AND_ASSIGN(auto read_table, read(dataset));
AssertTablesEqual(*read_table, *table_);
AssertTablesEqual(*read_table, *expected_table);
}
}
}
Expand All @@ -198,6 +244,7 @@ class DatasetEncryptionTestBase : public testing::TestWithParam<std::tuple<Encry
// Read dataset into table
ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
ARROW_EXPECT_OK(scanner_builder->UseThreads(std::get<1>(GetParam())));
ARROW_ASSIGN_OR_RAISE(auto read_table, scanner->ToTable());

// Verify the data was read correctly
Expand All @@ -208,6 +255,7 @@ class DatasetEncryptionTestBase : public testing::TestWithParam<std::tuple<Encry
}

protected:
std::string base_dir_ = std::get<1>(GetParam()) ? "thread-1" : std::string(kBaseDir);
std::shared_ptr<fs::FileSystem> file_system_;
std::shared_ptr<Table> table_;
std::shared_ptr<Partitioning> partitioning_;
Expand Down Expand Up @@ -252,7 +300,7 @@ TEST_P(DatasetEncryptionTest, WriteReadDatasetWithEncryption) {
// Read a single parquet file with and without decryption properties.
TEST_P(DatasetEncryptionTest, ReadSingleFile) {
// Open the Parquet file.
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile("part=a/part0.parquet"));
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(base_dir_ + "/part=a/part0.parquet"));

// Try to read metadata without providing decryption properties
// when the footer is encrypted.
Expand Down Expand Up @@ -288,32 +336,38 @@ INSTANTIATE_TEST_SUITE_P(DatasetEncryptionTestThreaded, DatasetEncryptionTest,

// GH-39444: This test covers the case where parquet dataset scanner crashes when
// processing encrypted datasets over 2^15 rows in multi-threaded mode.
class LargeRowEncryptionTest : public DatasetEncryptionTestBase {
class LargeRowCountEncryptionTest : public DatasetEncryptionTestBase {
public:
// The dataset is partitioned using a Hive partitioning scheme.
void PrepareTableAndPartitioning() override {
// Specifically chosen to be greater than batch size for triggering prefetch.
constexpr int kRowCount = 32769;
// Number of batches
constexpr int kBatchCount = 10;

// Create a random floating-point array with large number of rows.
// Create multiple random floating-point arrays with large number of rows.
arrow::random::RandomArrayGenerator rand_gen(0);
auto array = rand_gen.Float32(kRowCount, 0.0, 1.0, false);
auto arrays = std::vector<std::shared_ptr<arrow::Array>>();
for (int i = 0; i < kBatchCount; i++) {
arrays.push_back(rand_gen.Float32(kRowCount, 0.0, 1.0, false));
}
ASSERT_OK_AND_ASSIGN(auto column, ChunkedArray::Make(arrays, float32()));
auto table_schema = schema({field("a", float32())});

// Prepare table and partitioning.
table_ = arrow::Table::Make(table_schema, {array});
table_ = arrow::Table::Make(table_schema, {column});
partitioning_ = std::make_shared<dataset::DirectoryPartitioning>(arrow::schema({}));
}
};

// Test for writing and reading encrypted dataset with large row count.
// Test for writing and reading an encrypted dataset with a large row count
// (covers GH-39444: scanner crash on encrypted datasets over 2^15 rows).
TEST_P(LargeRowCountEncryptionTest, ReadEncryptLargeRowCount) {
  ASSERT_NO_FATAL_FAILURE(TestScanDataset());
}

// Instantiate single-threaded and multi-threaded variants; the bool in the
// tuple selects concurrent write/scan in DatasetEncryptionTestBase.
INSTANTIATE_TEST_SUITE_P(
    LargeRowCountEncryptionTest, LargeRowCountEncryptionTest,
    ::testing::Values(std::tuple<EncryptionParam, bool>(COLUMN_KEY, false),
                      std::tuple<EncryptionParam, bool>(UNIFORM, false)));
INSTANTIATE_TEST_SUITE_P(
    LargeRowCountEncryptionTestThreaded, LargeRowCountEncryptionTest,
    ::testing::Values(std::tuple<EncryptionParam, bool>(COLUMN_KEY, true),
                      std::tuple<EncryptionParam, bool>(UNIFORM, true)));

} // namespace dataset
Expand Down

0 comments on commit 294c131

Please sign in to comment.