Skip to content

Commit

Permalink
feat: Support to disable dynamic filter reordering
Browse files Browse the repository at this point in the history
Summary:
Add config to disable dynamic filter reordering to see if it can help reduce io in favor of small query execution
if io wait could take significant amount when having local ssd. The static reordering solely based on the filter kind.
This should only be used in specific case as dynamic filter reordering is an important scan optimization.

Differential Revision: D67542368
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 23, 2024
1 parent d46bea7 commit c4326ec
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 34 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ class ConnectorQueryCtx {
const config::ConfigBase* const sessionProperties_;
const common::SpillConfig* const spillConfig_;
const common::PrefixSortConfig prefixSortConfig_;
std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator_;
const std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator_;
cache::AsyncDataCache* cache_;
const std::string scanId_;
const std::string queryId_;
Expand Down
13 changes: 11 additions & 2 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ bool HiveConfig::ignoreMissingFiles(const config::ConfigBase* session) const {
return session->get<bool>(kIgnoreMissingFilesSession, false);
}

int64_t HiveConfig::maxCoalescedBytes() const {
return config_->get<int64_t>(kMaxCoalescedBytes, 128 << 20); // 128MB
int64_t HiveConfig::maxCoalescedBytes(const config::ConfigBase* session) const {
return session->get<int64_t>(
kMaxCoalescedBytesSession,
config_->get<int64_t>(kMaxCoalescedDistance, 128 << 20)); // 128MB
}

int32_t HiveConfig::maxCoalescedDistanceBytes(
Expand Down Expand Up @@ -283,6 +285,13 @@ uint8_t HiveConfig::readTimestampUnit(const config::ConfigBase* session) const {
return unit;
}

bool HiveConfig::readFilterReorderDisabled(
const config::ConfigBase* session) const {
return session->get<bool>(
kreadFilterReorderDisabledSession,
config_->get<bool>(kreadFilterReorderDisabled, false));
}

bool HiveConfig::cacheNoRetention(const config::ConfigBase* session) const {
return session->get<bool>(
kCacheNoRetentionSession,
Expand Down
11 changes: 10 additions & 1 deletion velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class HiveConfig {

/// The max coalesce bytes for a request.
static constexpr const char* kMaxCoalescedBytes = "max-coalesced-bytes";
static constexpr const char* kMaxCoalescedBytesSession =
"max-coalesced-bytes";

/// The max merge distance to combine read requests.
/// Note: The session property name differs from the constant name for
Expand Down Expand Up @@ -206,6 +208,11 @@ class HiveConfig {
static constexpr const char* kReadTimestampUnitSession =
"hive.reader.timestamp_unit";

static constexpr const char* kreadFilterReorderDisabled =
"hive.reader.filter_reorder_disabaled";
static constexpr const char* kreadFilterReorderDisabledSession =
"hive.reader.filter_reorder_disabaled";

static constexpr const char* kCacheNoRetention = "cache.no_retention";
static constexpr const char* kCacheNoRetentionSession = "cache.no_retention";
static constexpr const char* kLocalDataPath = "hive_local_data_path";
Expand Down Expand Up @@ -239,7 +246,7 @@ class HiveConfig {

bool ignoreMissingFiles(const config::ConfigBase* session) const;

int64_t maxCoalescedBytes() const;
int64_t maxCoalescedBytes(const config::ConfigBase* session) const;

int32_t maxCoalescedDistanceBytes(const config::ConfigBase* session) const;

Expand Down Expand Up @@ -294,6 +301,8 @@ class HiveConfig {
// Returns the timestamp unit used when reading timestamps from files.
uint8_t readTimestampUnit(const config::ConfigBase* session) const;

bool readFilterReorderDisabled(const config::ConfigBase* session) const;

/// Returns true to evict out a query scanned data out of in-memory cache
/// right after the access, and also skip staging to the ssd cache. This helps
/// to prevent the cache space pollution from the one-time table scan by large
Expand Down
41 changes: 35 additions & 6 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,28 @@
namespace facebook::velox::connector::hive {
namespace {

struct SubfieldFilter {
std::string name;
const common::Subfield* subfield;
const common::Filter* filter;
};

// This ensures the filters are ordered based on the subfield name.
std::vector<SubfieldFilter> orderSubfieldFilters(
const SubfieldFilters& filters) {
std::vector<SubfieldFilter> ordered;
ordered.reserve(filters.size());
for (const auto& [subfield, filter] : filters) {
ordered.emplace_back(
SubfieldFilter{subfield.toString(), &subfield, filter.get()});
}
std::sort(
ordered.begin(), ordered.end(), [](const auto& lhs, const auto& rhs) {
return lhs.name < rhs.name;
});
return ordered;
}

struct SubfieldSpec {
const common::Subfield* subfield;
bool filterOnly;
Expand Down Expand Up @@ -369,6 +391,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
const SpecialColumnNames& specialColumns,
bool disableFilterReorder,
memory::MemoryPool* pool) {
auto spec = std::make_shared<common::ScanSpec>("root");
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
Expand Down Expand Up @@ -441,8 +464,10 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
}
}

for (auto& pair : filters) {
const auto name = pair.first.toString();
// Ensure the initial filter ordering is the same across splits and drivers
// are the same.
const auto orderedFilters = orderSubfieldFilters(filters);
for (const auto& filterEntry : orderedFilters) {
// SelectiveColumnReader doesn't support constant columns with filters,
// hence, we can't have a filter for a $path or $bucket column.
//
Expand All @@ -451,13 +476,16 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
// to be removed.
// TODO Remove this check when Presto is fixed to not specify a filter
// on $path and $bucket column.
if (isSynthesizedColumn(name, infoColumns)) {
if (isSynthesizedColumn(filterEntry.name, infoColumns)) {
continue;
}
auto fieldSpec = spec->getOrCreateChild(pair.first);
fieldSpec->addFilter(*pair.second);
auto fieldSpec = spec->getOrCreateChild(*filterEntry.subfield);
fieldSpec->addFilter(*filterEntry.filter);
}

if (disableFilterReorder) {
spec->disableFilterReorder();
}
return spec;
}

Expand Down Expand Up @@ -557,7 +585,8 @@ void configureReaderOptions(
const std::unordered_map<std::string, std::string>& tableParameters) {
auto sessionProperties = connectorQueryCtx->sessionProperties();
readerOptions.setLoadQuantum(hiveConfig->loadQuantum());
readerOptions.setMaxCoalesceBytes(hiveConfig->maxCoalescedBytes());
readerOptions.setMaxCoalesceBytes(
hiveConfig->maxCoalescedBytes(sessionProperties));
readerOptions.setMaxCoalesceDistance(
hiveConfig->maxCoalescedDistanceBytes(sessionProperties));
readerOptions.setFileColumnNamesReadAsLowerCase(
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns,
const SpecialColumnNames& specialColumns,
bool disableFilterReorder,
memory::MemoryPool* pool);

void configureReaderOptions(
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ HiveDataSource::HiveDataSource(
partitionKeys_,
infoColumns_,
specialColumns_,
hiveConfig_->readFilterReorderDisabled(
connectorQueryCtx_->sessionProperties()),
pool_);
if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
Expand Down Expand Up @@ -260,6 +262,8 @@ std::unique_ptr<HivePartitionFunction> HiveDataSource::setupBucketConversion() {
partitionKeys_,
infoColumns_,
specialColumns_,
hiveConfig_->readFilterReorderDisabled(
connectorQueryCtx_->sessionProperties()),
pool_);
newScanSpec->moveAdaptationFrom(*scanSpec_);
scanSpec_ = std::move(newScanSpec);
Expand Down
Loading

0 comments on commit c4326ec

Please sign in to comment.