Skip to content

Commit

Permalink
Optimize reading from table with large number of columns (facebookinc…
Browse files Browse the repository at this point in the history
…ubator#10145)

Summary:
Pull Request resolved: facebookincubator#10145

We see table scan using more CPU than Java on tables with large number
of columns (> 7000).  Some optimizations are implemented to improve the
performance of a typical query from 24.07 hours to 4.7 hours (Java 10.02 hours).

- Remove usage of `ColumnSelector` from selective readers
- Use `Type` instead of `TypeWithId` for `requestedType`
- Do not populate file `TypeWithId` where the column is not selected
- Use arena for `StripeFooter`

Reviewed By: oerling

Differential Revision: D58364580
  • Loading branch information
Yuhta authored and facebook-github-bot committed Jun 12, 2024
1 parent 9cc123f commit e90a332
Show file tree
Hide file tree
Showing 49 changed files with 288 additions and 195 deletions.
2 changes: 1 addition & 1 deletion velox/common/base/BitSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BitSet {
bits::setBit(bits_.data(), bit, true);
}

bool contains(uint32_t index) {
bool contains(uint32_t index) const {
uint64_t bit = index - min_;
if (bit >= bits_.size() * 64) {
// If index was < min_, bit will have wrapped around and will be >
Expand Down
3 changes: 1 addition & 2 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,7 @@ void configureRowReaderOptions(
}
rowReaderOptions.setScanSpec(scanSpec);
rowReaderOptions.setMetadataFilter(metadataFilter);
rowReaderOptions.select(
dwio::common::ColumnSelector::fromScanSpec(*scanSpec, rowType));
rowReaderOptions.setRequestedType(rowType);
rowReaderOptions.range(hiveSplit->start, hiveSplit->length);
}

Expand Down
41 changes: 21 additions & 20 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,21 @@ void SplitReader::configureReaderOptions(
hiveTableHandle_,
hiveSplit_);
baseReaderOpts_.setRandomSkip(std::move(randomSkip));
baseReaderOpts_.setScanSpec(scanSpec_);
}

void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
createReader();
createReader(metadataFilter, rowIndexColumn);

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
}

createRowReader(metadataFilter, rowIndexColumn);
createRowReader();
}

uint64_t SplitReader::next(uint64_t size, VectorPtr& output) {
Expand Down Expand Up @@ -219,7 +220,9 @@ std::string SplitReader::toString() const {
static_cast<const void*>(baseRowReader_.get()));
}

void SplitReader::createReader() {
void SplitReader::createReader(
const std::shared_ptr<common::MetadataFilter>& metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
VELOX_CHECK_NE(
baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN);

Expand Down Expand Up @@ -256,6 +259,20 @@ void SplitReader::createReader() {

baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.fileFormat())
->createReader(std::move(baseFileInput), baseReaderOpts_);

auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema());
auto columnNames = fileType->names();
if (rowIndexColumn != nullptr) {
setRowIndexColumn(fileType, rowIndexColumn);
}
configureRowReaderOptions(
baseRowReaderOpts_,
hiveTableHandle_->tableParameters(),
scanSpec_,
metadataFilter,
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_);
}

bool SplitReader::checkIfSplitIsEmpty(
Expand Down Expand Up @@ -287,23 +304,7 @@ bool SplitReader::checkIfSplitIsEmpty(
return emptySplit_;
}

void SplitReader::createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema());
auto columnNames = fileType->names();
if (rowIndexColumn != nullptr) {
setRowIndexColumn(fileType, rowIndexColumn);
}

