Skip to content

Commit

Permalink
Clean up selective file reader framework (facebookincubator#9704)
Browse files Browse the repository at this point in the history
Summary:

Extract reusable common code, remove unused code, and add `const`
qualifiers where suitable.  No change in functionality.

Differential Revision: D56945236
  • Loading branch information
Yuhta authored and facebook-github-bot committed May 6, 2024
1 parent 9fde0a2 commit 7a9becd
Show file tree
Hide file tree
Showing 18 changed files with 241 additions and 300 deletions.
166 changes: 135 additions & 31 deletions velox/dwio/common/ColumnVisitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,32 @@ struct DropValues {
}
};

template <typename TReader>
struct ExtractToReader {
using HookType = dwio::common::NoHook;
static constexpr bool kSkipNulls = false;
explicit ExtractToReader(TReader* readerIn) : reader(readerIn) {}
explicit ExtractToReader(SelectiveColumnReader* readerIn)
: reader_(readerIn) {}

bool acceptsNulls() const {
return true;
}

template <typename T>
void addNull(vector_size_t rowIndex);
void addNull(vector_size_t /*rowIndex*/) {
reader_->template addNull<T>();
}

template <typename V>
void addValue(vector_size_t /*rowIndex*/, V value) {
reader->addValue(value);
reader_->addValue(value);
}

TReader* reader;

dwio::common::NoHook& hook() {
return noHook();
}

private:
SelectiveColumnReader* reader_;
};

