Skip to content

Commit

Permalink
Clean up file reader framework
Browse files Browse the repository at this point in the history
Summary:
Extract reusable common code, remove unused code, and add `const`
qualifiers where suitable.  No change in functionality.

Differential Revision: D56945236
  • Loading branch information
Yuhta authored and facebook-github-bot committed May 3, 2024
1 parent f54787b commit 4a2653c
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 260 deletions.
156 changes: 126 additions & 30 deletions velox/dwio/common/ColumnVisitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,32 @@ struct DropValues {
}
};

template <typename TReader>
struct ExtractToReader {
using HookType = dwio::common::NoHook;
static constexpr bool kSkipNulls = false;
explicit ExtractToReader(TReader* readerIn) : reader(readerIn) {}
explicit ExtractToReader(SelectiveColumnReader* readerIn)
: reader_(readerIn) {}

bool acceptsNulls() const {
return true;
}

template <typename T>
void addNull(vector_size_t rowIndex);
void addNull(vector_size_t rowIndex) {
reader_->template addNull<T>();
}

template <typename V>
void addValue(vector_size_t /*rowIndex*/, V value) {
reader->addValue(value);
reader_->addValue(value);
}

TReader* reader;

dwio::common::NoHook& hook() {
return noHook();
}

private:
SelectiveColumnReader* reader_;
};

