Refactoring HiveConnectorTestBase::writeToFile() for DWRF and Parquet file formats and refactoring tests accordingly
minhancao committed Oct 7, 2024
1 parent 13c18db commit 3e06627
Showing 5 changed files with 229 additions and 119 deletions.
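
For context: before this change, writeToFile() took a dwrf::Config plus a DWRF-specific flush-policy factory; after it, the helper takes a generic dwio::common::WriterOptions and a file format and dispatches to the matching writer factory. The sketch below is an assumption inferred from the call sites in this diff, not the verbatim implementation in velox/exec/tests/utils/HiveConnectorTestBase.cpp:

    // Hypothetical shape of the refactored helper, inferred from the call
    // sites in this commit; details (pool wiring, sink options) may differ.
    void writeToFile(
        const std::string& filePath,
        const std::vector<RowVectorPtr>& vectors,
        std::shared_ptr<dwio::common::WriterOptions> options,
        dwio::common::FileFormat format = dwio::common::FileFormat::DWRF) {
      auto factory = dwio::common::getWriterFactory(format);
      auto sink = std::make_unique<dwio::common::LocalFileSink>(
          filePath, dwio::common::FileSink::Options{});
      options->schema = asRowType(vectors[0]->type());
      auto writer = factory->createWriter(std::move(sink), options);
      for (const auto& vector : vectors) {
        writer->write(vector);
      }
      writer->close();
    }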
213 changes: 143 additions & 70 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -19,6 +19,11 @@
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
#include "velox/dwio/dwrf/writer/FlushPolicy.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#ifdef VELOX_ENABLE_PARQUET
#include "velox/dwio/parquet/writer/Writer.h"
#endif
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
@@ -34,14 +39,7 @@ namespace facebook::velox::connector::hive::iceberg {

class HiveIcebergTest : public HiveConnectorTestBase {
public:
HiveIcebergTest()
: config_{std::make_shared<facebook::velox::dwrf::Config>()} {
// Make the writers flush per batch so that we can create non-aligned
// RowGroups between the base data files and delete files
flushPolicyFactory_ = []() {
return std::make_unique<dwrf::LambdaFlushPolicy>([]() { return true; });
};
}
HiveIcebergTest() {}

/// Create 1 base data file data_file_1 with 2 RowGroups of 10000 rows each.
/// Also create 1 delete file delete_file_1 which contains delete positions
@@ -170,66 +168,76 @@ class HiveIcebergTest : public HiveConnectorTestBase {
std::multimap<std::string, std::vector<int64_t>>>&
deleteFilesForBaseDatafiles,
int32_t numPrefetchSplits = 0) {
// Keep the reference to the deleteFilePath, otherwise the corresponding
// file will be deleted.
std::map<std::string, std::shared_ptr<TempFilePath>> dataFilePaths =
writeDataFiles(rowGroupSizesForFiles);
std::unordered_map<
std::string,
std::pair<int64_t, std::shared_ptr<TempFilePath>>>
deleteFilePaths = writePositionDeleteFiles(
deleteFilesForBaseDatafiles, dataFilePaths);

std::vector<std::shared_ptr<ConnectorSplit>> splits;

for (const auto& dataFile : dataFilePaths) {
std::string baseFileName = dataFile.first;
std::string baseFilePath = dataFile.second->getPath();

std::vector<IcebergDeleteFile> deleteFiles;

for (auto const& deleteFile : deleteFilesForBaseDatafiles) {
std::string deleteFileName = deleteFile.first;
std::multimap<std::string, std::vector<int64_t>> deleteFileContent =
deleteFile.second;

if (deleteFileContent.count(baseFileName) != 0) {
// If this delete file contains rows for the target base file, then
// add it to the split
auto deleteFilePath =
deleteFilePaths[deleteFileName].second->getPath();
IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
deleteFilePath,
fileFomat_,
deleteFilePaths[deleteFileName].first,
testing::internal::GetFileSize(
std::fopen(deleteFilePath.c_str(), "r")));
deleteFiles.push_back(deleteFile);
std::vector<dwio::common::FileFormat> fileFormats = {
dwio::common::FileFormat::DWRF};
if (hasWriterFactory(dwio::common::FileFormat::PARQUET)) {
fileFormats.push_back(dwio::common::FileFormat::PARQUET);
}

for (dwio::common::FileFormat fileFormat : fileFormats) {
// Keep the reference to the deleteFilePath, otherwise the corresponding
// file will be deleted.
std::map<std::string, std::shared_ptr<TempFilePath>> dataFilePaths =
writeDataFiles(rowGroupSizesForFiles, fileFormat);
std::unordered_map<
std::string,
std::pair<int64_t, std::shared_ptr<TempFilePath>>>
deleteFilePaths = writePositionDeleteFiles(
deleteFilesForBaseDatafiles, dataFilePaths, fileFormat);

std::vector<std::shared_ptr<ConnectorSplit>> splits;

for (const auto& dataFile : dataFilePaths) {
std::string baseFileName = dataFile.first;
std::string baseFilePath = dataFile.second->getPath();

std::vector<IcebergDeleteFile> deleteFiles;

for (auto const& deleteFile : deleteFilesForBaseDatafiles) {
std::string deleteFileName = deleteFile.first;
std::multimap<std::string, std::vector<int64_t>> deleteFileContent =
deleteFile.second;

if (deleteFileContent.count(baseFileName) != 0) {
// If this delete file contains rows for the target base file, then
// add it to the split
auto deleteFilePath =
deleteFilePaths[deleteFileName].second->getPath();
IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
deleteFilePath,
fileFormat,
deleteFilePaths[deleteFileName].first,
testing::internal::GetFileSize(
std::fopen(deleteFilePath.c_str(), "r")));
deleteFiles.push_back(deleteFile);
}
}

splits.emplace_back(
makeIcebergSplit(baseFilePath, deleteFiles, fileFormat));
}

splits.emplace_back(makeIcebergSplit(baseFilePath, deleteFiles));
std::string duckdbSql =
getDuckDBQuery(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
auto plan = tableScanNode();
auto task = HiveConnectorTestBase::assertQuery(
plan, splits, duckdbSql, numPrefetchSplits);

auto planStats = toPlanStats(task->taskStats());
auto scanNodeId = plan->id();
auto it = planStats.find(scanNodeId);
ASSERT_TRUE(it != planStats.end());
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
}

std::string duckdbSql =
getDuckDBQuery(rowGroupSizesForFiles, deleteFilesForBaseDatafiles);
auto plan = tableScanNode();
auto task = HiveConnectorTestBase::assertQuery(
plan, splits, duckdbSql, numPrefetchSplits);

auto planStats = toPlanStats(task->taskStats());
auto scanNodeId = plan->id();
auto it = planStats.find(scanNodeId);
ASSERT_TRUE(it != planStats.end());
ASSERT_TRUE(it->second.peakMemoryBytes > 0);
}

const static int rowCount = 20000;

private:
std::map<std::string, std::shared_ptr<TempFilePath>> writeDataFiles(
std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles) {
std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles,
dwio::common::FileFormat fileFormat) {
std::map<std::string, std::shared_ptr<TempFilePath>> dataFilePaths;

std::vector<RowVectorPtr> dataVectorsJoined;
@@ -240,14 +248,50 @@ class HiveIcebergTest : public HiveConnectorTestBase {
dataFilePaths[dataFile.first] = TempFilePath::create();

// We make the values continuously increasing even across base data
// files. This is to make constructing DuckDB queries easier
// files. This is to make constructing DuckDB queries easier.
std::vector<RowVectorPtr> dataVectors =
makeVectors(dataFile.second, startingValue);
std::shared_ptr<dwio::common::WriterOptions> writerOptions = nullptr;

// Make the writers flush per batch so that we can create non-aligned
// RowGroups between the base data files and delete files.
switch (fileFormat) {
case dwio::common::FileFormat::DWRF: {
auto dwrfOptions = std::make_shared<dwrf::WriterOptions>();
dwrfOptions->flushPolicyFactory = []() {
return std::make_unique<dwrf::LambdaFlushPolicy>(
[]() { return true; });
};
writerOptions = dwrfOptions;
break;
}
#ifdef VELOX_ENABLE_PARQUET
case dwio::common::FileFormat::PARQUET: {
auto parquetOptions = std::make_shared<parquet::WriterOptions>();
parquetOptions->flushPolicyFactory = []() {
return std::make_unique<parquet::LambdaFlushPolicy>(
1'024 * 1'024, 128 * 1'024 * 1'024, []() { return true; });
};
writerOptions = parquetOptions;
break;
}
#endif
default: {
auto defaultOptions = std::make_shared<dwrf::WriterOptions>();
defaultOptions->flushPolicyFactory = []() {
return std::make_unique<dwrf::LambdaFlushPolicy>(
[]() { return true; });
};
writerOptions = defaultOptions;
break;
}
}

writeToFile(
dataFilePaths[dataFile.first]->getPath(),
dataVectors,
config_,
flushPolicyFactory_);
writerOptions,
fileFormat);

for (int i = 0; i < dataVectors.size(); i++) {
dataVectorsJoined.push_back(dataVectors[i]);
@@ -271,7 +315,8 @@ class HiveIcebergTest : public HiveConnectorTestBase {
std::vector<int64_t>>>&
deleteFilesForBaseDatafiles, // <base file name, delete position
// vector for all RowGroups>
std::map<std::string, std::shared_ptr<TempFilePath>> baseFilePaths) {
std::map<std::string, std::shared_ptr<TempFilePath>> baseFilePaths,
dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF) {
std::unordered_map<
std::string,
std::pair<int64_t, std::shared_ptr<TempFilePath>>>
@@ -303,12 +348,43 @@ class HiveIcebergTest : public HiveConnectorTestBase {
deleteFileVectors.push_back(deleteFileVector);
totalPositionsInDeleteFile += positionsInRowGroup.size();
}
std::shared_ptr<dwio::common::WriterOptions> writerOptions = nullptr;
switch (fileFormat) {
case dwio::common::FileFormat::DWRF: {
auto dwrfOptions = std::make_shared<dwrf::WriterOptions>();
dwrfOptions->flushPolicyFactory = []() {
return std::make_unique<dwrf::LambdaFlushPolicy>(
[]() { return true; });
};
writerOptions = dwrfOptions;
break;
}

case dwio::common::FileFormat::PARQUET: {
auto parquetOptions = std::make_shared<parquet::WriterOptions>();
parquetOptions->flushPolicyFactory = []() {
return std::make_unique<parquet::LambdaFlushPolicy>(
1'024 * 1'024, 128 * 1'024 * 1'024, []() { return true; });
};
writerOptions = parquetOptions;
break;
}

default: {
auto defaultOptions = std::make_shared<dwrf::WriterOptions>();
defaultOptions->flushPolicyFactory = []() {
return std::make_unique<dwrf::LambdaFlushPolicy>(
[]() { return true; });
};
writerOptions = defaultOptions;
break;
}
}
writeToFile(
deleteFilePath->getPath(),
deleteFileVectors,
config_,
flushPolicyFactory_);
writerOptions,
fileFormat);

deleteFilePaths[deleteFileName] =
std::make_pair(totalPositionsInDeleteFile, deleteFilePath);
@@ -337,7 +413,8 @@ class HiveIcebergTest : public HiveConnectorTestBase {

std::shared_ptr<ConnectorSplit> makeIcebergSplit(
const std::string& dataFilePath,
const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
const std::vector<IcebergDeleteFile>& deleteFiles = {},
dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF) {
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
std::unordered_map<std::string, std::string> customSplitInfo;
customSplitInfo["table_format"] = "hive-iceberg";
Expand All @@ -349,7 +426,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return std::make_shared<HiveIcebergSplit>(
kHiveConnectorId,
dataFilePath,
fileFomat_,
fileFormat,
0,
fileSize,
partitionKeys,
Expand Down Expand Up @@ -476,10 +553,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
return PlanBuilder(pool_.get()).tableScan(rowType_).planNode();
}

dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF};
std::shared_ptr<dwrf::Config> config_;
std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;

RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
IcebergMetadataColumn::icebergDeleteFilePathColumn();
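The per-format WriterOptions switch above is duplicated between writeDataFiles() and writePositionDeleteFiles() (and the second copy references parquet::WriterOptions without the #ifdef VELOX_ENABLE_PARQUET guard the first copy has). A small helper along these lines — hypothetical, not part of this commit — could factor out the duplication:

    // Hypothetical helper building the flush-per-batch writer options used
    // by both writeDataFiles() and writePositionDeleteFiles().
    std::shared_ptr<dwio::common::WriterOptions> makeFlushPerBatchOptions(
        dwio::common::FileFormat fileFormat) {
    #ifdef VELOX_ENABLE_PARQUET
      if (fileFormat == dwio::common::FileFormat::PARQUET) {
        auto options = std::make_shared<parquet::WriterOptions>();
        options->flushPolicyFactory = []() {
          // Same constructor arguments as in the test bodies above; the
          // lambda forces a flush after every written batch.
          return std::make_unique<parquet::LambdaFlushPolicy>(
              1'024 * 1'024, 128 * 1'024 * 1'024, []() { return true; });
        };
        return options;
      }
    #endif
      // DWRF and the default case: flush a stripe after every batch so that
      // RowGroups are non-aligned between base data files and delete files.
      auto options = std::make_shared<dwrf::WriterOptions>();
      options->flushPolicyFactory = []() {
        return std::make_unique<dwrf::LambdaFlushPolicy>([]() { return true; });
      };
      return options;
    }
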
14 changes: 11 additions & 3 deletions velox/exec/tests/TableScanTest.cpp
@@ -33,6 +33,8 @@
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/dwio/common/CacheInputStream.h"
#include "velox/dwio/common/tests/utils/DataFiles.h"
#include "velox/dwio/dwrf/common/Config.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/Exchange.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/PlanNodeStats.h"
@@ -3489,6 +3491,9 @@ TEST_F(TableScanTest, randomSample) {
auto writeConfig = std::make_shared<dwrf::Config>();
writeConfig->set<uint64_t>(
dwrf::Config::STRIPE_SIZE, rows->size() * sizeof(double));
std::shared_ptr<dwrf::WriterOptions> dwrfWriterOptions =
std::make_shared<dwrf::WriterOptions>();
dwrfWriterOptions->config = writeConfig;
int numTotalRows = 0;
for (int i = 0; i < 10; ++i) {
auto file = TempFilePath::create();
@@ -3497,10 +3502,10 @@
for (int j = 0; j < 100; ++j) {
vectors.push_back(rows);
}
writeToFile(file->getPath(), vectors, writeConfig);
writeToFile(file->getPath(), vectors, dwrfWriterOptions);
numTotalRows += rows->size() * vectors.size();
} else {
writeToFile(file->getPath(), {rows}, writeConfig);
writeToFile(file->getPath(), {rows}, dwrfWriterOptions);
numTotalRows += rows->size();
}
files.push_back(file);
@@ -4750,9 +4755,12 @@ TEST_F(TableScanTest, readFlatMapAsStruct) {
config->set<const std::vector<uint32_t>>(dwrf::Config::MAP_FLAT_COLS, {0});
config->set<const std::vector<std::vector<std::string>>>(
dwrf::Config::MAP_FLAT_COLS_STRUCT_KEYS, {keys});
std::shared_ptr<dwrf::WriterOptions> dwrfWriterOptions =
std::make_shared<dwrf::WriterOptions>();
dwrfWriterOptions->config = config;
auto file = TempFilePath::create();
auto writeSchema = ROW({"c0"}, {MAP(INTEGER(), BIGINT())});
writeToFile(file->getPath(), {vector}, config, writeSchema);
writeToFile(file->getPath(), {vector}, dwrfWriterOptions, writeSchema);
auto readSchema = asRowType(vector->type());
auto plan =
PlanBuilder().tableScan(readSchema, {}, "", writeSchema).planNode();
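Both TableScanTest call sites follow the same migration pattern: the dwrf::Config that used to be passed straight to writeToFile() is now carried inside a dwrf::WriterOptions. A minimal before/after sketch of the call-site change:

    // Before this commit:
    //   writeToFile(file->getPath(), vectors, writeConfig);
    // After: wrap the existing dwrf::Config in dwrf::WriterOptions.
    auto dwrfWriterOptions = std::make_shared<dwrf::WriterOptions>();
    dwrfWriterOptions->config = writeConfig; // same dwrf::Config as before
    writeToFile(file->getPath(), vectors, dwrfWriterOptions);
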
21 changes: 11 additions & 10 deletions velox/exec/tests/utils/CMakeLists.txt
@@ -34,21 +34,22 @@ add_library(

target_link_libraries(
velox_exec_test_lib
velox_vector_test_lib
velox_temp_path
velox_aggregates
velox_core
velox_exception
velox_expression
velox_parse_parser
velox_duckdb_conversion
velox_dwio_common
velox_dwio_common_test_utils
velox_dwio_dwrf_reader
velox_dwio_dwrf_writer
velox_dwio_common_test_utils
velox_dwio_parquet_writer
velox_exception
velox_expression
velox_file_test_utils
velox_type_fbhive
velox_functions_prestosql
velox_hive_connector
velox_tpch_connector
velox_parse_parser
velox_presto_serializer
velox_functions_prestosql
velox_aggregates)
velox_temp_path
velox_tpch_connector
velox_type_fbhive
velox_vector_test_lib)