Skip to content

Commit

Permalink
Clean up selective file reader framework (#9704)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #9704

Extract reusable common code, remove unused code, and add `const`
qualifiers where suitable.  No change in functionality.

bypass-github-export-checks

Reviewed By: oerling

Differential Revision: D56945236

fbshipit-source-id: ac93a82769996de780f084e1e9cce785704f25d8
  • Loading branch information
Yuhta authored and facebook-github-bot committed May 7, 2024
1 parent b7bacaf commit 0c4dad1
Show file tree
Hide file tree
Showing 23 changed files with 306 additions and 305 deletions.
5 changes: 4 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,10 @@ void configureRowReaderOptions(
} else {
cs = std::make_shared<dwio::common::ColumnSelector>(rowType, columnNames);
}
rowReaderOptions.select(cs).range(hiveSplit->start, hiveSplit->length);
rowReaderOptions.select(cs);
if (hiveSplit) {
rowReaderOptions.range(hiveSplit->start, hiveSplit->length);
}
}

namespace {
Expand Down
180 changes: 149 additions & 31 deletions velox/dwio/common/ColumnVisitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,32 @@ struct DropValues {
}
};

template <typename TReader>
struct ExtractToReader {
using HookType = dwio::common::NoHook;
static constexpr bool kSkipNulls = false;
explicit ExtractToReader(TReader* readerIn) : reader(readerIn) {}
explicit ExtractToReader(SelectiveColumnReader* readerIn)
: reader_(readerIn) {}

bool acceptsNulls() const {
return true;
}

template <typename T>
void addNull(vector_size_t rowIndex);
void addNull(vector_size_t /*rowIndex*/) {
reader_->template addNull<T>();
}

template <typename V>
void addValue(vector_size_t /*rowIndex*/, V value) {
reader->addValue(value);
reader_->addValue(value);
}

TReader* reader;

dwio::common::NoHook& hook() {
return noHook();
}

private:
SelectiveColumnReader* reader_;
};

