Skip to content

Commit

Permalink
refactor(hive): Move WriterOptions update to file-formats (#11956)
Browse files Browse the repository at this point in the history
Summary:
Each file format's WriterOptions should implement the processing of its own
format-specific session and connector configs.
This decouples the Hive Connector library from the file-format libraries.
This behavior was previously changed in #10915.

Pull Request resolved: #11956

Reviewed By: xiaoxmeng

Differential Revision: D67718785

Pulled By: pedroerp

fbshipit-source-id: 8771162ebc166bd10e9d825e3760bc9a06a7a232
  • Loading branch information
majetideepak authored and facebook-github-bot committed Jan 8, 2025
1 parent 8bbc20c commit 0c61b5e
Show file tree
Hide file tree
Showing 24 changed files with 422 additions and 626 deletions.
5 changes: 2 additions & 3 deletions velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,12 +1110,11 @@ DEBUG_ONLY_TEST_P(SharedArbitrationTestWithThreadingModes, runtimeStats) {
// triggered flush.
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::kOrcWriterMaxStripeSizeSession,
dwrf::Config::kOrcWriterMaxStripeSizeSession,
"1GB")
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::
kOrcWriterMaxDictionaryMemorySession,
dwrf::Config::kOrcWriterMaxDictionaryMemorySession,
"1GB")
.plan(std::move(writerPlan))
.assertResults(fmt::format("SELECT {}", numRows));
Expand Down
19 changes: 2 additions & 17 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,8 @@ velox_add_library(
velox_link_libraries(
velox_hive_connector
PUBLIC velox_hive_iceberg_splitreader
PRIVATE
velox_common_io
velox_connector
velox_dwio_catalog_fbhive
velox_dwio_dwrf_reader
velox_dwio_dwrf_writer
velox_dwio_orc_reader
velox_dwio_parquet_reader
velox_dwio_parquet_writer
velox_dwio_text_writer_register
velox_file
velox_hive_partition_function
velox_type_tz
velox_s3fs
velox_hdfs
velox_gcs
velox_abfs)
PRIVATE velox_common_io velox_connector velox_dwio_catalog_fbhive
velox_hive_partition_function)

velox_add_library(velox_hive_partition_function HivePartitionFunction.cpp)

Expand Down
79 changes: 0 additions & 79 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,85 +162,6 @@ bool HiveConfig::isFileHandleCacheEnabled() const {
return config_->get<bool>(kEnableFileHandleCache, true);
}

/// Returns the maximum ORC writer stripe size, converted to bytes. The value
/// is looked up in 'session' first, with the connector config (default
/// "64MB") supplied as the fallback.
uint64_t HiveConfig::orcWriterMaxStripeSize(
    const config::ConfigBase* session) const {
  const auto connectorValue =
      config_->get<std::string>(kOrcWriterMaxStripeSize, "64MB");
  const auto effectiveValue = session->get<std::string>(
      kOrcWriterMaxStripeSizeSession, connectorValue);
  return config::toCapacity(effectiveValue, config::CapacityUnit::BYTE);
}

/// Returns the maximum dictionary memory the ORC writer may use, converted to
/// bytes. The value is looked up in 'session' first, with the connector config
/// (default "16MB") supplied as the fallback.
uint64_t HiveConfig::orcWriterMaxDictionaryMemory(
    const config::ConfigBase* session) const {
  const auto connectorValue =
      config_->get<std::string>(kOrcWriterMaxDictionaryMemory, "16MB");
  const auto effectiveValue = session->get<std::string>(
      kOrcWriterMaxDictionaryMemorySession, connectorValue);
  return config::toCapacity(effectiveValue, config::CapacityUnit::BYTE);
}

/// Returns whether integer dictionary encoding is enabled in the ORC writer.
/// The value is looked up in 'session' first, with the connector config
/// (default true) supplied as the fallback.
bool HiveConfig::isOrcWriterIntegerDictionaryEncodingEnabled(
    const config::ConfigBase* session) const {
  const bool connectorValue =
      config_->get<bool>(kOrcWriterIntegerDictionaryEncodingEnabled, true);
  return session->get<bool>(
      kOrcWriterIntegerDictionaryEncodingEnabledSession, connectorValue);
}