template <typename THook>
Expand Down Expand Up @@ -150,6 +153,7 @@ class ColumnVisitor {
using DataType = T;
static constexpr bool dense = isDense;
static constexpr bool kHasBulkPath = true;

ColumnVisitor(
TFilter& filter,
SelectiveColumnReader* reader,
Expand Down Expand Up @@ -269,7 +273,7 @@ class ColumnVisitor {
}
if (++rowIndex_ >= numRows_) {
atEnd = true;
return rows_[numRows_ - 1] - previous;
return rowAt(numRows_ - 1) - previous;
}
if (TFilter::deterministic && isDense) {
return 0;
Expand Down Expand Up @@ -301,7 +305,7 @@ class ColumnVisitor {
if (isDense) {
return 0;
}
return currentRow() - rows_[rowIndex_ - 1] - 1;
return currentRow() - rowAt(rowIndex_ - 1) - 1;
}

FOLLY_ALWAYS_INLINE vector_size_t process(T value, bool& atEnd) {
Expand All @@ -314,7 +318,7 @@ class ColumnVisitor {
}
if (++rowIndex_ >= numRows_) {
atEnd = true;
return rows_[numRows_ - 1] - previous;
return rowAt(numRows_ - 1) - previous;
}
return currentRow() - previous - 1;
}
Expand All @@ -331,7 +335,7 @@ class ColumnVisitor {
if (isDense) {
return 0;
}
return currentRow() - rows_[rowIndex_ - 1] - 1;
return currentRow() - rowAt(rowIndex_ - 1) - 1;
}

// Returns space for 'size' items of T for a scan to fill. The scan
Expand All @@ -341,26 +345,30 @@ class ColumnVisitor {
return reader_->mutableValues<T>(size);
}

int32_t numRows() const {
return reader_->numRows();
}

// Returns a reference to the column reader held in reader_. The pointer is
// dereferenced without a null check.
SelectiveColumnReader& reader() const {
return *reader_;
}

inline vector_size_t rowAt(vector_size_t index) {
inline vector_size_t rowAt(vector_size_t index) const {
if (isDense) {
return index;
}
return rows_[index];
}

// Returns rowIndex_, the visitor's current position. Elsewhere in this class
// the same member is used as an argument to rowAt() (e.g.
// rowAt(rowIndex_ - 1)) and compared against numRows_.
vector_size_t rowIndex() const {
return rowIndex_;
}

// Overwrites rowIndex_ with 'index'. No range check against numRows_ is
// performed here.
void setRowIndex(vector_size_t index) {
rowIndex_ = index;
}

// True once rowIndex_ has reached or passed numRows_.
bool atEnd() {
return rowIndex_ >= numRows_;
}

vector_size_t currentRow() {
vector_size_t currentRow() const {
if (isDense) {
return rowIndex_;
}
Expand All @@ -371,7 +379,7 @@ class ColumnVisitor {
return rows_;
}

vector_size_t numRows() {
vector_size_t numRows() const {
return numRows_;
}

Expand Down Expand Up @@ -504,12 +512,6 @@ inline void ColumnVisitor<T, TFilter, ExtractValues, isDense>::addOutputRow(
reader_->addOutputRow(row);
}

template <typename TReader>
template <typename T>
void ExtractToReader<TReader>::addNull(vector_size_t /*rowIndex*/) {
reader->template addNull<T>();
}

// Result code with three enumerators carrying distinct bit patterns:
// kUnknown = 0x40, kSuccess = 0x80, kFailure = 0.
// NOTE(review): the values look chosen so results can be tested with bit
// operations -- confirm against the users of this enum, which are not
// visible here.
enum FilterResult { kUnknown = 0x40, kSuccess = 0x80, kFailure = 0 };

namespace detail {
Expand Down Expand Up @@ -1390,13 +1392,6 @@ class DirectRleColumnVisitor
rows,
values) {}

// Use for replacing all rows with non-null rows for fast path with
// processRun and processRle.
void setRows(folly::Range<const int32_t*> newRows) {
super::rows_ = newRows.data();
super::numRows_ = newRows.size();
}

// Processes 'numInput' T's in 'input'. Sets 'values' and
// 'numValues' to the resulting values. 'scatterRows' may be
// non-null if there is no filter and the decoded values should be
Expand Down Expand Up @@ -1479,4 +1474,105 @@ class DirectRleColumnVisitor
}
};

// Picks the ColumnVisitor specialization for a string column read and passes
// it to a caller-supplied callable.
//
// Usage: construct with the reader and the rows of the read, then invoke with
// a callable 'readWithVisitor' that accepts a
// ColumnVisitor<folly::StringPiece, TFilter, ExtractValues, kIsDense>.
//
// The specialization is derived from the reader's scan spec:
//   - How values are consumed: ExtractToGenericHook when the scan spec has a
//     value hook, ExtractToReader (forwards to the reader's addValue/addNull)
//     when values are kept without a hook, DropValues when values are not
//     kept.
//   - Density: the read is treated as dense when the last row number equals
//     rows.size() - 1 (no gaps, assuming 'rows' is ascending and unique --
//     TODO confirm against RowSet's contract).
//   - Filter kind: see processFilter().
//
// 'rows' is assumed to be non-empty: rows_.back() is read unconditionally.
class StringColumnReadWithVisitorHelper {
 public:
  StringColumnReadWithVisitorHelper(SelectiveColumnReader& reader, RowSet rows)
      : reader_(reader), rows_(rows) {}

  // Runs exactly one read through 'readWithVisitor', or through
  // reader_.filterNulls() for the null-only filter kinds handled in
  // processFilter().
  //
  // The three value-handling paths below are mutually exclusive. The hook path
  // must not fall through to the ExtractToReader path: doing so would perform
  // the read a second time and would forward 'readWithVisitor' again after it
  // may already have been moved into readHelper()'s by-value parameter.
  template <typename F>
  auto operator()(F&& readWithVisitor) {
    const bool isDense = rows_.back() == rows_.size() - 1;

    // Values are not needed: only the filter result matters.
    if (!reader_.scanSpec()->keepValues()) {
      if (isDense) {
        processFilter<true>(DropValues(), std::forward<F>(readWithVisitor));
      } else {
        processFilter<false>(DropValues(), std::forward<F>(readWithVisitor));
      }
      return;
    }

    // Values go to the hook. The filter is fixed to AlwaysTrue on this path;
    // the scan spec's filter is not consulted.
    if (auto* hook = reader_.scanSpec()->valueHook()) {
      if (isDense) {
        readHelper<velox::common::AlwaysTrue, true>(
            &alwaysTrue(),
            ExtractToGenericHook(hook),
            std::forward<F>(readWithVisitor));
      } else {
        readHelper<velox::common::AlwaysTrue, false>(
            &alwaysTrue(),
            ExtractToGenericHook(hook),
            std::forward<F>(readWithVisitor));
      }
      return;
    }

    // Values are kept and there is no hook: extract into the reader.
    if (isDense) {
      processFilter<true>(
          ExtractToReader(&reader_), std::forward<F>(readWithVisitor));
    } else {
      processFilter<false>(
          ExtractToReader(&reader_), std::forward<F>(readWithVisitor));
    }
  }

 private:
  // Builds the visitor for the given filter type, density and extractor and
  // invokes 'readWithVisitor' with it. 'filter' is downcast to TFilter with a
  // static_cast, so the caller must pass a filter whose dynamic type matches
  // TFilter.
  template <typename TFilter, bool kIsDense, typename ExtractValues, typename F>
  void readHelper(
      velox::common::Filter* filter,
      ExtractValues extractValues,
      F readWithVisitor) {
    readWithVisitor(
        ColumnVisitor<folly::StringPiece, TFilter, ExtractValues, kIsDense>(
            *static_cast<TFilter*>(filter), &reader_, rows_, extractValues));
  }

  // Selects the concrete filter type from the scan spec's filter and
  // dispatches to readHelper().
  //   - No filter: AlwaysTrue, using the shared alwaysTrue() instance.
  //   - kIsNull: handled by reader_.filterNulls(); values are extracted unless
  //     ExtractValues is DropValues. 'readWithVisitor' is not invoked.
  //   - kIsNotNull: with DropValues handled by reader_.filterNulls();
  //     otherwise read through a visitor with the IsNotNull filter.
  //   - Bytes range/values kinds and their negations: the matching concrete
  //     filter type.
  //   - Any other kind: the visitor is instantiated with the base Filter type.
  template <bool kIsDense, typename ExtractValues, typename F>
  void processFilter(ExtractValues extractValues, F&& readWithVisitor) {
    auto* filter = reader_.scanSpec()->filter();
    if (filter == nullptr) {
      readHelper<velox::common::AlwaysTrue, kIsDense>(
          &alwaysTrue(), extractValues, std::forward<F>(readWithVisitor));
      return;
    }
    switch (filter->kind()) {
      case velox::common::FilterKind::kAlwaysTrue:
        readHelper<velox::common::AlwaysTrue, kIsDense>(
            filter, extractValues, std::forward<F>(readWithVisitor));
        break;
      case velox::common::FilterKind::kIsNull:
        reader_.filterNulls<StringView>(
            rows_, true, !std::is_same_v<ExtractValues, DropValues>);
        break;
      case velox::common::FilterKind::kIsNotNull:
        if constexpr (std::is_same_v<ExtractValues, DropValues>) {
          reader_.filterNulls<StringView>(rows_, false, false);
        } else {
          readHelper<velox::common::IsNotNull, kIsDense>(
              filter, extractValues, std::forward<F>(readWithVisitor));
        }
        break;
      case velox::common::FilterKind::kBytesRange:
        readHelper<velox::common::BytesRange, kIsDense>(
            filter, extractValues, std::forward<F>(readWithVisitor));
        break;
      case velox::common::FilterKind::kNegatedBytesRange:
        readHelper<velox::common::NegatedBytesRange, kIsDense>(
            filter, extractValues, std::forward<F>(readWithVisitor));
        break;
      case velox::common::FilterKind::kBytesValues:
        readHelper<velox::common::BytesValues, kIsDense>(
            filter, extractValues, std::forward<F>(readWithVisitor));
        break;
      case velox::common::FilterKind::kNegatedBytesValues:
        readHelper<velox::common::NegatedBytesValues, kIsDense>(
            filter, extractValues, std::forward<F>(readWithVisitor));
        break;
      default:
        readHelper<velox::common::Filter, kIsDense>(
            filter, extractValues, std::forward<F>(readWithVisitor));
        break;
    }
  }

  SelectiveColumnReader& reader_;
  const RowSet rows_;
};

} // namespace facebook::velox::dwio::common
2 changes: 1 addition & 1 deletion velox/dwio/common/FormatData.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class FormatData {

template <typename T>
T& as() {
return *reinterpret_cast<T*>(this);
return *static_cast<T*>(this);
}

/// Reads nulls if the format has nulls separate from the encoded
Expand Down
3 changes: 0 additions & 3 deletions velox/dwio/common/ReaderFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@ ReaderFactoriesMap& readerFactories() {
bool registerReaderFactory(std::shared_ptr<ReaderFactory> factory) {
[[maybe_unused]] const bool ok =
readerFactories().insert({factory->fileFormat(), factory}).second;
// NOTE: re-enable this check after Prestissimo has updated dwrf registration.
#if 0
VELOX_CHECK(
ok,
"ReaderFactory is already registered for format {}",
toString(factory->fileFormat()));
#endif
return true;
}

Expand Down
9 changes: 5 additions & 4 deletions velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,8 @@ class SelectiveColumnReader {
template <typename T>
inline void addNull() {
VELOX_DCHECK_NE(valueSize_, kNoValueSize);
VELOX_DCHECK_LE(
rawResultNulls_ && rawValues_ && (numValues_ + 1) * valueSize_,
values_->capacity());
VELOX_DCHECK(rawResultNulls_ && rawValues_);
VELOX_DCHECK_LE((numValues_ + 1) * valueSize_, values_->capacity());

anyNulls_ = true;
bits::setNull(rawResultNulls_, numValues_);
Expand Down Expand Up @@ -441,12 +440,12 @@ class SelectiveColumnReader {
isFlatMapValue_ = value;
}

protected:
// Filters 'rows' according to 'isNull'. Only applies to cases where
// scanSpec_->readsNullsOnly() is true.
template <typename T>
void filterNulls(RowSet rows, bool isNull, bool extractValues);

protected:
template <typename T>
void
prepareRead(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls);
Expand Down Expand Up @@ -670,6 +669,8 @@ inline void SelectiveColumnReader::addValue(const folly::StringPiece value) {
addStringValue(value);
}

velox::common::AlwaysTrue& alwaysTrue();

} // namespace facebook::velox::dwio::common

namespace facebook::velox::dwio::common {
Expand Down
2 changes: 0 additions & 2 deletions velox/dwio/common/SelectiveColumnReaderInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

namespace facebook::velox::dwio::common {

velox::common::AlwaysTrue& alwaysTrue();

class Timer {
public:
Timer() : startClocks_{folly::hardware_timestamp()} {}
Expand Down
Loading

0 comments on commit 4a2653c

Please sign in to comment.