template <typename THook>
Expand Down Expand Up @@ -150,6 +153,7 @@ class ColumnVisitor {
using DataType = T;
static constexpr bool dense = isDense;
static constexpr bool kHasBulkPath = true;

ColumnVisitor(
TFilter& filter,
SelectiveColumnReader* reader,
Expand Down Expand Up @@ -269,7 +273,7 @@ class ColumnVisitor {
}
if (++rowIndex_ >= numRows_) {
atEnd = true;
return rows_[numRows_ - 1] - previous;
return rowAt(numRows_ - 1) - previous;
}
if (TFilter::deterministic && isDense) {
return 0;
Expand Down Expand Up @@ -301,7 +305,7 @@ class ColumnVisitor {
if (isDense) {
return 0;
}
return currentRow() - rows_[rowIndex_ - 1] - 1;
return currentRow() - rowAt(rowIndex_ - 1) - 1;
}

FOLLY_ALWAYS_INLINE vector_size_t process(T value, bool& atEnd) {
Expand All @@ -314,7 +318,7 @@ class ColumnVisitor {
}
if (++rowIndex_ >= numRows_) {
atEnd = true;
return rows_[numRows_ - 1] - previous;
return rowAt(numRows_ - 1) - previous;
}
return currentRow() - previous - 1;
}
Expand All @@ -331,7 +335,7 @@ class ColumnVisitor {
if (isDense) {
return 0;
}
return currentRow() - rows_[rowIndex_ - 1] - 1;
return currentRow() - rowAt(rowIndex_ - 1) - 1;
}

// Returns space for 'size' items of T for a scan to fill. The scan
Expand All @@ -341,26 +345,30 @@ class ColumnVisitor {
return reader_->mutableValues<T>(size);
}

int32_t numRows() const {
return reader_->numRows();
}

SelectiveColumnReader& reader() const {
return *reader_;
}

inline vector_size_t rowAt(vector_size_t index) {
inline vector_size_t rowAt(vector_size_t index) const {
if (isDense) {
return index;
}
return rows_[index];
}

bool atEnd() {
vector_size_t rowIndex() const {
return rowIndex_;
}

void setRowIndex(vector_size_t index) {
rowIndex_ = index;
}

bool atEnd() const {
return rowIndex_ >= numRows_;
}

vector_size_t currentRow() {
vector_size_t currentRow() const {
if (isDense) {
return rowIndex_;
}
Expand All @@ -371,7 +379,7 @@ class ColumnVisitor {
return rows_;
}

vector_size_t numRows() {
vector_size_t numRows() const {
return numRows_;
}

Expand Down Expand Up @@ -504,12 +512,6 @@ inline void ColumnVisitor<T, TFilter, ExtractValues, isDense>::addOutputRow(
reader_->addOutputRow(row);
}

template <typename TReader>
template <typename T>
void ExtractToReader<TReader>::addNull(vector_size_t /*rowIndex*/) {
reader->template addNull<T>();
}

enum FilterResult { kUnknown = 0x40, kSuccess = 0x80, kFailure = 0 };

namespace detail {
Expand Down Expand Up @@ -1390,13 +1392,6 @@ class DirectRleColumnVisitor
rows,
values) {}

// Use for replacing all rows with non-null rows for fast path with
// processRun and processRle.
void setRows(folly::Range<const int32_t*> newRows) {
super::rows_ = newRows.data();
super::numRows_ = newRows.size();
}

// Processes 'numInput' T's in 'input'. Sets 'values' and
// 'numValues' to the resulting values. 'scatterRows' may be
// non-null if there is no filter and the decoded values should be
Expand Down Expand Up @@ -1479,4 +1474,113 @@ class DirectRleColumnVisitor
}
};

template <bool kSeparateNulls>
class StringColumnReadWithVisitorHelper {
public:
StringColumnReadWithVisitorHelper(SelectiveColumnReader& reader, RowSet rows)
: reader_(reader), rows_(rows) {}

template <typename F>
auto operator()(F&& readWithVisitor) {
const bool isDense = rows_.back() == rows_.size() - 1;
if (reader_.scanSpec()->keepValues()) {
if (auto* hook = reader_.scanSpec()->valueHook()) {
if (isDense) {
readHelper<velox::common::AlwaysTrue, true>(
&alwaysTrue(),
ExtractToGenericHook(hook),
std::forward<F>(readWithVisitor));
} else {
readHelper<velox::common::AlwaysTrue, false>(
&alwaysTrue(),
ExtractToGenericHook(hook),
std::forward<F>(readWithVisitor));
}
} else {
if (isDense) {
processFilter<true>(
ExtractToReader(&reader_), std::forward<F>(readWithVisitor));
} else {
processFilter<false>(
ExtractToReader(&reader_), std::forward<F>(readWithVisitor));
}
}
} else {
if (isDense) {
processFilter<true>(DropValues(), std::forward<F>(readWithVisitor));
} else {
processFilter<false>(DropValues(), std::forward<F>(readWithVisitor));
}
}
}

private:
template <typename TFilter, bool kIsDense, typename ExtractValues, typename F>
void readHelper(
velox::common::Filter* filter,
ExtractValues extractValues,
F readWithVisitor) {
readWithVisitor(
ColumnVisitor<folly::StringPiece, TFilter, ExtractValues, kIsDense>(
*static_cast<TFilter*>(filter), &reader_, rows_, extractValues));
}

template <bool kIsDense, typename ExtractValues, typename F>
void processFilter(ExtractValues extractValues, F&& readWithVisitor) {
auto* filter = reader_.scanSpec()->filter();
if (filter == nullptr) {
readHelper<velox::common::AlwaysTrue, kIsDense>(
&alwaysTrue(), extractValues, std::forward<F>(readWithVisitor));
return;
}
switch (filter->kind()) {
case velox::common::FilterKind::kAlwaysTrue:
readHelper<velox::common::AlwaysTrue, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
case velox::common::FilterKind::kIsNull:
if constexpr (kSeparateNulls) {
reader_.filterNulls<StringView>(
rows_, true, !std::is_same_v<ExtractValues, DropValues>);
} else {
readHelper<velox::common::IsNull, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
}
break;
case velox::common::FilterKind::kIsNotNull:
if constexpr (
kSeparateNulls && std::is_same_v<ExtractValues, DropValues>) {
reader_.filterNulls<StringView>(rows_, false, false);
} else {
readHelper<velox::common::IsNotNull, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
}
break;
case velox::common::FilterKind::kBytesRange:
readHelper<velox::common::BytesRange, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
case velox::common::FilterKind::kNegatedBytesRange:
readHelper<velox::common::NegatedBytesRange, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
case velox::common::FilterKind::kBytesValues:
readHelper<velox::common::BytesValues, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
case velox::common::FilterKind::kNegatedBytesValues:
readHelper<velox::common::NegatedBytesValues, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
default:
readHelper<velox::common::Filter, kIsDense>(
filter, extractValues, std::forward<F>(readWithVisitor));
break;
}
}

SelectiveColumnReader& reader_;
const RowSet rows_;
};

} // namespace facebook::velox::dwio::common
2 changes: 1 addition & 1 deletion velox/dwio/common/FormatData.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class FormatData {

template <typename T>
T& as() {
return *reinterpret_cast<T*>(this);
return *static_cast<T*>(this);
}

/// Reads nulls if the format has nulls separate from the encoded
Expand Down
43 changes: 30 additions & 13 deletions velox/dwio/common/SelectiveByteRleColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class SelectiveByteRleColumnReader : public SelectiveColumnReader {

void getValues(RowSet rows, VectorPtr* result) override;

template <typename Reader, bool isDense, typename ExtractValues>
template <
typename Reader,
bool isDense,
bool kSeparateNulls,
typename ExtractValues>
void processFilter(
velox::common::Filter* filter,
ExtractValues extractValues,
Expand All @@ -58,7 +62,7 @@ class SelectiveByteRleColumnReader : public SelectiveColumnReader {
RowSet rows,
ExtractValues extractValues);

template <typename Reader>
template <typename Reader, bool kSeparateNulls>
void
readCommon(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls);
};
Expand All @@ -78,7 +82,11 @@ void SelectiveByteRleColumnReader::readHelper(
*reinterpret_cast<TFilter*>(filter), this, rows, extractValues));
}

template <typename Reader, bool isDense, typename ExtractValues>
template <
typename Reader,
bool isDense,
bool kSeparateNulls,
typename ExtractValues>
void SelectiveByteRleColumnReader::processFilter(
velox::common::Filter* filter,
ExtractValues extractValues,
Expand All @@ -90,13 +98,20 @@ void SelectiveByteRleColumnReader::processFilter(
filter, rows, extractValues);
break;
case FilterKind::kIsNull:
filterNulls<int8_t>(
rows,
true,
!std::is_same_v<decltype(extractValues), dwio::common::DropValues>);
if constexpr (kSeparateNulls) {
filterNulls<int8_t>(
rows,
true,
!std::is_same_v<decltype(extractValues), dwio::common::DropValues>);
} else {
readHelper<Reader, velox::common::IsNull, isDense>(
filter, rows, extractValues);
}
break;
case FilterKind::kIsNotNull:
if (std::is_same_v<decltype(extractValues), dwio::common::DropValues>) {
if constexpr (
kSeparateNulls &&
std::is_same_v<decltype(extractValues), dwio::common::DropValues>) {
filterNulls<int8_t>(rows, false, false);
} else {
readHelper<Reader, velox::common::IsNotNull, isDense>(
Expand Down Expand Up @@ -148,7 +163,7 @@ void SelectiveByteRleColumnReader::processValueHook(
}
}

template <typename Reader>
template <typename Reader, bool kSeparateNulls>
void SelectiveByteRleColumnReader::readCommon(
vector_size_t offset,
RowSet rows,
Expand All @@ -167,17 +182,19 @@ void SelectiveByteRleColumnReader::readCommon(
return;
}
if (isDense) {
processFilter<Reader, true>(
processFilter<Reader, true, kSeparateNulls>(
filter, dwio::common::ExtractToReader(this), rows);
} else {
processFilter<Reader, false>(
processFilter<Reader, false, kSeparateNulls>(
filter, dwio::common::ExtractToReader(this), rows);
}
} else {
if (isDense) {
processFilter<Reader, true>(filter, dwio::common::DropValues(), rows);
processFilter<Reader, true, kSeparateNulls>(
filter, dwio::common::DropValues(), rows);
} else {
processFilter<Reader, false>(filter, dwio::common::DropValues(), rows);
processFilter<Reader, false, kSeparateNulls>(
filter, dwio::common::DropValues(), rows);
}
}
}
Expand Down
Loading

0 comments on commit 7a9becd

Please sign in to comment.