/// Returns whether string dictionary encoding is enabled in the ORC writer.
/// The value is looked up in 'session' first, with the connector config
/// (default true) supplied as the fallback.
bool HiveConfig::isOrcWriterStringDictionaryEncodingEnabled(
    const config::ConfigBase* session) const {
  const bool connectorValue =
      config_->get<bool>(kOrcWriterStringDictionaryEncodingEnabled, true);
  return session->get<bool>(
      kOrcWriterStringDictionaryEncodingEnabledSession, connectorValue);
}

/// Returns the ORC writer's linear stripe size heuristics flag. The value is
/// looked up in 'session' first, with the connector config (default true)
/// supplied as the fallback.
bool HiveConfig::orcWriterLinearStripeSizeHeuristics(
    const config::ConfigBase* session) const {
  const bool connectorValue =
      config_->get<bool>(kOrcWriterLinearStripeSizeHeuristics, true);
  return session->get<bool>(
      kOrcWriterLinearStripeSizeHeuristicsSession, connectorValue);
}

/// Returns the ORC writer's minimum compression size. The value is looked up
/// in 'session' first, with the connector config (default 1024) supplied as
/// the fallback.
uint64_t HiveConfig::orcWriterMinCompressionSize(
    const config::ConfigBase* session) const {
  const uint64_t connectorValue =
      config_->get<uint64_t>(kOrcWriterMinCompressionSize, 1024);
  return session->get<uint64_t>(
      kOrcWriterMinCompressionSizeSession, connectorValue);
}

/// Returns the explicitly configured ORC writer compression level, if any.
/// The session property is consulted first, then the connector config;
/// std::nullopt is returned when neither is set.
std::optional<uint8_t> HiveConfig::orcWriterCompressionLevel(
    const config::ConfigBase* session) const {
  const auto fromSession =
      session->get<uint8_t>(kOrcWriterCompressionLevelSession);
  if (fromSession.has_value()) {
    return fromSession.value();
  }

  const auto fromConfig = config_->get<uint8_t>(kOrcWriterCompressionLevel);
  if (!fromConfig.has_value()) {
    // Presto has a single config controlling this value, but different
    // defaults depending on the compression kind, so no default is chosen
    // here.
    return std::nullopt;
  }
  return fromConfig.value();
}

/// Returns the compression level to use with ZLIB: the configured level from
/// orcWriterCompressionLevel() when present, otherwise 4.
uint8_t HiveConfig::orcWriterZLIBCompressionLevel(
    const config::ConfigBase* session) const {
  constexpr uint8_t kZlibFallbackLevel = 4;
  const auto configuredLevel = orcWriterCompressionLevel(session);
  return configuredLevel.value_or(kZlibFallbackLevel);
}

/// Returns the compression level to use with ZSTD: the configured level from
/// orcWriterCompressionLevel() when present, otherwise 3.
uint8_t HiveConfig::orcWriterZSTDCompressionLevel(
    const config::ConfigBase* session) const {
  constexpr uint8_t kZstdFallbackLevel = 3;
  const auto configuredLevel = orcWriterCompressionLevel(session);
  return configuredLevel.value_or(kZstdFallbackLevel);
}

