From 5f4e7e5c33cc0a554981768ca13e8383d0772d79 Mon Sep 17 00:00:00 2001 From: kevincmchen Date: Tue, 3 Sep 2024 15:02:30 -0700 Subject: [PATCH] Check that partition keys are not null in PartitionIdGenerator (#10799) Summary: In data processing and partitioning, null values in partition keys can complicate or invalidate the partitioning process because null values do not represent valid, meaningful data. Therefore, we want to ensure that partition keys are not null. However, there are some compute engines like spark that may generate null partition keys. To accommodate both scenarios, i add the configuration that allows us to choose whether to permit null partition keys. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10799 Reviewed By: kgpai Differential Revision: D62033189 Pulled By: bikramSingh91 fbshipit-source-id: edc355242ad33e98190f53ab7c5183e9dd9af2c3 --- velox/connectors/hive/HiveConfig.cpp | 7 +++++++ velox/connectors/hive/HiveConfig.h | 7 +++++++ velox/connectors/hive/HiveDataSink.cpp | 15 +++++++++++++++ velox/connectors/hive/PartitionIdGenerator.cpp | 2 -- velox/connectors/hive/tests/HiveConfigTest.cpp | 5 +++++ velox/docs/configs.rst | 9 +++++++++ 6 files changed, 43 insertions(+), 2 deletions(-) diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 984a27cade42..b44e06c8074c 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -173,6 +173,13 @@ bool HiveConfig::isPartitionPathAsLowerCase( return session->get(kPartitionPathAsLowerCaseSession, true); } +bool HiveConfig::allowNullPartitionKeys( + const config::ConfigBase* session) const { + return session->get( + kAllowNullPartitionKeysSession, + config_->get(kAllowNullPartitionKeys, true)); +} + bool HiveConfig::ignoreMissingFiles(const config::ConfigBase* session) const { return session->get(kIgnoreMissingFilesSession, false); } diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 3fbbe710b865..3d449e6d6718 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -135,6 +135,11 @@ class HiveConfig { static constexpr const char* kPartitionPathAsLowerCaseSession = "partition_path_as_lower_case"; + static constexpr const char* kAllowNullPartitionKeys = + "allow-null-partition-keys"; + static constexpr const char* kAllowNullPartitionKeysSession = + "allow_null_partition_keys"; + static constexpr const char* kIgnoreMissingFilesSession = "ignore_missing_files"; @@ -292,6 +297,8 @@ class HiveConfig { bool isPartitionPathAsLowerCase(const config::ConfigBase* session) const; + bool allowNullPartitionKeys(const config::ConfigBase* session) const; + bool ignoreMissingFiles(const config::ConfigBase* session) const; int64_t maxCoalescedBytes() const; diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index c0d5cb46b9fa..678acd31f84b 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -496,6 +496,21 @@ std::string HiveDataSink::stateString(State state) { void HiveDataSink::computePartitionAndBucketIds(const RowVectorPtr& input) { VELOX_CHECK(isPartitioned() || isBucketed()); if (isPartitioned()) { + if (!hiveConfig_->allowNullPartitionKeys( + connectorQueryCtx_->sessionProperties())) { + // Check that there are no nulls in the partition keys. + for (auto& partitionIdx : partitionChannels_) { + auto col = input->childAt(partitionIdx); + if (col->mayHaveNulls()) { + for (auto i = 0; i < col->size(); ++i) { + VELOX_USER_CHECK( + !col->isNullAt(i), + "Partition key must not be null: {}", + input->type()->asRow().nameOf(partitionIdx)) + } + } + } + } partitionIdGenerator_->run(input, partitionIds_); } diff --git a/velox/connectors/hive/PartitionIdGenerator.cpp b/velox/connectors/hive/PartitionIdGenerator.cpp index bc5678a46cc5..deec8dc5b005 100644 --- a/velox/connectors/hive/PartitionIdGenerator.cpp +++ b/velox/connectors/hive/PartitionIdGenerator.cpp @@ -66,8 +66,6 @@ void PartitionIdGenerator::run( const auto numRows = input->size(); result.resize(numRows); - // TODO Check that there are no nulls in the partition keys. - // Compute value IDs using VectorHashers and store these in 'result'. computeValueIds(input, result); diff --git a/velox/connectors/hive/tests/HiveConfigTest.cpp b/velox/connectors/hive/tests/HiveConfigTest.cpp index b4585ecd4443..b7bbfdbc7904 100644 --- a/velox/connectors/hive/tests/HiveConfigTest.cpp +++ b/velox/connectors/hive/tests/HiveConfigTest.cpp @@ -70,6 +70,7 @@ TEST(HiveConfigTest, defaultConfig) { ASSERT_EQ( hiveConfig.sortWriterMaxOutputBytes(emptySession.get()), 10UL << 20); ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(emptySession.get()), true); + ASSERT_EQ(hiveConfig.allowNullPartitionKeys(emptySession.get()), true); ASSERT_EQ(hiveConfig.orcWriterMinCompressionSize(emptySession.get()), 1024); ASSERT_EQ( hiveConfig.orcWriterCompressionLevel(emptySession.get()), std::nullopt); @@ -97,6 +98,7 @@ TEST(HiveConfigTest, overrideConfig) { {HiveConfig::kGCSCredentials, "hey"}, {HiveConfig::kOrcUseColumnNames, "true"}, {HiveConfig::kFileColumnNamesReadAsLowerCase, "true"}, + {HiveConfig::kAllowNullPartitionKeys, "false"}, {HiveConfig::kMaxCoalescedBytes, "100"}, {HiveConfig::kMaxCoalescedDistanceBytes, "100"}, {HiveConfig::kNumCacheFileHandles, "100"}, @@ -136,6 +138,7 @@ TEST(HiveConfigTest, overrideConfig) { ASSERT_EQ(hiveConfig.isOrcUseColumnNames(emptySession.get()), true); ASSERT_EQ( hiveConfig.isFileColumnNamesReadAsLowerCase(emptySession.get()), true); + ASSERT_EQ(hiveConfig.allowNullPartitionKeys(emptySession.get()), false); ASSERT_EQ(hiveConfig.maxCoalescedBytes(), 100); ASSERT_EQ(hiveConfig.maxCoalescedDistanceBytes(), 100); ASSERT_EQ(hiveConfig.numCacheFileHandles(), 100); @@ -178,6 +181,7 @@ TEST(HiveConfigTest, overrideSession) { {HiveConfig::kSortWriterMaxOutputRowsSession, "20"}, {HiveConfig::kSortWriterMaxOutputBytesSession, "20MB"}, {HiveConfig::kPartitionPathAsLowerCaseSession, "false"}, + {HiveConfig::kAllowNullPartitionKeysSession, "false"}, {HiveConfig::kIgnoreMissingFilesSession, "true"}, {HiveConfig::kOrcWriterMinCompressionSizeSession, "512"}, {HiveConfig::kOrcWriterCompressionLevelSession, "1"}, @@ -224,6 +228,7 @@ TEST(HiveConfigTest, overrideSession) { ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(session.get()), 20); ASSERT_EQ(hiveConfig.sortWriterMaxOutputBytes(session.get()), 20UL << 20); ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(session.get()), false); + ASSERT_EQ(hiveConfig.allowNullPartitionKeys(session.get()), false); ASSERT_EQ(hiveConfig.ignoreMissingFiles(session.get()), true); ASSERT_EQ( hiveConfig.orcWriterLinearStripeSizeHeuristics(session.get()), false); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 0e4582e1b5c1..746ad0d8cd44 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -414,6 +414,14 @@ Each query can override the config by setting corresponding query session proper - bool - true - If true, the partition directory will be converted to lowercase when executing a table write operation. + * - allow-null-partition-keys + - allow_null_partition_keys + - bool + - true + - Determines whether null values for partition keys are allowed or not. If not, fails with "Partition key must + not be null" error message when writing data with null partition key. + Null check for partitioning key should be used only when partitions are generated dynamically during query execution. + For queries that write to fixed partitions, this check should happen much earlier before the Velox execution even starts. * - ignore_missing_files - - bool @@ -518,6 +526,7 @@ Each query can override the config by setting corresponding query session proper from the one-time table scan by large batch query when mixed running with interactive query which has high data locality. + ``Amazon S3 Configuration`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. list-table::