configureRowReaderOptions(
baseRowReaderOpts_,
hiveTableHandle_->tableParameters(),
scanSpec_,
metadataFilter,
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_);
void SplitReader::createRowReader() {
// NOTE: we firstly reset the finished 'baseRowReader_' of previous split
// before setting up for the next one to avoid doubling the peak memory usage.
baseRowReader_.reset();
Expand Down
8 changes: 4 additions & 4 deletions velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ class SplitReader {
protected:
/// Create the dwio::common::Reader object baseReader_, which will be used to
/// read the data file's metadata and schema
void createReader();
void createReader(
const std::shared_ptr<common::MetadataFilter>& metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);

/// Check if the hiveSplit_ is empty. The split is considered empty when
/// 1) The data file is missing but the user chooses to ignore it
Expand All @@ -127,9 +129,7 @@ class SplitReader {

/// Create the dwio::common::RowReader object baseRowReader_, which owns the
/// ColumnReaders that will be used to read the data
void createRowReader(
std::shared_ptr<common::MetadataFilter> metadataFilter,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn);
void createRowReader();

/// Different table formats may have different meatadata columns.
/// This function will be used to update the scanSpec for these columns.
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats,
const std::shared_ptr<HiveColumnHandle>& rowIndexColumn) {
createReader();
createReader(metadataFilter, rowIndexColumn);

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
return;
}

createRowReader(metadataFilter, rowIndexColumn);
createRowReader();

std::shared_ptr<const HiveIcebergSplit> icebergSplit =
std::dynamic_pointer_cast<const HiveIcebergSplit>(hiveSplit_);
Expand Down
5 changes: 0 additions & 5 deletions velox/connectors/hive/tests/HiveConnectorUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,6 @@ TEST_F(HiveConnectorUtilTest, configureRowReaderOptions) {
float_features->childByName(common::ScanSpec::kMapKeysFieldName)
->setFilter(common::createBigintValues({1, 3}, false));
float_features->setFlatMapFeatureSelection({"1", "3"});
RowReaderOptions options;
configureRowReaderOptions(options, {}, spec, nullptr, rowType, split);
auto& nodes = options.getSelector()->getProjection();
ASSERT_EQ(nodes.size(), 1);
ASSERT_EQ(nodes[0].expression, "[1,3]");
}

} // namespace facebook::velox::connector
1 change: 0 additions & 1 deletion velox/dwio/common/FormatData.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include "velox/common/memory/Memory.h"
#include "velox/dwio/common/ColumnSelector.h"
#include "velox/dwio/common/ScanSpec.h"
#include "velox/dwio/common/SeekableInputStream.h"
#include "velox/dwio/common/Statistics.h"
Expand Down
25 changes: 24 additions & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class RowReaderOptions {
bool returnFlatVector_ = false;
ErrorTolerance errorTolerance_;
std::shared_ptr<ColumnSelector> selector_;
RowTypePtr requestedType_;
std::shared_ptr<velox::common::ScanSpec> scanSpec_ = nullptr;
std::shared_ptr<velox::common::MetadataFilter> metadataFilter_;
// Node id for map column to a list of keys to be projected as a struct.
Expand Down Expand Up @@ -177,6 +178,10 @@ class RowReaderOptions {
*/
RowReaderOptions& select(const std::shared_ptr<ColumnSelector>& selector) {
selector_ = selector;
if (selector) {
VELOX_CHECK_NULL(requestedType_);
requestedType_ = selector->getSchema();
}
return *this;
}

Expand Down Expand Up @@ -295,6 +300,15 @@ class RowReaderOptions {
return errorTolerance_;
}

const RowTypePtr& requestedType() const {
return requestedType_;
}

void setRequestedType(RowTypePtr requestedType) {
VELOX_CHECK_NULL(selector_);
requestedType_ = std::move(requestedType);
}

const std::shared_ptr<velox::common::ScanSpec>& getScanSpec() const {
return scanSpec_;
}
Expand Down Expand Up @@ -578,7 +592,7 @@ class ReaderOptions : public io::ReaderOptions {
}

void setRandomSkip(std::shared_ptr<random::RandomSkipTracker> randomSkip) {
randomSkip_ = randomSkip;
randomSkip_ = std::move(randomSkip);
}

bool noCacheRetention() const {
Expand All @@ -589,6 +603,14 @@ class ReaderOptions : public io::ReaderOptions {
noCacheRetention_ = noCacheRetention;
}

const std::shared_ptr<velox::common::ScanSpec>& scanSpec() const {
return scanSpec_;
}

void setScanSpec(std::shared_ptr<velox::common::ScanSpec> scanSpec) {
scanSpec_ = std::move(scanSpec);
}

private:
uint64_t tailLocation_;
FileFormat fileFormat_;
Expand All @@ -601,6 +623,7 @@ class ReaderOptions : public io::ReaderOptions {
bool useColumnNamesForColumnMapping_{false};
std::shared_ptr<folly::Executor> ioExecutor_;
std::shared_ptr<random::RandomSkipTracker> randomSkip_;
std::shared_ptr<velox::common::ScanSpec> scanSpec_;
};

struct WriterOptions {
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/common/memory/Memory.h"
#include "velox/common/process/ProcessBase.h"
#include "velox/common/process/TraceHistory.h"
#include "velox/dwio/common/ColumnSelector.h"
#include "velox/dwio/common/FormatData.h"
#include "velox/dwio/common/IntDecoder.h"
#include "velox/dwio/common/Mutation.h"
Expand Down
18 changes: 8 additions & 10 deletions velox/dwio/common/SelectiveRepeatedColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,15 @@ RowSet SelectiveRepeatedColumnReader::applyFilter(RowSet rows) {
}

SelectiveListColumnReader::SelectiveListColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec)
: SelectiveRepeatedColumnReader(
fileType->type(),
params,
scanSpec,
fileType),
requestedType_{requestedType} {}
fileType) {}

uint64_t SelectiveListColumnReader::skip(uint64_t numValues) {
numValues = formatData_->skipNulls(numValues);
Expand Down Expand Up @@ -241,28 +240,27 @@ void SelectiveListColumnReader::read(

void SelectiveListColumnReader::getValues(RowSet rows, VectorPtr* result) {
VELOX_DCHECK_NOT_NULL(result);
prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_);
prepareResult(*result, requestedType_, rows.size(), &memoryPool_);
auto* resultArray = result->get()->asUnchecked<ArrayVector>();
makeOffsetsAndSizes(rows, *resultArray);
result->get()->setNulls(resultNulls());
if (child_ && !nestedRows_.empty()) {
auto& elements = resultArray->elements();
prepareStructResult(requestedType_->type()->childAt(0), &elements);
prepareStructResult(requestedType_->childAt(0), &elements);
child_->getValues(nestedRows_, &elements);
}
}

SelectiveMapColumnReader::SelectiveMapColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec)
: SelectiveRepeatedColumnReader(
fileType->type(),
params,
scanSpec,
fileType),
requestedType_{requestedType} {}
fileType) {}

uint64_t SelectiveMapColumnReader::skip(uint64_t numValues) {
numValues = formatData_->skipNulls(numValues);
Expand Down Expand Up @@ -326,7 +324,7 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) {
!result->get() || result->get()->type()->isMap(),
"Expect MAP result vector, got {}",
result->get()->type()->toString());
prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_);
prepareResult(*result, requestedType_, rows.size(), &memoryPool_);
auto* resultMap = result->get()->asUnchecked<MapVector>();
makeOffsetsAndSizes(rows, *resultMap);
result->get()->setNulls(resultNulls());
Expand All @@ -337,7 +335,7 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) {
if (!nestedRows_.empty()) {
keyReader_->getValues(nestedRows_, &resultMap->mapKeys());
auto& values = resultMap->mapValues();
prepareStructResult(requestedType_->type()->childAt(1), &values);
prepareStructResult(requestedType_->childAt(1), &values);
elementReader_->getValues(nestedRows_, &values);
}
}
Expand Down
6 changes: 2 additions & 4 deletions velox/dwio/common/SelectiveRepeatedColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class SelectiveRepeatedColumnReader : public SelectiveColumnReader {
class SelectiveListColumnReader : public SelectiveRepeatedColumnReader {
public:
SelectiveListColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec);
Expand All @@ -103,13 +103,12 @@ class SelectiveListColumnReader : public SelectiveRepeatedColumnReader {

protected:
std::unique_ptr<SelectiveColumnReader> child_;
const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;
};

class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader {
public:
SelectiveMapColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec);
Expand All @@ -128,7 +127,6 @@ class SelectiveMapColumnReader : public SelectiveRepeatedColumnReader {

std::unique_ptr<SelectiveColumnReader> keyReader_;
std::unique_ptr<SelectiveColumnReader> elementReader_;
const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;
};

} // namespace facebook::velox::dwio::common
11 changes: 4 additions & 7 deletions velox/dwio/common/SelectiveStructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,12 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
static constexpr int32_t kConstantChildSpecSubscript = -1;