template <typename THook>
Expand Down Expand Up @@ -150,6 +153,7 @@ class ColumnVisitor {
using DataType = T;
static constexpr bool dense = isDense;
static constexpr bool kHasBulkPath = true;

ColumnVisitor(
TFilter& filter,
SelectiveColumnReader* reader,
Expand All @@ -163,6 +167,20 @@ class ColumnVisitor {
rowIndex_(0),
values_(values) {}

template <bool isDense2 = isDense, std::enable_if_t<isDense2, int> = 0>
ColumnVisitor(
TFilter& filter,
SelectiveColumnReader* reader,
vector_size_t numRows,
ExtractValues values)
: filter_(filter),
reader_(reader),
allowNulls_(!TFilter::deterministic || filter.testNull()),
rows_(nullptr),
numRows_(numRows),
rowIndex_(0),
values_(values) {}

bool allowNulls() {
if (ExtractValues::kSkipNulls && TFilter::deterministic) {
return false;
Expand Down Expand Up @@ -269,7 +287,7 @@ class ColumnVisitor {
}
if (++rowIndex_ >= numRows_) {
atEnd = true;
return rows_[numRows_ - 1] - previous;
return rowAt(numRows_ - 1) - previous;
}
if (TFilter::deterministic && isDense) {
return 0;
Expand Down Expand Up @@ -301,7 +319,7 @@ class ColumnVisitor {
if (isDense) {
return 0;
}
return currentRow() - rows_[rowIndex_ - 1] - 1;
return currentRow() - rowAt(rowIndex_ - 1) - 1;
}

FOLLY_ALWAYS_INLINE vector_size_t process(T value, bool& atEnd) {
Expand All @@ -314,7 +332,7 @@ class ColumnVisitor {
}
if (++rowIndex_ >= numRows_) {
atEnd = true;
return rows_[numRows_ - 1] - previous;
return rowAt(numRows_ - 1) - previous;
}
return currentRow() - previous - 1;
}
Expand All @@ -331,7 +349,7 @@ class ColumnVisitor {
if (isDense) {
return 0;
}
return currentRow() - rows_[rowIndex_ - 1] - 1;
return currentRow() - rowAt(rowIndex_ - 1) - 1;
}

// Returns space for 'size' items of T for a scan to fill. The scan
Expand All @@ -341,26 +359,30 @@ class ColumnVisitor {
return reader_->mutableValues<T>(size);
}

int32_t numRows() const {
return reader_->numRows();
}

SelectiveColumnReader& reader() const {
return *reader_;
}

inline vector_size_t rowAt(vector_size_t index) {
inline vector_size_t rowAt(vector_size_t index) const {
if (isDense) {
return index;
}
return rows_[index];
}

bool atEnd() {
vector_size_t rowIndex() const {
return rowIndex_;
}

void setRowIndex(vector_size_t index) {
rowIndex_ = index;
}

bool atEnd() const {
return rowIndex_ >= numRows_;
}

vector_size_t currentRow() {
vector_size_t currentRow() const {
if (isDense) {
return rowIndex_;
}
Expand All @@ -371,7 +393,7 @@ class ColumnVisitor {
return rows_;
}

vector_size_t numRows() {
vector_size_t numRows() const {
return numRows_;
}

Expand Down Expand Up @@ -504,12 +526,6 @@ inline void ColumnVisitor<T, TFilter, ExtractValues, isDense>::addOutputRow(
reader_->addOutputRow(row);
}

template <typename TReader>
template <typename T>
void ExtractToReader<TReader>::addNull(vector_size_t /*rowIndex*/) {
reader->template addNull<T>();
}

enum FilterResult { kUnknown = 0x40, kSuccess = 0x80, kFailure = 0 };

namespace detail {
Expand Down Expand Up @@ -1390,13 +1406,6 @@ class DirectRleColumnVisitor
rows,
values) {}

// Use for replacing all rows with non-null rows for fast path with
// processRun and processRle.
void setRows(folly::Range<const int32_t*> newRows) {
super::rows_ = newRows.data();
super::numRows_ = newRows.size();
}

// Processes 'numInput' T's in 'input'. Sets 'values' and
// 'numValues'' to the resulting values. 'scatterRows' may be
// non-null if there is no filter and the decoded values should be
Expand Down Expand Up @@ -1479,4 +1488,113 @@ class DirectRleColumnVisitor
}
};

template <bool kEncodingHasNulls>
class StringColumnReadWithVisitorHelper {
public:
StringColumnReadWithVisitorHelper(SelectiveColumnReader& reader, RowSet rows)
: reader_(reader), rows_(rows) {}

template <typename F>
auto operator()(F&& readWithVisitor) {
const bool isDense = rows_.back() == rows_.size() - 1;
if (reader_.scanSpec()->keepValues()) {
if (auto* hook = reader_.scanSpec()->valueHook()) {
if (isDense) {
readHelper<velox::common::AlwaysTrue, true>(
&alwaysTrue(),
ExtractToGenericHook(hook),
std::forward<F>(readWithVisitor));
} else {
readHelper<velox::common::AlwaysTrue, false>(
&alwaysTrue(),
ExtractToGenericHook(hook),
std::forward<F>(readWithVisitor));
}
} else {
if (isDense) {
processFilter<true>(
ExtractToReader(&reader_), std::forward<F>(readWithVisitor));
} else {
processFilter<false>(
ExtractToReader(&reader_), std::forward<F>(readWithVisitor));
}
}
} else {
if (isDense) {
processFilter<true>(DropValues(), std::forward<F>(readWithVisitor));
} else {
processFilter<false>(DropValues(), std::forward<F>(readWithVisitor));
}
}
}

private:
template <typename TFilter, bool kIsDense, typename ExtractValues, typename F>
void readHelper(
velox::common::Filter* filter,
ExtractValues extractValues,
F readWithVisitor) {
readWithVisitor(
ColumnVisitor<folly::StringPiece, TFilter, ExtractValues, kIsDense>(
*static_cast<TFilter*>(filter), &reader_, rows_, extractValues));
}

template <bool kIsDense, typename ExtractValues, typename F>
void processFilter(ExtractValues extractValues, F&& readWithVisitor) {
auto* filter = reader_.scanSpec()->filter();
if (filter == nullptr) {
readHelper<velox::common::AlwaysTrue, kIsDense>(
&alwaysTrue(), extractValues, std::forward<F>(readWithVisitor));
return;
}
switch (filter->kind()) {
case velox::common::FilterKind::kAlwaysTrue:
readHelper<velox::common::AlwaysTrue, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
case velox::common::FilterKind::kIsNull:
if constexpr (kEncodingHasNulls) {
reader_.filterNulls<StringView>(
rows_, true, !std::is_same_v<ExtractValues, DropValues>);
} else {
readHelper<velox::common::IsNull, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
}
break;
case velox::common::FilterKind::kIsNotNull:
if constexpr (
kEncodingHasNulls && std::is_same_v<ExtractValues, DropValues>) {
reader_.filterNulls<StringView>(rows_, false, false);
} else {
readHelper<velox::common::IsNotNull, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
}
break;
case velox::common::FilterKind::kBytesRange:
readHelper<velox::common::BytesRange, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
case velox::common::FilterKind::kNegatedBytesRange:
readHelper<velox::common::NegatedBytesRange, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
case velox::common::FilterKind::kBytesValues:
readHelper<velox::common::BytesValues, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
case velox::common::FilterKind::kNegatedBytesValues:
readHelper<velox::common::NegatedBytesValues, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
default:
readHelper<velox::common::Filter, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
}
}

SelectiveColumnReader& reader_;
const RowSet rows_;
};

} // namespace facebook::velox::dwio::common
2 changes: 1 addition & 1 deletion velox/dwio/common/FormatData.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class FormatData {

template <typename T>
T& as() {
return *reinterpret_cast<T*>(this);
return *static_cast<T*>(this);
}

/// Reads nulls if the format has nulls separate from the encoded
Expand Down
Loading

0 comments on commit 0c4dad1

Please sign in to comment.