/// Returns the connector config value used when creating write files, or an
/// empty string when it is not set.
std::string HiveConfig::writeFileCreateConfig() const {
  const std::string notConfigured;
  return config_->get<std::string>(kWriteFileCreateConfig, notConfigured);
}
Expand Down
70 changes: 0 additions & 70 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ class HiveConfig {

/// Maximum number of (bucketed) partitions per a single table writer
/// instance.
///
/// TODO: remove hive_orc_use_column_names since it doesn't exist in presto,
/// right now this is only used for testing.
static constexpr const char* kMaxPartitionsPerWriters =
"max-partitions-per-writers";
static constexpr const char* kMaxPartitionsPerWritersSession =
Expand Down Expand Up @@ -136,48 +133,6 @@ class HiveConfig {
/// meta data together. Optimization to decrease the small IO requests
static constexpr const char* kFilePreloadThreshold = "file-preload-threshold";

/// Maximum stripe size in orc writer.
static constexpr const char* kOrcWriterMaxStripeSize =
"hive.orc.writer.stripe-max-size";
static constexpr const char* kOrcWriterMaxStripeSizeSession =
"orc_optimized_writer_max_stripe_size";

/// Maximum dictionary memory that can be used in orc writer.
static constexpr const char* kOrcWriterMaxDictionaryMemory =
"hive.orc.writer.dictionary-max-memory";
static constexpr const char* kOrcWriterMaxDictionaryMemorySession =
"orc_optimized_writer_max_dictionary_memory";

/// Configs to control dictionary encoding.
static constexpr const char* kOrcWriterIntegerDictionaryEncodingEnabled =
"hive.orc.writer.integer-dictionary-encoding-enabled";
static constexpr const char*
kOrcWriterIntegerDictionaryEncodingEnabledSession =
"orc_optimized_writer_integer_dictionary_encoding_enabled";
static constexpr const char* kOrcWriterStringDictionaryEncodingEnabled =
"hive.orc.writer.string-dictionary-encoding-enabled";
static constexpr const char*
kOrcWriterStringDictionaryEncodingEnabledSession =
"orc_optimized_writer_string_dictionary_encoding_enabled";

/// Enables historical based stripe size estimation after compression.
static constexpr const char* kOrcWriterLinearStripeSizeHeuristics =
"hive.orc.writer.linear-stripe-size-heuristics";
static constexpr const char* kOrcWriterLinearStripeSizeHeuristicsSession =
"orc_writer_linear_stripe_size_heuristics";

/// Minimal number of items in an encoded stream.
static constexpr const char* kOrcWriterMinCompressionSize =
"hive.orc.writer.min-compression-size";
static constexpr const char* kOrcWriterMinCompressionSizeSession =
"orc_writer_min_compression_size";

/// The compression level to use with ZLIB and ZSTD.
static constexpr const char* kOrcWriterCompressionLevel =
"hive.orc.writer.compression-level";
static constexpr const char* kOrcWriterCompressionLevelSession =
"orc_optimized_writer_compression_level";

/// Config used to create write files. This config is provided to underlying
/// file system through hive connector and data sink. The config is free form.
/// The form should be defined by the underlying file system.
Expand Down Expand Up @@ -261,31 +216,6 @@ class HiveConfig {

uint64_t fileWriterFlushThresholdBytes() const;

/// Maximum ORC writer stripe size in bytes. Reads
/// kOrcWriterMaxStripeSizeSession from 'session', falling back to
/// kOrcWriterMaxStripeSize in the connector config (default "64MB").
uint64_t orcWriterMaxStripeSize(const config::ConfigBase* session) const;

/// Maximum dictionary memory in bytes that the ORC writer may use. Reads
/// kOrcWriterMaxDictionaryMemorySession from 'session', falling back to
/// kOrcWriterMaxDictionaryMemory in the connector config (default "16MB").
uint64_t orcWriterMaxDictionaryMemory(
const config::ConfigBase* session) const;

/// Whether integer dictionary encoding is enabled in the ORC writer. The
/// session property is read first, then the connector config (default true).
bool isOrcWriterIntegerDictionaryEncodingEnabled(
const config::ConfigBase* session) const;

/// Whether string dictionary encoding is enabled in the ORC writer. The
/// session property is read first, then the connector config (default true).
bool isOrcWriterStringDictionaryEncodingEnabled(
const config::ConfigBase* session) const;

/// Whether linear stripe size heuristics are enabled in the ORC writer. The
/// session property is read first, then the connector config (default true).
bool orcWriterLinearStripeSizeHeuristics(
const config::ConfigBase* session) const;

/// Minimum compression size for the ORC writer. The session property is read
/// first, then the connector config (default 1024).
uint64_t orcWriterMinCompressionSize(const config::ConfigBase* session) const;

/// Explicitly configured compression level, taken from the session property
/// if set, otherwise from the connector config. Returns std::nullopt when
/// neither is set, because the default depends on the compression kind.
std::optional<uint8_t> orcWriterCompressionLevel(
const config::ConfigBase* session) const;

/// Compression level to use with ZLIB: the configured level if any,
/// otherwise 4.
uint8_t orcWriterZLIBCompressionLevel(
const config::ConfigBase* session) const;

/// Compression level to use with ZSTD: the configured level if any,
/// otherwise 3.
uint8_t orcWriterZSTDCompressionLevel(
const config::ConfigBase* session) const;

std::string writeFileCreateConfig() const;

uint32_t sortWriterMaxOutputRows(const config::ConfigBase* session) const;
Expand Down
1 change: 0 additions & 1 deletion velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include <memory>

using namespace facebook::velox::exec;
using namespace facebook::velox::dwrf;

namespace facebook::velox::connector::hive {

Expand Down
Loading

0 comments on commit 0c61b5e

Please sign in to comment.