SelectiveStructColumnReaderBase(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
FormatParams& params,
velox::common::ScanSpec& scanSpec,
bool isRoot = false)
: SelectiveColumnReader(fileType->type(), fileType, params, scanSpec),
requestedType_(requestedType),
: SelectiveColumnReader(requestedType, fileType, params, scanSpec),
debugString_(
getExceptionContext().message(VeloxException::Type::kSystem)),
isRoot_(isRoot) {}
Expand All @@ -138,8 +137,6 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {

void fillOutputRowsFromMutation(vector_size_t size);

const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;

std::vector<SelectiveColumnReader*> children_;

// Sequence number of output batch. Checked against ColumnLoaders
Expand Down Expand Up @@ -210,7 +207,7 @@ class SelectiveFlatMapColumnReaderHelper {
reader_.children_[i] = keyNodes_[i].reader.get();
reader_.children_[i]->setIsFlatMapValue(true);
}
if (auto type = reader_.requestedType_->type()->childAt(1); type->isRow()) {
if (auto type = reader_.requestedType_->childAt(1); type->isRow()) {
childValues_ = BaseVector::create(type, 0, &reader_.memoryPool_);
}
}
Expand All @@ -228,7 +225,7 @@ class SelectiveFlatMapColumnReaderHelper {
} else {
VLOG(1) << "Reallocating result MAP vector of size " << size;
result = BaseVector::create(
reader_.requestedType_->type(), size, &reader_.memoryPool_);
reader_.requestedType_, size, &reader_.memoryPool_);
}
return *result->asUnchecked<MapVector>();
}
Expand Down
Loading

0 comments on commit e90a332

Please sign in to comment.