diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index d79e21b733439..aaa68e3f3eecf 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -19,6 +19,11 @@ #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" +#include "velox/dwio/dwrf/writer/FlushPolicy.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#ifdef VELOX_ENABLE_PARQUET +#include "velox/dwio/parquet/writer/Writer.h" +#endif #include "velox/exec/PlanNodeStats.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -34,14 +39,7 @@ namespace facebook::velox::connector::hive::iceberg { class HiveIcebergTest : public HiveConnectorTestBase { public: - HiveIcebergTest() - : config_{std::make_shared()} { - // Make the writers flush per batch so that we can create non-aligned - // RowGroups between the base data files and delete files - flushPolicyFactory_ = []() { - return std::make_unique([]() { return true; }); - }; - } + HiveIcebergTest() {} /// Create 1 base data file data_file_1 with 2 RowGroups of 10000 rows each. /// Also create 1 delete file delete_file_1 which contains delete positions @@ -170,66 +168,76 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::multimap>>& deleteFilesForBaseDatafiles, int32_t numPrefetchSplits = 0) { - // Keep the reference to the deleteFilePath, otherwise the corresponding - // file will be deleted. - std::map> dataFilePaths = - writeDataFiles(rowGroupSizesForFiles); - std::unordered_map< - std::string, - std::pair>> - deleteFilePaths = writePositionDeleteFiles( - deleteFilesForBaseDatafiles, dataFilePaths); - - std::vector> splits; - - for (const auto& dataFile : dataFilePaths) { - std::string baseFileName = dataFile.first; - std::string baseFilePath = dataFile.second->getPath(); - - std::vector deleteFiles; - - for (auto const& deleteFile : deleteFilesForBaseDatafiles) { - std::string deleteFileName = deleteFile.first; - std::multimap> deleteFileContent = - deleteFile.second; - - if (deleteFileContent.count(baseFileName) != 0) { - // If this delete file contains rows for the target base file, then - // add it to the split - auto deleteFilePath = - deleteFilePaths[deleteFileName].second->getPath(); - IcebergDeleteFile deleteFile( - FileContent::kPositionalDeletes, - deleteFilePath, - fileFomat_, - deleteFilePaths[deleteFileName].first, - testing::internal::GetFileSize( - std::fopen(deleteFilePath.c_str(), "r"))); - deleteFiles.push_back(deleteFile); + std::vector fileFormats = { + dwio::common::FileFormat::DWRF}; + if (hasWriterFactory(dwio::common::FileFormat::PARQUET)) { + fileFormats.push_back(dwio::common::FileFormat::PARQUET); + } + + for (dwio::common::FileFormat fileFormat : fileFormats) { + // Keep the reference to the deleteFilePath, otherwise the corresponding + // file will be deleted. + std::map> dataFilePaths = + writeDataFiles(rowGroupSizesForFiles, fileFormat); + std::unordered_map< + std::string, + std::pair>> + deleteFilePaths = writePositionDeleteFiles( + deleteFilesForBaseDatafiles, dataFilePaths, fileFormat); + + std::vector> splits; + + for (const auto& dataFile : dataFilePaths) { + std::string baseFileName = dataFile.first; + std::string baseFilePath = dataFile.second->getPath(); + + std::vector deleteFiles; + + for (auto const& deleteFile : deleteFilesForBaseDatafiles) { + std::string deleteFileName = deleteFile.first; + std::multimap> deleteFileContent = + deleteFile.second; + + if (deleteFileContent.count(baseFileName) != 0) { + // If this delete file contains rows for the target base file, then + // add it to the split + auto deleteFilePath = + deleteFilePaths[deleteFileName].second->getPath(); + IcebergDeleteFile deleteFile( + FileContent::kPositionalDeletes, + deleteFilePath, + fileFormat, + deleteFilePaths[deleteFileName].first, + testing::internal::GetFileSize( + std::fopen(deleteFilePath.c_str(), "r"))); + deleteFiles.push_back(deleteFile); + } } + + splits.emplace_back( + makeIcebergSplit(baseFilePath, deleteFiles, fileFormat)); } - splits.emplace_back(makeIcebergSplit(baseFilePath, deleteFiles)); + std::string duckdbSql = + getDuckDBQuery(rowGroupSizesForFiles, deleteFilesForBaseDatafiles); + auto plan = tableScanNode(); + auto task = HiveConnectorTestBase::assertQuery( + plan, splits, duckdbSql, numPrefetchSplits); + + auto planStats = toPlanStats(task->taskStats()); + auto scanNodeId = plan->id(); + auto it = planStats.find(scanNodeId); + ASSERT_TRUE(it != planStats.end()); + ASSERT_TRUE(it->second.peakMemoryBytes > 0); } - - std::string duckdbSql = - getDuckDBQuery(rowGroupSizesForFiles, deleteFilesForBaseDatafiles); - auto plan = tableScanNode(); - auto task = HiveConnectorTestBase::assertQuery( - plan, splits, duckdbSql, numPrefetchSplits); - - auto planStats = toPlanStats(task->taskStats()); - auto scanNodeId = plan->id(); - auto it = planStats.find(scanNodeId); - ASSERT_TRUE(it != planStats.end()); - ASSERT_TRUE(it->second.peakMemoryBytes > 0); } const static int rowCount = 20000; private: std::map> writeDataFiles( - std::map> rowGroupSizesForFiles) { + std::map> rowGroupSizesForFiles, + dwio::common::FileFormat fileFormat) { std::map> dataFilePaths; std::vector dataVectorsJoined; @@ -240,14 +248,50 @@ class HiveIcebergTest : public HiveConnectorTestBase { dataFilePaths[dataFile.first] = TempFilePath::create(); // We make the values are continuously increasing even across base data - // files. This is to make constructing DuckDB queries easier + // files. This is to make constructing DuckDB queries easier. std::vector dataVectors = makeVectors(dataFile.second, startingValue); + std::shared_ptr writerOptions = nullptr; + + // Make the writers flush per batch so that we can create non-aligned + // RowGroups between the base data files and delete files. + switch (fileFormat) { + case dwio::common::FileFormat::DWRF: { + auto dwrfOptions = std::make_shared(); + dwrfOptions->flushPolicyFactory = []() { + return std::make_unique( + []() { return true; }); + }; + writerOptions = dwrfOptions; + break; + } +#ifdef VELOX_ENABLE_PARQUET + case dwio::common::FileFormat::PARQUET: { + auto parquetOptions = std::make_shared(); + parquetOptions->flushPolicyFactory = []() { + return std::make_unique( + 1'024 * 1'024, 128 * 1'024 * 1'024, []() { return true; }); + }; + writerOptions = parquetOptions; + break; + } +#endif + default: { + auto defaultOptions = std::make_shared(); + defaultOptions->flushPolicyFactory = []() { + return std::make_unique( + []() { return true; }); + }; + writerOptions = defaultOptions; + break; + } + } + writeToFile( dataFilePaths[dataFile.first]->getPath(), dataVectors, - config_, - flushPolicyFactory_); + writerOptions, + fileFormat); for (int i = 0; i < dataVectors.size(); i++) { dataVectorsJoined.push_back(dataVectors[i]); @@ -271,7 +315,8 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::vector>>& deleteFilesForBaseDatafiles, // - std::map> baseFilePaths) { + std::map> baseFilePaths, + dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF) { std::unordered_map< std::string, std::pair>> @@ -303,12 +348,43 @@ class HiveIcebergTest : public HiveConnectorTestBase { deleteFileVectors.push_back(deleteFileVector); totalPositionsInDeleteFile += positionsInRowGroup.size(); } + std::shared_ptr writerOptions = nullptr; + switch (fileFormat) { + case dwio::common::FileFormat::DWRF: { + auto dwrfOptions = std::make_shared(); + dwrfOptions->flushPolicyFactory = []() { + return std::make_unique( + []() { return true; }); + }; + writerOptions = dwrfOptions; + break; + } + case dwio::common::FileFormat::PARQUET: { + auto parquetOptions = std::make_shared(); + parquetOptions->flushPolicyFactory = []() { + return std::make_unique( + 1'024 * 1'024, 128 * 1'024 * 1'024, []() { return true; }); + }; + writerOptions = parquetOptions; + break; + } + + default: { + auto defaultOptions = std::make_shared(); + defaultOptions->flushPolicyFactory = []() { + return std::make_unique( + []() { return true; }); + }; + writerOptions = defaultOptions; + break; + } + } writeToFile( deleteFilePath->getPath(), deleteFileVectors, - config_, - flushPolicyFactory_); + writerOptions, + fileFormat); deleteFilePaths[deleteFileName] = std::make_pair(totalPositionsInDeleteFile, deleteFilePath); @@ -337,7 +413,8 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::shared_ptr makeIcebergSplit( const std::string& dataFilePath, - const std::vector& deleteFiles = {}) { + const std::vector& deleteFiles = {}, + dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF) { std::unordered_map> partitionKeys; std::unordered_map customSplitInfo; customSplitInfo["table_format"] = "hive-iceberg"; @@ -349,7 +426,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { return std::make_shared( kHiveConnectorId, dataFilePath, - fileFomat_, + fileFormat, 0, fileSize, partitionKeys, @@ -476,10 +553,6 @@ class HiveIcebergTest : public HiveConnectorTestBase { return PlanBuilder(pool_.get()).tableScan(rowType_).planNode(); } - dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF}; - std::shared_ptr config_; - std::function()> flushPolicyFactory_; - RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})}; std::shared_ptr pathColumn_ = IcebergMetadataColumn::icebergDeleteFilePathColumn(); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 2fc22d0e98a73..4bb86572e0722 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -33,6 +33,8 @@ #include "velox/connectors/hive/HivePartitionFunction.h" #include "velox/dwio/common/CacheInputStream.h" #include "velox/dwio/common/tests/utils/DataFiles.h" +#include "velox/dwio/dwrf/common/Config.h" +#include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/PlanNodeStats.h" @@ -3489,6 +3491,9 @@ TEST_F(TableScanTest, randomSample) { auto writeConfig = std::make_shared(); writeConfig->set( dwrf::Config::STRIPE_SIZE, rows->size() * sizeof(double)); + std::shared_ptr dwrfWriterOptions = + std::make_shared(); + dwrfWriterOptions->config = writeConfig; int numTotalRows = 0; for (int i = 0; i < 10; ++i) { auto file = TempFilePath::create(); @@ -3497,10 +3502,10 @@ TEST_F(TableScanTest, randomSample) { for (int j = 0; j < 100; ++j) { vectors.push_back(rows); } - writeToFile(file->getPath(), vectors, writeConfig); + writeToFile(file->getPath(), vectors, dwrfWriterOptions); numTotalRows += rows->size() * vectors.size(); } else { - writeToFile(file->getPath(), {rows}, writeConfig); + writeToFile(file->getPath(), {rows}, dwrfWriterOptions); numTotalRows += rows->size(); } files.push_back(file); @@ -4750,9 +4755,12 @@ TEST_F(TableScanTest, readFlatMapAsStruct) { config->set>(dwrf::Config::MAP_FLAT_COLS, {0}); config->set>>( dwrf::Config::MAP_FLAT_COLS_STRUCT_KEYS, {keys}); + std::shared_ptr dwrfWriterOptions = + std::make_shared(); + dwrfWriterOptions->config = config; auto file = TempFilePath::create(); auto writeSchema = ROW({"c0"}, {MAP(INTEGER(), BIGINT())}); - writeToFile(file->getPath(), {vector}, config, writeSchema); + writeToFile(file->getPath(), {vector}, dwrfWriterOptions, writeSchema); auto readSchema = asRowType(vector->type()); auto plan = PlanBuilder().tableScan(readSchema, {}, "", writeSchema).planNode(); diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index 8e28b4dcdc89e..fe890635169a2 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -34,21 +34,22 @@ add_library( target_link_libraries( velox_exec_test_lib - velox_vector_test_lib - velox_temp_path + velox_aggregates velox_core - velox_exception - velox_expression - velox_parse_parser velox_duckdb_conversion velox_dwio_common + velox_dwio_common_test_utils velox_dwio_dwrf_reader velox_dwio_dwrf_writer - velox_dwio_common_test_utils + velox_dwio_parquet_writer + velox_exception + velox_expression velox_file_test_utils - velox_type_fbhive + velox_functions_prestosql velox_hive_connector - velox_tpch_connector + velox_parse_parser velox_presto_serializer - velox_functions_prestosql - velox_aggregates) + velox_temp_path + velox_tpch_connector + velox_type_fbhive + velox_vector_test_lib) diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index 6f2f0676c1751..a0952ccb0f823 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -15,13 +15,13 @@ */ #include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/dwio/dwrf/writer/Writer.h" +#ifdef VELOX_ENABLE_PARQUET +#include "velox/dwio/parquet/writer/Writer.h" +#endif -#include "velox/common/file/FileSystems.h" #include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/dwio/common/tests/utils/BatchMaker.h" -#include "velox/dwio/dwrf/reader/DwrfReader.h" -#include "velox/dwio/dwrf/writer/FlushPolicy.h" -#include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" namespace facebook::velox::exec::test { @@ -64,46 +64,71 @@ void HiveConnectorTestBase::resetHiveConnector( void HiveConnectorTestBase::writeToFile( const std::string& filePath, - RowVectorPtr vector) { - writeToFile(filePath, std::vector{vector}); + const RowVectorPtr vector, + dwio::common::FileFormat fileFormat) { + // Default to using dwrf writer options if none were provided. + writeToFile( + filePath, + std::vector{vector}, + std::make_shared(), + fileFormat); } void HiveConnectorTestBase::writeToFile( const std::string& filePath, const std::vector& vectors, - std::shared_ptr config, - const std::function()>& - flushPolicyFactory) { + dwio::common::FileFormat fileFormat) { + // Default to using dwrf writer options if none were provided. writeToFile( - filePath, - vectors, - std::move(config), - vectors[0]->type(), - flushPolicyFactory); + filePath, vectors, std::make_shared(), fileFormat); +} + +void HiveConnectorTestBase::writeToFile( + const std::string& filePath, + const std::vector& vectors, + std::shared_ptr options, + dwio::common::FileFormat fileFormat) { + writeToFile(filePath, vectors, options, vectors[0]->type(), fileFormat); } void HiveConnectorTestBase::writeToFile( const std::string& filePath, const std::vector& vectors, - std::shared_ptr config, + std::shared_ptr options, const TypePtr& schema, - const std::function()>& - flushPolicyFactory) { - velox::dwrf::WriterOptions options; - options.config = config; - options.schema = schema; + dwio::common::FileFormat fileFormat) { auto localWriteFile = std::make_unique(filePath, true, false); auto sink = std::make_unique( std::move(localWriteFile), filePath); auto childPool = rootPool_->addAggregateChild("HiveConnectorTestBase.Writer"); - options.memoryPool = childPool.get(); - options.flushPolicyFactory = flushPolicyFactory; - - facebook::velox::dwrf::Writer writer{std::move(sink), options}; + options->memoryPool = childPool.get(); + options->schema = schema; + std::shared_ptr writerOptions; + switch (fileFormat) { + case dwio::common::FileFormat::DWRF: + writerOptions = std::dynamic_pointer_cast(options); + break; +#ifdef VELOX_ENABLE_PARQUET + case dwio::common::FileFormat::PARQUET: + writerOptions = + std::dynamic_pointer_cast(options); + break; +#endif + default: + VELOX_NYI( + "File format not supported: {}.", dwio::common::toString(fileFormat)); + break; + } + VELOX_CHECK_NOT_NULL( + writerOptions, + "Writer Factory expected a supported file format WriterOptions object."); + std::unique_ptr writer = + dwio::common::getWriterFactory(fileFormat) + ->createWriter(std::move(sink), writerOptions); for (size_t i = 0; i < vectors.size(); ++i) { - writer.write(vectors[i]); + writer->write(vectors[i]); } - writer.close(); + writer->close(); } std::vector HiveConnectorTestBase::makeVectors( diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index aa6e5ac5beb96..9b02f00394c62 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -19,8 +19,6 @@ #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" -#include "velox/dwio/dwrf/common/Config.h" -#include "velox/dwio/dwrf/writer/FlushPolicy.h" #include "velox/exec/Operator.h" #include "velox/exec/tests/utils/OperatorTestBase.h" #include "velox/exec/tests/utils/TempFilePath.h" @@ -43,23 +41,28 @@ class HiveConnectorTestBase : public OperatorTestBase { void resetHiveConnector( const std::shared_ptr& config); - void writeToFile(const std::string& filePath, RowVectorPtr vector); + void writeToFile( + const std::string& filePath, + const RowVectorPtr vector, + dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF); + + void writeToFile( + const std::string& filePath, + const std::vector& vectors, + dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF); void writeToFile( const std::string& filePath, const std::vector& vectors, - std::shared_ptr config = - std::make_shared(), - const std::function()>& - flushPolicyFactory = nullptr); + std::shared_ptr options, + dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF); void writeToFile( const std::string& filePath, const std::vector& vectors, - std::shared_ptr config, + std::shared_ptr options, const TypePtr& schema, - const std::function()>& - flushPolicyFactory = nullptr); + dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF); std::vector makeVectors( const RowTypePtr& rowType,