Skip to content

Commit

Permalink
Optimize reading from table with large number of columns (facebookinc…
Browse files Browse the repository at this point in the history
…ubator#10145)

Summary:
Pull Request resolved: facebookincubator#10145

We see table scan using more CPU than Java on tables with large number
of columns (> 7000).  Some optimizations are implemented to improve the
performance of a typical query from 24.07 hours to 4.7 hours (Java 10.02 hours).

- Remove usage of `ColumnSelector` from selective readers
- Use `Type` instead of `TypeWithId` for `requestedType`
- Do not populate file `TypeWithId` where the column is not selected
- Use arena for `StripeFooter`

bypass-github-export-checks

Reviewed By: oerling

Differential Revision: D58364580
  • Loading branch information
Yuhta authored and facebook-github-bot committed Jun 13, 2024
1 parent 5c123cc commit 7432bea
Show file tree
Hide file tree
Showing 50 changed files with 287 additions and 237 deletions.
2 changes: 1 addition & 1 deletion velox/common/base/BitSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BitSet {
bits::setBit(bits_.data(), bit, true);
}

bool contains(uint32_t index) {
bool contains(uint32_t index) const {
uint64_t bit = index - min_;
if (bit >= bits_.size() * 64) {
// If index was < min_, bit will have wrapped around and will be >
Expand Down
16 changes: 0 additions & 16 deletions velox/common/io/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,6 @@ class ReaderOptions {
autoPreloadLength_(DEFAULT_AUTO_PRELOAD_SIZE),
prefetchMode_(PrefetchMode::PREFETCH) {}

ReaderOptions& operator=(const ReaderOptions& other) {
memoryPool_ = other.memoryPool_;
autoPreloadLength_ = other.autoPreloadLength_;
prefetchMode_ = other.prefetchMode_;
maxCoalesceDistance_ = other.maxCoalesceDistance_;
maxCoalesceBytes_ = other.maxCoalesceBytes_;
prefetchRowGroups_ = other.prefetchRowGroups_;
loadQuantum_ = other.loadQuantum_;
noCacheRetention_ = other.noCacheRetention_;
return *this;
}

ReaderOptions(const ReaderOptions& other) {
*this = other;
}

/// Sets the memory pool for allocation.
ReaderOptions& setMemoryPool(velox::memory::MemoryPool& pool) {
memoryPool_ = &pool;
Expand Down
3 changes: 1 addition & 2 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,7 @@ void configureRowReaderOptions(
}
rowReaderOptions.setScanSpec(scanSpec);
rowReaderOptions.setMetadataFilter(std::move(metadataFilter));
rowReaderOptions.select(
dwio::common::ColumnSelector::fromScanSpec(*scanSpec, rowType));
rowReaderOptions.setRequestedType(rowType);
rowReaderOptions.range(hiveSplit->start, hiveSplit->length);
}

Expand Down
41 changes: 21 additions & 20 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,21 @@ void SplitReader::configureReaderOptions(
hiveTableHandle_,
hiveSplit_);
baseReaderOpts_.setRandomSkip(std::move(randomSkip));
baseReaderOpts_.setScanSpec(scanSpec_);
}

void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
createReader();
createReader(std::move(metadataFilter), rowIndexColumn);

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
}

createRowReader(std::move(metadataFilter), rowIndexColumn);
createRowReader();
}

uint64_t SplitReader::next(uint64_t size, VectorPtr& output) {
Expand Down Expand Up @@ -218,7 +219,9 @@ std::string SplitReader::toString() const {
static_cast<const void*>(baseRowReader_.get()));
}

void SplitReader::createReader() {
void SplitReader::createReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
VELOX_CHECK_NE(
baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN);

Expand Down Expand Up @@ -255,6 +258,20 @@ void SplitReader::createReader() {

baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.fileFormat())
->createReader(std::move(baseFileInput), baseReaderOpts_);

auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema());
auto columnNames = fileType->names();
if (rowIndexColumn != nullptr) {
setRowIndexColumn(fileType, rowIndexColumn);
}
configureRowReaderOptions(
baseRowReaderOpts_,
hiveTableHandle_->tableParameters(),
scanSpec_,
std::move(metadataFilter),
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_);
}

bool SplitReader::checkIfSplitIsEmpty(
Expand Down Expand Up @@ -286,23 +303,7 @@ bool SplitReader::checkIfSplitIsEmpty(
return emptySplit_;
}

void SplitReader::createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema());
auto columnNames = fileType->names();
if (rowIndexColumn != nullptr) {
setRowIndexColumn(fileType, rowIndexColumn);
}

configureRowReaderOptions(
baseRowReaderOpts_,
hiveTableHandle_->tableParameters(),
scanSpec_,
std::move(metadataFilter),
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_);
void SplitReader::createRowReader() {
// NOTE: we firstly reset the finished 'baseRowReader_' of previous split
// before setting up for the next one to avoid doubling the peak memory usage.
baseRowReader_.reset();
Expand Down
8 changes: 4 additions & 4 deletions velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ class SplitReader {
protected:
/// Create the dwio::common::Reader object baseReader_, which will be used to
/// read the data file's metadata and schema
void createReader();
void createReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);

/// Check if the hiveSplit_ is empty. The split is considered empty when
/// 1) The data file is missing but the user chooses to ignore it
Expand All @@ -127,9 +129,7 @@ class SplitReader {

/// Create the dwio::common::RowReader object baseRowReader_, which owns the
/// ColumnReaders that will be used to read the data
void createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);
void createRowReader();

/// Different table formats may have different meatadata columns.
/// This function will be used to update the scanSpec for these columns.
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
createReader();
createReader(std::move(metadataFilter), rowIndexColumn);

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
}

createRowReader(metadataFilter, rowIndexColumn);
createRowReader();

std::shared_ptr<const HiveIcebergSplit> icebergSplit =
std::dynamic_pointer_cast<const HiveIcebergSplit>(hiveSplit_);
Expand Down
5 changes: 0 additions & 5 deletions velox/connectors/hive/tests/HiveConnectorUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,6 @@ TEST_F(HiveConnectorUtilTest, configureRowReaderOptions) {
float_features->childByName(common::ScanSpec::kMapKeysFieldName)
->setFilter(common::createBigintValues({1, 3}, false));
float_features->setFlatMapFeatureSelection({"1", "3"});
RowReaderOptions options;
configureRowReaderOptions(options, {}, spec, nullptr, rowType, split);
auto& nodes = options.getSelector()->getProjection();
ASSERT_EQ(nodes.size(), 1);
ASSERT_EQ(nodes[0].expression, "[1,3]");
}

} // namespace facebook::velox::connector
1 change: 0 additions & 1 deletion velox/dwio/common/FormatData.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include "velox/common/memory/Memory.h"
#include "velox/dwio/common/ColumnSelector.h"
#include "velox/dwio/common/ScanSpec.h"
#include "velox/dwio/common/SeekableInputStream.h"
#include "velox/dwio/common/Statistics.h"
Expand Down
50 changes: 23 additions & 27 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class RowReaderOptions {
bool returnFlatVector_ = false;
ErrorTolerance errorTolerance_;
std::shared_ptr<ColumnSelector> selector_;
RowTypePtr requestedType_;
std::shared_ptr<velox::common::ScanSpec> scanSpec_ = nullptr;
std::shared_ptr<velox::common::MetadataFilter> metadataFilter_;
// Node id for map column to a list of keys to be projected as a struct.
Expand Down Expand Up @@ -177,6 +178,10 @@ class RowReaderOptions {
*/
RowReaderOptions& select(const std::shared_ptr<ColumnSelector>& selector) {
selector_ = selector;
if (selector) {
VELOX_CHECK_NULL(requestedType_);
requestedType_ = selector->getSchema();
}
return *this;
}

Expand Down Expand Up @@ -295,6 +300,15 @@ class RowReaderOptions {
return errorTolerance_;
}

const RowTypePtr& requestedType() const {
return requestedType_;
}

void setRequestedType(RowTypePtr requestedType) {
VELOX_CHECK_NULL(selector_);
requestedType_ = std::move(requestedType);
}

const std::shared_ptr<velox::common::ScanSpec>& getScanSpec() const {
return scanSpec_;
}
Expand Down Expand Up @@ -440,33 +454,6 @@ class ReaderOptions : public io::ReaderOptions {
fileFormat_(FileFormat::UNKNOWN),
fileSchema_(nullptr) {}

ReaderOptions& operator=(const ReaderOptions& other) {
io::ReaderOptions::operator=(other);
tailLocation_ = other.tailLocation_;
fileFormat_ = other.fileFormat_;
fileSchema_ = other.fileSchema_;
serDeOptions_ = other.serDeOptions_;
decrypterFactory_ = other.decrypterFactory_;
footerEstimatedSize_ = other.footerEstimatedSize_;
filePreloadThreshold_ = other.filePreloadThreshold_;
fileColumnNamesReadAsLowerCase_ = other.fileColumnNamesReadAsLowerCase_;
useColumnNamesForColumnMapping_ = other.useColumnNamesForColumnMapping_;
return *this;
}

ReaderOptions(const ReaderOptions& other)
: io::ReaderOptions(other),
tailLocation_(other.tailLocation_),
fileFormat_(other.fileFormat_),
fileSchema_(other.fileSchema_),
serDeOptions_(other.serDeOptions_),
decrypterFactory_(other.decrypterFactory_),
footerEstimatedSize_(other.footerEstimatedSize_),
filePreloadThreshold_(other.filePreloadThreshold_),
fileColumnNamesReadAsLowerCase_(other.fileColumnNamesReadAsLowerCase_),
useColumnNamesForColumnMapping_(other.useColumnNamesForColumnMapping_) {
}

/// Sets the format of the file, such as "rc" or "dwrf". The default is
/// "dwrf".
ReaderOptions& setFileFormat(FileFormat format) {
Expand Down Expand Up @@ -589,6 +576,14 @@ class ReaderOptions : public io::ReaderOptions {
noCacheRetention_ = noCacheRetention;
}

const std::shared_ptr<velox::common::ScanSpec>& scanSpec() const {
return scanSpec_;
}

void setScanSpec(std::shared_ptr<velox::common::ScanSpec> scanSpec) {
scanSpec_ = std::move(scanSpec);
}

private:
uint64_t tailLocation_;
FileFormat fileFormat_;
Expand All @@ -601,6 +596,7 @@ class ReaderOptions : public io::ReaderOptions {
bool useColumnNamesForColumnMapping_{false};
std::shared_ptr<folly::Executor> ioExecutor_;
std::shared_ptr<random::RandomSkipTracker> randomSkip_;
std::shared_ptr<velox::common::ScanSpec> scanSpec_;
};

struct WriterOptions {
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/common/memory/Memory.h"
#include "velox/common/process/ProcessBase.h"
#include "velox/common/process/TraceHistory.h"
#include "velox/dwio/common/ColumnSelector.h"
#include "velox/dwio/common/FormatData.h"
#include "velox/dwio/common/IntDecoder.h"
#include "velox/dwio/common/Mutation.h"
Expand Down
18 changes: 8 additions & 10 deletions velox/dwio/common/SelectiveRepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,15 @@ RowSet SelectiveRepeatedColumnReader::applyFilter(RowSet rows) {
}

SelectiveListColumnReader::SelectiveListColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec)
: SelectiveRepeatedColumnReader(
fileType->type(),
params,
scanSpec,
fileType),
requestedType_{requestedType} {}
fileType) {}

uint64_t SelectiveListColumnReader::skip(uint64_t numValues) {
numValues = formatData_->skipNulls(numValues);
Expand Down Expand Up @@ -241,28 +240,27 @@ void SelectiveListColumnReader::read(

void SelectiveListColumnReader::getValues(RowSet rows, VectorPtr* result) {
VELOX_DCHECK_NOT_NULL(result);
prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_);
prepareResult(*result, requestedType_, rows.size(), &memoryPool_);
auto* resultArray = result->get()->asUnchecked<ArrayVector>();
makeOffsetsAndSizes(rows, *resultArray);
result->get()->setNulls(resultNulls());
if (child_ && !nestedRows_.empty()) {
auto& elements = resultArray->elements();
prepareStructResult(requestedType_->type()->childAt(0), &elements);
prepareStructResult(requestedType_->childAt(0), &elements);
child_->getValues(nestedRows_, &elements);
}
}

SelectiveMapColumnReader::SelectiveMapColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec)
: SelectiveRepeatedColumnReader(
fileType->type(),
params,
scanSpec,
fileType),
requestedType_{requestedType} {}
fileType) {}

uint64_t SelectiveMapColumnReader::skip(uint64_t numValues) {
numValues = formatData_->skipNulls(numValues);
Expand Down Expand Up @@ -326,7 +324,7 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) {
!result->get() || result->get()->type()->isMap(),
"Expect MAP result vector, got {}",
result->get()->type()->toString());
prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_);
prepareResult(*result, requestedType_, rows.size(), &memoryPool_);
auto* resultMap = result->get()->asUnchecked<MapVector>();
makeOffsetsAndSizes(rows, *resultMap);
result->get()->setNulls(resultNulls());
Expand All @@ -337,7 +335,7 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) {
if (!nestedRows_.empty()) {
keyReader_->getValues(nestedRows_, &resultMap->mapKeys());
auto& values = resultMap->mapValues();
prepareStructResult(requestedType_->type()->childAt(1), &values);
prepareStructResult(requestedType_->childAt(1), &values);
elementReader_->getValues(nestedRows_, &values);
}
}
Expand Down
6 changes: 2 additions & 4 deletions velox/dwio/common/SelectiveRepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class SelectiveRepeatedColumnReader : public SelectiveColumnReader {
class SelectiveListColumnReader : public SelectiveRepeatedColumnReader {
public:
SelectiveListColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec);
Expand All @@ -103,13 +103,12 @@ class SelectiveListColumnReader : public SelectiveRepeatedColumnReader {

protected:
std::unique_ptr<SelectiveColumnReader> child_;
const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;
};

class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader {
public:
SelectiveMapColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec);
Expand All @@ -128,7 +127,6 @@ class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader {

std::unique_ptr<SelectiveColumnReader> keyReader_;
std::unique_ptr<SelectiveColumnReader> elementReader_;
const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;
};

} // namespace facebook::velox::dwio::common
Loading

0 comments on commit 7432bea

Please sign in to comment.