diff --git a/velox/dwio/common/ColumnVisitors.h b/velox/dwio/common/ColumnVisitors.h index 3ac3d5e219f74..fd0a1542c0719 100644 --- a/velox/dwio/common/ColumnVisitors.h +++ b/velox/dwio/common/ColumnVisitors.h @@ -49,29 +49,32 @@ struct DropValues { } }; -template struct ExtractToReader { using HookType = dwio::common::NoHook; static constexpr bool kSkipNulls = false; - explicit ExtractToReader(TReader* readerIn) : reader(readerIn) {} + explicit ExtractToReader(SelectiveColumnReader* readerIn) + : reader_(readerIn) {} bool acceptsNulls() const { return true; } template - void addNull(vector_size_t rowIndex); + void addNull(vector_size_t /*rowIndex*/) { + reader_->template addNull(); + } template void addValue(vector_size_t /*rowIndex*/, V value) { - reader->addValue(value); + reader_->addValue(value); } - TReader* reader; - dwio::common::NoHook& hook() { return noHook(); } + + private: + SelectiveColumnReader* reader_; }; template @@ -150,6 +153,7 @@ class ColumnVisitor { using DataType = T; static constexpr bool dense = isDense; static constexpr bool kHasBulkPath = true; + ColumnVisitor( TFilter& filter, SelectiveColumnReader* reader, @@ -269,7 +273,7 @@ class ColumnVisitor { } if (++rowIndex_ >= numRows_) { atEnd = true; - return rows_[numRows_ - 1] - previous; + return rowAt(numRows_ - 1) - previous; } if (TFilter::deterministic && isDense) { return 0; @@ -301,7 +305,7 @@ class ColumnVisitor { if (isDense) { return 0; } - return currentRow() - rows_[rowIndex_ - 1] - 1; + return currentRow() - rowAt(rowIndex_ - 1) - 1; } FOLLY_ALWAYS_INLINE vector_size_t process(T value, bool& atEnd) { @@ -314,7 +318,7 @@ class ColumnVisitor { } if (++rowIndex_ >= numRows_) { atEnd = true; - return rows_[numRows_ - 1] - previous; + return rowAt(numRows_ - 1) - previous; } return currentRow() - previous - 1; } @@ -331,7 +335,7 @@ class ColumnVisitor { if (isDense) { return 0; } - return currentRow() - rows_[rowIndex_ - 1] - 1; + return currentRow() - rowAt(rowIndex_ - 1) - 1; } // Returns space for 'size' items of T for a scan to fill. The scan @@ -341,26 +345,30 @@ class ColumnVisitor { return reader_->mutableValues(size); } - int32_t numRows() const { - return reader_->numRows(); - } - SelectiveColumnReader& reader() const { return *reader_; } - inline vector_size_t rowAt(vector_size_t index) { + inline vector_size_t rowAt(vector_size_t index) const { if (isDense) { return index; } return rows_[index]; } + vector_size_t rowIndex() const { + return rowIndex_; + } + + void setRowIndex(vector_size_t index) { + rowIndex_ = index; + } + bool atEnd() { return rowIndex_ >= numRows_; } - vector_size_t currentRow() { + vector_size_t currentRow() const { if (isDense) { return rowIndex_; } @@ -371,7 +379,7 @@ class ColumnVisitor { return rows_; } - vector_size_t numRows() { + vector_size_t numRows() const { return numRows_; } @@ -504,12 +512,6 @@ inline void ColumnVisitor::addOutputRow( reader_->addOutputRow(row); } -template -template -void ExtractToReader::addNull(vector_size_t /*rowIndex*/) { - reader->template addNull(); -} - enum FilterResult { kUnknown = 0x40, kSuccess = 0x80, kFailure = 0 }; namespace detail { @@ -1390,13 +1392,6 @@ class DirectRleColumnVisitor rows, values) {} - // Use for replacing all rows with non-null rows for fast path with - // processRun and processRle. - void setRows(folly::Range newRows) { - super::rows_ = newRows.data(); - super::numRows_ = newRows.size(); - } - // Processes 'numInput' T's in 'input'. Sets 'values' and // 'numValues'' to the resulting values. 'scatterRows' may be // non-null if there is no filter and the decoded values should be @@ -1479,4 +1474,105 @@ class DirectRleColumnVisitor } }; +class StringColumnReadWithVisitorHelper { + public: + StringColumnReadWithVisitorHelper(SelectiveColumnReader& reader, RowSet rows) + : reader_(reader), rows_(rows) {} + + template + auto operator()(F&& readWithVisitor) { + const bool isDense = rows_.back() == rows_.size() - 1; + if (reader_.scanSpec()->keepValues()) { + if (auto* hook = reader_.scanSpec()->valueHook()) { + if (isDense) { + readHelper( + &alwaysTrue(), + ExtractToGenericHook(hook), + std::forward(readWithVisitor)); + } else { + readHelper( + &alwaysTrue(), + ExtractToGenericHook(hook), + std::forward(readWithVisitor)); + } + } + if (isDense) { + processFilter( + ExtractToReader(&reader_), std::forward(readWithVisitor)); + } else { + processFilter( + ExtractToReader(&reader_), std::forward(readWithVisitor)); + } + } else { + if (isDense) { + processFilter(DropValues(), std::forward(readWithVisitor)); + } else { + processFilter(DropValues(), std::forward(readWithVisitor)); + } + } + } + + private: + template + void readHelper( + velox::common::Filter* filter, + ExtractValues extractValues, + F readWithVisitor) { + readWithVisitor( + ColumnVisitor( + *static_cast(filter), &reader_, rows_, extractValues)); + } + + template + void processFilter(ExtractValues extractValues, F&& readWithVisitor) { + auto* filter = reader_.scanSpec()->filter(); + if (filter == nullptr) { + readHelper( + &alwaysTrue(), extractValues, std::forward(readWithVisitor)); + return; + } + switch (filter->kind()) { + case velox::common::FilterKind::kAlwaysTrue: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + case velox::common::FilterKind::kIsNull: + reader_.filterNulls( + rows_, true, !std::is_same_v); + break; + case velox::common::FilterKind::kIsNotNull: + if constexpr (std::is_same_v) { + reader_.filterNulls(rows_, false, false); + } else { + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + } + break; + case velox::common::FilterKind::kBytesRange: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + case velox::common::FilterKind::kNegatedBytesRange: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + case velox::common::FilterKind::kBytesValues: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + case velox::common::FilterKind::kNegatedBytesValues: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + default: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + } + } + + SelectiveColumnReader& reader_; + const RowSet rows_; +}; + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/FormatData.h b/velox/dwio/common/FormatData.h index 0348604606465..1f0b5d4426bb8 100644 --- a/velox/dwio/common/FormatData.h +++ b/velox/dwio/common/FormatData.h @@ -34,7 +34,7 @@ class FormatData { template T& as() { - return *reinterpret_cast(this); + return *static_cast(this); } /// Reads nulls if the format has nulls separate from the encoded diff --git a/velox/dwio/common/ReaderFactory.cpp b/velox/dwio/common/ReaderFactory.cpp index 56599c620fd92..e4b82d793ac82 100644 --- a/velox/dwio/common/ReaderFactory.cpp +++ b/velox/dwio/common/ReaderFactory.cpp @@ -33,13 +33,10 @@ ReaderFactoriesMap& readerFactories() { bool registerReaderFactory(std::shared_ptr factory) { [[maybe_unused]] const bool ok = readerFactories().insert({factory->fileFormat(), factory}).second; - // NOTE: re-enable this check after Prestissimo has updated dwrf registration. -#if 0 VELOX_CHECK( ok, "ReaderFactory is already registered for format {}", toString(factory->fileFormat())); -#endif return true; } diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index 4b97f4c6f4652..1fdcbc2e2a762 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -294,9 +294,8 @@ class SelectiveColumnReader { template inline void addNull() { VELOX_DCHECK_NE(valueSize_, kNoValueSize); - VELOX_DCHECK_LE( - rawResultNulls_ && rawValues_ && (numValues_ + 1) * valueSize_, - values_->capacity()); + VELOX_DCHECK(rawResultNulls_ && rawValues_); + VELOX_DCHECK_LE((numValues_ + 1) * valueSize_, values_->capacity()); anyNulls_ = true; bits::setNull(rawResultNulls_, numValues_); @@ -441,12 +440,12 @@ class SelectiveColumnReader { isFlatMapValue_ = value; } - protected: // Filters 'rows' according to 'is_null'. Only applies to cases where // scanSpec_->readsNullsOnly() is true. template void filterNulls(RowSet rows, bool isNull, bool extractValues); + protected: template void prepareRead(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls); @@ -670,6 +669,8 @@ inline void SelectiveColumnReader::addValue(const folly::StringPiece value) { addStringValue(value); } +velox::common::AlwaysTrue& alwaysTrue(); + } // namespace facebook::velox::dwio::common namespace facebook::velox::dwio::common { diff --git a/velox/dwio/common/SelectiveColumnReaderInternal.h b/velox/dwio/common/SelectiveColumnReaderInternal.h index 61bcf2b5befa9..ed38c6551fd55 100644 --- a/velox/dwio/common/SelectiveColumnReaderInternal.h +++ b/velox/dwio/common/SelectiveColumnReaderInternal.h @@ -31,8 +31,6 @@ namespace facebook::velox::dwio::common { -velox::common::AlwaysTrue& alwaysTrue(); - class Timer { public: Timer() : startClocks_{folly::hardware_timestamp()} {} diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp index 76f17361e6a6f..edd3d02f3f9e3 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp @@ -422,9 +422,7 @@ void SelectiveStringDirectColumnReader::readWithVisitor( int32_t current = visitor.start(); constexpr bool isExtract = std::is_same_v && - std::is_same_v< - typename TVisitor::Extract, - dwio::common::ExtractToReader>; + std::is_same_v; auto nulls = nullsInReadRange_ ? nullsInReadRange_->as() : nullptr; if (process::hasAvx2() && isExtract) { @@ -465,73 +463,11 @@ void SelectiveStringDirectColumnReader::readWithVisitor( } } -template -void SelectiveStringDirectColumnReader::readHelper( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues) { - readWithVisitor( - rows, - dwio::common:: - ColumnVisitor( - *reinterpret_cast(filter), this, rows, extractValues)); -} - -template -void SelectiveStringDirectColumnReader::processFilter( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues) { - if (filter == nullptr) { - readHelper( - &dwio::common::alwaysTrue(), rows, extractValues); - return; - } - - switch (filter->kind()) { - case common::FilterKind::kAlwaysTrue: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kIsNull: - filterNulls( - rows, - true, - !std::is_same_v); - break; - case common::FilterKind::kIsNotNull: - if (std::is_same_v) { - filterNulls(rows, false, false); - } else { - readHelper(filter, rows, extractValues); - } - break; - case common::FilterKind::kBytesRange: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kNegatedBytesRange: - readHelper( - filter, rows, extractValues); - break; - case common::FilterKind::kBytesValues: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kNegatedBytesValues: - readHelper( - filter, rows, extractValues); - break; - default: - readHelper(filter, rows, extractValues); - break; - } -} - void SelectiveStringDirectColumnReader::read( vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) { prepareRead(offset, rows, incomingNulls); - bool isDense = rows.back() == rows.size() - 1; - auto numRows = rows.back() + 1; auto numNulls = nullsInReadRange_ ? BaseVector::countNulls(nullsInReadRange_, 0, numRows) @@ -542,38 +478,8 @@ void SelectiveStringDirectColumnReader::read( lengths_->asMutable(), numRows - numNulls); rawLengths_ = lengths_->as(); lengthIndex_ = 0; - if (scanSpec_->keepValues()) { - if (scanSpec_->valueHook()) { - if (isDense) { - readHelper( - &dwio::common::alwaysTrue(), - rows, - dwio::common::ExtractToGenericHook(scanSpec_->valueHook())); - } else { - readHelper( - &dwio::common::alwaysTrue(), - rows, - dwio::common::ExtractToGenericHook(scanSpec_->valueHook())); - } - } else { - if (isDense) { - processFilter( - scanSpec_->filter(), rows, dwio::common::ExtractToReader(this)); - } else { - processFilter( - scanSpec_->filter(), rows, dwio::common::ExtractToReader(this)); - } - } - } else { - if (isDense) { - processFilter( - scanSpec_->filter(), rows, dwio::common::DropValues()); - } else { - processFilter( - scanSpec_->filter(), rows, dwio::common::DropValues()); - } - } - + dwio::common::StringColumnReadWithVisitorHelper( + *this, rows)([&](auto visitor) { readWithVisitor(rows, visitor); }); readOffset_ += numRows; } diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h index 21fe4a3a25e53..cfa8a7350136b 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h @@ -66,15 +66,6 @@ class SelectiveStringDirectColumnReader template void readWithVisitor(RowSet rows, TVisitor visitor); - template - void readHelper(common::Filter* filter, RowSet rows, ExtractValues values); - - template - void processFilter( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues); - void extractCrossBuffers( const int32_t* lengths, const int32_t* starts, diff --git a/velox/dwio/parquet/reader/StringColumnReader.cpp b/velox/dwio/parquet/reader/StringColumnReader.cpp index 334c3c02a7e20..eeca2f8489330 100644 --- a/velox/dwio/parquet/reader/StringColumnReader.cpp +++ b/velox/dwio/parquet/reader/StringColumnReader.cpp @@ -31,104 +31,15 @@ uint64_t StringColumnReader::skip(uint64_t numValues) { return numValues; } -template -void StringColumnReader::readHelper( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues) { - formatData_->as().readWithVisitor( - dwio::common:: - ColumnVisitor( - *reinterpret_cast(filter), this, rows, extractValues)); -} - -template -void StringColumnReader::processFilter( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues) { - if (filter == nullptr) { - readHelper( - &dwio::common::alwaysTrue(), rows, extractValues); - return; - } - - switch (filter->kind()) { - case common::FilterKind::kAlwaysTrue: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kIsNull: - filterNulls( - rows, - true, - !std::is_same:: - value); - break; - case common::FilterKind::kIsNotNull: - if (std::is_same:: - value) { - filterNulls(rows, false, false); - } else { - readHelper(filter, rows, extractValues); - } - break; - case common::FilterKind::kBytesRange: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kNegatedBytesRange: - readHelper( - filter, rows, extractValues); - break; - case common::FilterKind::kBytesValues: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kNegatedBytesValues: - readHelper( - filter, rows, extractValues); - break; - default: - readHelper(filter, rows, extractValues); - break; - } -} - void StringColumnReader::read( vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) { prepareRead(offset, rows, incomingNulls); - bool isDense = rows.back() == rows.size() - 1; - if (scanSpec_->keepValues()) { - if (scanSpec_->valueHook()) { - if (isDense) { - readHelper( - &dwio::common::alwaysTrue(), - rows, - dwio::common::ExtractToGenericHook(scanSpec_->valueHook())); - } else { - readHelper( - &dwio::common::alwaysTrue(), - rows, - dwio::common::ExtractToGenericHook(scanSpec_->valueHook())); - } - return; - } - if (isDense) { - processFilter( - scanSpec_->filter(), rows, dwio::common::ExtractToReader(this)); - } else { - processFilter( - scanSpec_->filter(), rows, dwio::common::ExtractToReader(this)); - } - } else { - if (isDense) { - processFilter( - scanSpec_->filter(), rows, dwio::common::DropValues()); - } else { - processFilter( - scanSpec_->filter(), rows, dwio::common::DropValues()); - } - } + dwio::common::StringColumnReadWithVisitorHelper( + *this, rows)([&](auto visitor) { + formatData_->as().readWithVisitor(visitor); + }); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/reader/StringColumnReader.h b/velox/dwio/parquet/reader/StringColumnReader.h index 23269fda84462..e9bedc2365fc8 100644 --- a/velox/dwio/parquet/reader/StringColumnReader.h +++ b/velox/dwio/parquet/reader/StringColumnReader.h @@ -49,27 +49,6 @@ class StringColumnReader : public dwio::common::SelectiveColumnReader { void getValues(RowSet rows, VectorPtr* result) override; void dedictionarize() override; - - private: - template - void skipInDecode(int32_t numValues, int32_t current, const uint64_t* nulls); - - folly::StringPiece readValue(int32_t length); - - template - void decode(const uint64_t* nulls, Visitor visitor); - - template - void readWithVisitor(RowSet rows, TVisitor visitor); - - template - void readHelper(common::Filter* filter, RowSet rows, ExtractValues values); - - template - void processFilter( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues); }; } // namespace facebook::velox::parquet