Skip to content

Commit

Permalink
Move connectorQueryCtx_ and other required fields to SplitReader
Browse files Browse the repository at this point in the history
The upcoming IcebergSplitReader will need to use fileHandleFactory_,
executor_, connectorQueryCtx_, etc because it needs to create another
HiveDataSource to read the delete files. This PR copies these required
fields to SplitReader. Moreover, since the SplitReader already owns
the baseReader_, the creation and configuration of ReaderOptions was
also moved to SplitReader in a single function configureReaderOptions().
Previously the configuration of ReaderOptions was scattered at multiple
locations in HiveDataSource.
  • Loading branch information
yingsu00 committed Nov 8, 2023
1 parent f73648a commit 92dd1b6
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 185 deletions.
15 changes: 1 addition & 14 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,14 @@ std::unique_ptr<DataSource> HiveConnector::createDataSource(
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
ConnectorQueryCtx* connectorQueryCtx) {
dwio::common::ReaderOptions options(connectorQueryCtx->memoryPool());
options.setMaxCoalesceBytes(
HiveConfig::maxCoalescedBytes(connectorQueryCtx->config()));
options.setMaxCoalesceDistance(
HiveConfig::maxCoalescedDistanceBytes(connectorQueryCtx->config()));
options.setFileColumnNamesReadAsLowerCase(
HiveConfig::isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->config()));
options.setUseColumnNamesForColumnMapping(
HiveConfig::isOrcUseColumnNames(connectorQueryCtx->config()));

return std::make_unique<HiveDataSource>(
outputType,
tableHandle,
columnHandles,
&fileHandleFactory_,
connectorQueryCtx->expressionEvaluator(),
connectorQueryCtx->cache(),
connectorQueryCtx->scanId(),
executor_,
options);
connectorQueryCtx);
}

std::unique_ptr<DataSink> HiveConnector::createDataSink(
Expand Down
131 changes: 22 additions & 109 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

#include "velox/connectors/hive/HiveDataSource.h"

#include <string>
#include <unordered_map>

#include "velox/dwio/common/CachedBufferedInput.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/expression/FieldReference.h"

#include <string>
#include <unordered_map>

namespace facebook::velox::connector::hive {

class HiveTableHandle;
Expand Down Expand Up @@ -357,18 +357,13 @@ HiveDataSource::HiveDataSource(
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
FileHandleFactory* fileHandleFactory,
core::ExpressionEvaluator* expressionEvaluator,
cache::AsyncDataCache* cache,
const std::string& scanId,
folly::Executor* executor,
const dwio::common::ReaderOptions& options)
: fileHandleFactory_(fileHandleFactory),
readerOpts_(options),
pool_(&options.getMemoryPool()),
ConnectorQueryCtx* connectorQueryCtx)
: pool_(connectorQueryCtx->memoryPool()),
outputType_(outputType),
expressionEvaluator_(expressionEvaluator),
cache_(cache),
scanId_(scanId),
expressionEvaluator_(connectorQueryCtx->expressionEvaluator()),
fileHandleFactory_(fileHandleFactory),
connectorQueryCtx_(connectorQueryCtx),
executor_(executor) {
// Column handled keyed on the column alias, the name used in the query.
for (const auto& [canonicalizedName, columnHandle] : columnHandles) {
Expand Down Expand Up @@ -409,7 +404,8 @@ HiveDataSource::HiveDataSource(
VELOX_CHECK(
hiveTableHandle_ != nullptr,
"TableHandle must be an instance of HiveTableHandle");
if (readerOpts_.isFileColumnNamesReadAsLowerCase()) {
if (HiveConfig::isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->config())) {
checkColumnNameLowerCase(outputType_);
checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters());
checkColumnNameLowerCase(hiveTableHandle_->remainingFilter());
Expand Down Expand Up @@ -473,62 +469,20 @@ HiveDataSource::HiveDataSource(
*scanSpec_, *remainingFilter, expressionEvaluator_);
}

readerOpts_.setFileSchema(hiveTableHandle_->dataColumns());
ioStats_ = std::make_shared<io::IoStatistics>();
}

inline uint8_t parseDelimiter(const std::string& delim) {
for (char const& ch : delim) {
if (!std::isdigit(ch)) {
return delim[0];
}
}
return stoi(delim);
}

void HiveDataSource::parseSerdeParameters(
const std::unordered_map<std::string, std::string>& serdeParameters) {
auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim);
if (fieldIt == serdeParameters.end()) {
fieldIt = serdeParameters.find("serialization.format");
}
auto collectionIt =
serdeParameters.find(dwio::common::SerDeOptions::kCollectionDelim);
if (collectionIt == serdeParameters.end()) {
// For collection delimiter, Hive 1.x, 2.x uses "colelction.delim", but
// Hive 3.x uses "collection.delim".
// See: https://issues.apache.org/jira/browse/HIVE-16922)
collectionIt = serdeParameters.find("colelction.delim");
}
auto mapKeyIt =
serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim);

if (fieldIt == serdeParameters.end() &&
collectionIt == serdeParameters.end() &&
mapKeyIt == serdeParameters.end()) {
return;
}

uint8_t fieldDelim = '\1';
uint8_t collectionDelim = '\2';
uint8_t mapKeyDelim = '\3';
if (fieldIt != serdeParameters.end()) {
fieldDelim = parseDelimiter(fieldIt->second);
}
if (collectionIt != serdeParameters.end()) {
collectionDelim = parseDelimiter(collectionIt->second);
}
if (mapKeyIt != serdeParameters.end()) {
mapKeyDelim = parseDelimiter(mapKeyIt->second);
}
dwio::common::SerDeOptions serDeOptions(
fieldDelim, collectionDelim, mapKeyDelim);
readerOpts_.setSerDeOptions(serDeOptions);
}

