Skip to content

Commit

Permalink
Check that partition keys are not null in PartitionIdGenerator (#10799)
Browse files Browse the repository at this point in the history
Summary:
In data processing and partitioning, null values in partition keys can complicate or invalidate the partitioning process because null values do not represent valid, meaningful data. Therefore, we want to ensure that partition keys are not null. However, there are some compute engines like spark that may generate null partition keys.

To accommodate both scenarios, i add the configuration that allows us to choose whether to permit null partition keys.

Pull Request resolved: #10799

Reviewed By: kgpai

Differential Revision: D62033189

Pulled By: bikramSingh91

fbshipit-source-id: edc355242ad33e98190f53ab7c5183e9dd9af2c3
  • Loading branch information
kevincmchen authored and facebook-github-bot committed Sep 3, 2024
1 parent fd06bd9 commit 5f4e7e5
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 2 deletions.
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ bool HiveConfig::isPartitionPathAsLowerCase(
return session->get<bool>(kPartitionPathAsLowerCaseSession, true);
}

bool HiveConfig::allowNullPartitionKeys(
const config::ConfigBase* session) const {
return session->get<bool>(
kAllowNullPartitionKeysSession,
config_->get<bool>(kAllowNullPartitionKeys, true));
}

bool HiveConfig::ignoreMissingFiles(const config::ConfigBase* session) const {
return session->get<bool>(kIgnoreMissingFilesSession, false);
}
Expand Down
7 changes: 7 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ class HiveConfig {
static constexpr const char* kPartitionPathAsLowerCaseSession =
"partition_path_as_lower_case";

static constexpr const char* kAllowNullPartitionKeys =
"allow-null-partition-keys";
static constexpr const char* kAllowNullPartitionKeysSession =
"allow_null_partition_keys";

static constexpr const char* kIgnoreMissingFilesSession =
"ignore_missing_files";

Expand Down Expand Up @@ -292,6 +297,8 @@ class HiveConfig {

bool isPartitionPathAsLowerCase(const config::ConfigBase* session) const;

bool allowNullPartitionKeys(const config::ConfigBase* session) const;

bool ignoreMissingFiles(const config::ConfigBase* session) const;

int64_t maxCoalescedBytes() const;
Expand Down
15 changes: 15 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,21 @@ std::string HiveDataSink::stateString(State state) {
void HiveDataSink::computePartitionAndBucketIds(const RowVectorPtr& input) {
VELOX_CHECK(isPartitioned() || isBucketed());
if (isPartitioned()) {
if (!hiveConfig_->allowNullPartitionKeys(
connectorQueryCtx_->sessionProperties())) {
// Check that there are no nulls in the partition keys.
for (auto& partitionIdx : partitionChannels_) {
auto col = input->childAt(partitionIdx);
if (col->mayHaveNulls()) {
for (auto i = 0; i < col->size(); ++i) {
VELOX_USER_CHECK(
!col->isNullAt(i),
"Partition key must not be null: {}",
input->type()->asRow().nameOf(partitionIdx))
}
}
}
}
partitionIdGenerator_->run(input, partitionIds_);
}

Expand Down
2 changes: 0 additions & 2 deletions velox/connectors/hive/PartitionIdGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ void PartitionIdGenerator::run(
const auto numRows = input->size();
result.resize(numRows);

// TODO Check that there are no nulls in the partition keys.

// Compute value IDs using VectorHashers and store these in 'result'.
computeValueIds(input, result);

Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/tests/HiveConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ TEST(HiveConfigTest, defaultConfig) {
ASSERT_EQ(
hiveConfig.sortWriterMaxOutputBytes(emptySession.get()), 10UL << 20);
ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(emptySession.get()), true);
ASSERT_EQ(hiveConfig.allowNullPartitionKeys(emptySession.get()), true);
ASSERT_EQ(hiveConfig.orcWriterMinCompressionSize(emptySession.get()), 1024);
ASSERT_EQ(
hiveConfig.orcWriterCompressionLevel(emptySession.get()), std::nullopt);
Expand Down Expand Up @@ -97,6 +98,7 @@ TEST(HiveConfigTest, overrideConfig) {
{HiveConfig::kGCSCredentials, "hey"},
{HiveConfig::kOrcUseColumnNames, "true"},
{HiveConfig::kFileColumnNamesReadAsLowerCase, "true"},
{HiveConfig::kAllowNullPartitionKeys, "false"},
{HiveConfig::kMaxCoalescedBytes, "100"},
{HiveConfig::kMaxCoalescedDistanceBytes, "100"},
{HiveConfig::kNumCacheFileHandles, "100"},
Expand Down Expand Up @@ -136,6 +138,7 @@ TEST(HiveConfigTest, overrideConfig) {
ASSERT_EQ(hiveConfig.isOrcUseColumnNames(emptySession.get()), true);
ASSERT_EQ(
hiveConfig.isFileColumnNamesReadAsLowerCase(emptySession.get()), true);
ASSERT_EQ(hiveConfig.allowNullPartitionKeys(emptySession.get()), false);
ASSERT_EQ(hiveConfig.maxCoalescedBytes(), 100);
ASSERT_EQ(hiveConfig.maxCoalescedDistanceBytes(), 100);
ASSERT_EQ(hiveConfig.numCacheFileHandles(), 100);
Expand Down Expand Up @@ -178,6 +181,7 @@ TEST(HiveConfigTest, overrideSession) {
{HiveConfig::kSortWriterMaxOutputRowsSession, "20"},
{HiveConfig::kSortWriterMaxOutputBytesSession, "20MB"},
{HiveConfig::kPartitionPathAsLowerCaseSession, "false"},
{HiveConfig::kAllowNullPartitionKeysSession, "false"},
{HiveConfig::kIgnoreMissingFilesSession, "true"},
{HiveConfig::kOrcWriterMinCompressionSizeSession, "512"},
{HiveConfig::kOrcWriterCompressionLevelSession, "1"},
Expand Down Expand Up @@ -224,6 +228,7 @@ TEST(HiveConfigTest, overrideSession) {
ASSERT_EQ(hiveConfig.sortWriterMaxOutputRows(session.get()), 20);
ASSERT_EQ(hiveConfig.sortWriterMaxOutputBytes(session.get()), 20UL << 20);
ASSERT_EQ(hiveConfig.isPartitionPathAsLowerCase(session.get()), false);
ASSERT_EQ(hiveConfig.allowNullPartitionKeys(session.get()), false);
ASSERT_EQ(hiveConfig.ignoreMissingFiles(session.get()), true);
ASSERT_EQ(
hiveConfig.orcWriterLinearStripeSizeHeuristics(session.get()), false);
Expand Down
9 changes: 9 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,14 @@ Each query can override the config by setting corresponding query session proper
- bool
- true
- If true, the partition directory will be converted to lowercase when executing a table write operation.
* - allow-null-partition-keys
- allow_null_partition_keys
- bool
- true
- Determines whether null values for partition keys are allowed or not. If not, fails with "Partition key must
not be null" error message when writing data with null partition key.
Null check for partitioning key should be used only when partitions are generated dynamically during query execution.
For queries that write to fixed partitions, this check should happen much earlier before the Velox execution even starts.
* - ignore_missing_files
-
- bool
Expand Down Expand Up @@ -518,6 +526,7 @@ Each query can override the config by setting corresponding query session proper
from the one-time table scan by large batch query when mixed running with interactive
query which has high data locality.


``Amazon S3 Configuration``
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. list-table::
Expand Down

0 comments on commit 5f4e7e5

Please sign in to comment.