std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
return SplitReader::create(
split_, readerOutputType_, partitionKeys_, scanSpec_, pool_);
split_,
hiveTableHandle_,
scanSpec_,
readerOutputType_,
&partitionKeys_,
fileHandleFactory_,
executor_,
connectorQueryCtx_,
ioStats_);
}

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
Expand All @@ -540,30 +494,12 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {

VLOG(1) << "Adding split " << split_->toString();

if (readerOpts_.getFileFormat() != dwio::common::FileFormat::UNKNOWN) {
VELOX_CHECK(
readerOpts_.getFileFormat() == split_->fileFormat,
"HiveDataSource received splits of different formats: {} and {}",
toString(readerOpts_.getFileFormat()),
toString(split_->fileFormat));
} else {
parseSerdeParameters(split_->serdeParameters);
readerOpts_.setFileFormat(split_->fileFormat);
}

auto fileHandle = fileHandleFactory_->generate(split_->filePath).second;
auto input = createBufferedInput(*fileHandle, readerOpts_);

if (splitReader_) {
splitReader_.reset();
}

splitReader_ = createSplitReader();
splitReader_->prepareSplit(
hiveTableHandle_,
readerOpts_,
std::move(input),
metadataFilter_,
runtimeStats_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
}

std::optional<RowVectorPtr> HiveDataSource::next(
Expand Down Expand Up @@ -787,29 +723,6 @@ std::shared_ptr<common::ScanSpec> HiveDataSource::makeScanSpec(
return spec;
}

std::unique_ptr<dwio::common::BufferedInput>
HiveDataSource::createBufferedInput(
const FileHandle& fileHandle,
const dwio::common::ReaderOptions& readerOpts) {
if (cache_) {
return std::make_unique<dwio::common::CachedBufferedInput>(
fileHandle.file,
dwio::common::MetricsLog::voidLog(),
fileHandle.uuid.id(),
cache_,
Connector::getTracker(scanId_, readerOpts.loadQuantum()),
fileHandle.groupId.id(),
ioStats_,
executor_,
readerOpts);
}
return std::make_unique<dwio::common::BufferedInput>(
fileHandle.file,
readerOpts.getMemoryPool(),
dwio::common::MetricsLog::voidLog(),
ioStats_.get());
}

vector_size_t HiveDataSource::evaluateRemainingFilter(RowVectorPtr& rowVector) {
filterRows_.resize(output_->size());

Expand Down
22 changes: 5 additions & 17 deletions velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ class HiveDataSource : public DataSource {
std::string,
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
FileHandleFactory* fileHandleFactory,
core::ExpressionEvaluator* expressionEvaluator,
cache::AsyncDataCache* cache,
const std::string& scanId,
folly::Executor* executor,
const dwio::common::ReaderOptions& options);
ConnectorQueryCtx* connectorQueryCtx);

void addSplit(std::shared_ptr<ConnectorSplit> split) override;

Expand Down Expand Up @@ -95,15 +92,9 @@ class HiveDataSource : public DataSource {
protected:
virtual std::unique_ptr<SplitReader> createSplitReader();

std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
const FileHandle&,
const dwio::common::ReaderOptions&);

std::shared_ptr<HiveConnectorSplit> split_;
FileHandleFactory* fileHandleFactory_;
dwio::common::ReaderOptions readerOpts_;
std::shared_ptr<common::ScanSpec> scanSpec_;
memory::MemoryPool* pool_;
std::shared_ptr<common::ScanSpec> scanSpec_;
VectorPtr output_;
std::unique_ptr<SplitReader> splitReader_;

Expand All @@ -128,9 +119,6 @@ class HiveDataSource : public DataSource {
// hold adaptation.
void resetSplit();

void parseSerdeParameters(
const std::unordered_map<std::string, std::string>& serdeParameters);

const RowVectorPtr& getEmptyOutput() {
if (!emptyOutput_) {
emptyOutput_ = RowVector::createEmpty(outputType_, pool_);
Expand All @@ -140,7 +128,7 @@ class HiveDataSource : public DataSource {

std::shared_ptr<HiveTableHandle> hiveTableHandle_;

// The row type for the data source output, not including filter only columns
// The row type for the data source output, not including filter-only columns
const RowTypePtr outputType_;
std::shared_ptr<io::IoStatistics> ioStats_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
Expand All @@ -155,8 +143,8 @@ class HiveDataSource : public DataSource {
SelectivityVector filterRows_;
exec::FilterEvalCtx filterEvalCtx_;

cache::AsyncDataCache* const cache_{nullptr};
const std::string& scanId_;
FileHandleFactory* fileHandleFactory_;
const ConnectorQueryCtx* const connectorQueryCtx_;
folly::Executor* executor_;
};

Expand Down
Loading

0 comments on commit 92dd1b6

Please sign in to comment.