From 0c4dad14ad9c0a934e67323825b2f2953f6546b4 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Mon, 6 May 2024 17:27:20 -0700 Subject: [PATCH] Clean up selective file reader framework (#9704) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/9704 Extract reusable common code, remove unused code, and add `const` qualifiers where suitable. No change in functionality. bypass-github-export-checks Reviewed By: oerling Differential Revision: D56945236 fbshipit-source-id: ac93a82769996de780f084e1e9cce785704f25d8 --- velox/connectors/hive/HiveConnectorUtil.cpp | 5 +- velox/dwio/common/ColumnVisitors.h | 180 +++++++++++++++--- velox/dwio/common/FormatData.h | 2 +- .../common/SelectiveByteRleColumnReader.h | 43 +++-- velox/dwio/common/SelectiveColumnReader.h | 9 +- .../common/SelectiveColumnReaderInternal.h | 2 - .../SelectiveFloatingPointColumnReader.h | 39 ++-- .../common/SelectiveIntegerColumnReader.h | 41 ++-- velox/dwio/common/TypeWithId.cpp | 33 ++++ velox/dwio/common/TypeWithId.h | 2 + velox/dwio/dwrf/reader/DwrfReader.cpp | 12 +- velox/dwio/dwrf/reader/DwrfReader.h | 2 +- .../reader/SelectiveByteRleColumnReader.h | 2 +- .../SelectiveFloatingPointColumnReader.h | 2 +- ...SelectiveIntegerDictionaryColumnReader.cpp | 2 +- .../SelectiveIntegerDirectColumnReader.cpp | 2 +- .../SelectiveStringDirectColumnReader.cpp | 100 +--------- .../SelectiveStringDirectColumnReader.h | 9 - .../dwio/parquet/reader/BooleanColumnReader.h | 2 +- .../reader/FloatingPointColumnReader.h | 2 +- .../dwio/parquet/reader/IntegerColumnReader.h | 2 +- .../parquet/reader/StringColumnReader.cpp | 97 +--------- .../dwio/parquet/reader/StringColumnReader.h | 21 -- 23 files changed, 306 insertions(+), 305 deletions(-) diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 09099c37d188..d8cfe275fd71 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -568,7 +568,10 @@ void configureRowReaderOptions( } else { cs = std::make_shared(rowType, columnNames); } - rowReaderOptions.select(cs).range(hiveSplit->start, hiveSplit->length); + rowReaderOptions.select(cs); + if (hiveSplit) { + rowReaderOptions.range(hiveSplit->start, hiveSplit->length); + } } namespace { diff --git a/velox/dwio/common/ColumnVisitors.h b/velox/dwio/common/ColumnVisitors.h index 3ac3d5e219f7..cc81f4505fee 100644 --- a/velox/dwio/common/ColumnVisitors.h +++ b/velox/dwio/common/ColumnVisitors.h @@ -49,29 +49,32 @@ struct DropValues { } }; -template struct ExtractToReader { using HookType = dwio::common::NoHook; static constexpr bool kSkipNulls = false; - explicit ExtractToReader(TReader* readerIn) : reader(readerIn) {} + explicit ExtractToReader(SelectiveColumnReader* readerIn) + : reader_(readerIn) {} bool acceptsNulls() const { return true; } template - void addNull(vector_size_t rowIndex); + void addNull(vector_size_t /*rowIndex*/) { + reader_->template addNull(); + } template void addValue(vector_size_t /*rowIndex*/, V value) { - reader->addValue(value); + reader_->addValue(value); } - TReader* reader; - dwio::common::NoHook& hook() { return noHook(); } + + private: + SelectiveColumnReader* reader_; }; template @@ -150,6 +153,7 @@ class ColumnVisitor { using DataType = T; static constexpr bool dense = isDense; static constexpr bool kHasBulkPath = true; + ColumnVisitor( TFilter& filter, SelectiveColumnReader* reader, @@ -163,6 +167,20 @@ class ColumnVisitor { rowIndex_(0), values_(values) {} + template = 0> + ColumnVisitor( + TFilter& filter, + SelectiveColumnReader* reader, + vector_size_t numRows, + ExtractValues values) + : filter_(filter), + reader_(reader), + allowNulls_(!TFilter::deterministic || filter.testNull()), + rows_(nullptr), + numRows_(numRows), + rowIndex_(0), + values_(values) {} + bool allowNulls() { if (ExtractValues::kSkipNulls && TFilter::deterministic) { return false; @@ -269,7 +287,7 @@ class ColumnVisitor { } if (++rowIndex_ >= numRows_) { atEnd = true; - return rows_[numRows_ - 1] - previous; + return rowAt(numRows_ - 1) - previous; } if (TFilter::deterministic && isDense) { return 0; @@ -301,7 +319,7 @@ class ColumnVisitor { if (isDense) { return 0; } - return currentRow() - rows_[rowIndex_ - 1] - 1; + return currentRow() - rowAt(rowIndex_ - 1) - 1; } FOLLY_ALWAYS_INLINE vector_size_t process(T value, bool& atEnd) { @@ -314,7 +332,7 @@ class ColumnVisitor { } if (++rowIndex_ >= numRows_) { atEnd = true; - return rows_[numRows_ - 1] - previous; + return rowAt(numRows_ - 1) - previous; } return currentRow() - previous - 1; } @@ -331,7 +349,7 @@ class ColumnVisitor { if (isDense) { return 0; } - return currentRow() - rows_[rowIndex_ - 1] - 1; + return currentRow() - rowAt(rowIndex_ - 1) - 1; } // Returns space for 'size' items of T for a scan to fill. The scan @@ -341,26 +359,30 @@ class ColumnVisitor { return reader_->mutableValues(size); } - int32_t numRows() const { - return reader_->numRows(); - } - SelectiveColumnReader& reader() const { return *reader_; } - inline vector_size_t rowAt(vector_size_t index) { + inline vector_size_t rowAt(vector_size_t index) const { if (isDense) { return index; } return rows_[index]; } - bool atEnd() { + vector_size_t rowIndex() const { + return rowIndex_; + } + + void setRowIndex(vector_size_t index) { + rowIndex_ = index; + } + + bool atEnd() const { return rowIndex_ >= numRows_; } - vector_size_t currentRow() { + vector_size_t currentRow() const { if (isDense) { return rowIndex_; } @@ -371,7 +393,7 @@ class ColumnVisitor { return rows_; } - vector_size_t numRows() { + vector_size_t numRows() const { return numRows_; } @@ -504,12 +526,6 @@ inline void ColumnVisitor::addOutputRow( reader_->addOutputRow(row); } -template -template -void ExtractToReader::addNull(vector_size_t /*rowIndex*/) { - reader->template addNull(); -} - enum FilterResult { kUnknown = 0x40, kSuccess = 0x80, kFailure = 0 }; namespace detail { @@ -1390,13 +1406,6 @@ class DirectRleColumnVisitor rows, values) {} - // Use for replacing all rows with non-null rows for fast path with - // processRun and processRle. - void setRows(folly::Range newRows) { - super::rows_ = newRows.data(); - super::numRows_ = newRows.size(); - } - // Processes 'numInput' T's in 'input'. Sets 'values' and // 'numValues'' to the resulting values. 'scatterRows' may be // non-null if there is no filter and the decoded values should be @@ -1479,4 +1488,113 @@ class DirectRleColumnVisitor } }; +template +class StringColumnReadWithVisitorHelper { + public: + StringColumnReadWithVisitorHelper(SelectiveColumnReader& reader, RowSet rows) + : reader_(reader), rows_(rows) {} + + template + auto operator()(F&& readWithVisitor) { + const bool isDense = rows_.back() == rows_.size() - 1; + if (reader_.scanSpec()->keepValues()) { + if (auto* hook = reader_.scanSpec()->valueHook()) { + if (isDense) { + readHelper( + &alwaysTrue(), + ExtractToGenericHook(hook), + std::forward(readWithVisitor)); + } else { + readHelper( + &alwaysTrue(), + ExtractToGenericHook(hook), + std::forward(readWithVisitor)); + } + } else { + if (isDense) { + processFilter( + ExtractToReader(&reader_), std::forward(readWithVisitor)); + } else { + processFilter( + ExtractToReader(&reader_), std::forward(readWithVisitor)); + } + } + } else { + if (isDense) { + processFilter(DropValues(), std::forward(readWithVisitor)); + } else { + processFilter(DropValues(), std::forward(readWithVisitor)); + } + } + } + + private: + template + void readHelper( + velox::common::Filter* filter, + ExtractValues extractValues, + F readWithVisitor) { + readWithVisitor( + ColumnVisitor( + *static_cast(filter), &reader_, rows_, extractValues)); + } + + template + void processFilter(ExtractValues extractValues, F&& readWithVisitor) { + auto* filter = reader_.scanSpec()->filter(); + if (filter == nullptr) { + readHelper( + &alwaysTrue(), extractValues, std::forward(readWithVisitor)); + return; + } + switch (filter->kind()) { + case velox::common::FilterKind::kAlwaysTrue: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + case velox::common::FilterKind::kIsNull: + if constexpr (kEncodingHasNulls) { + reader_.filterNulls( + rows_, true, !std::is_same_v); + } else { + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + } + break; + case velox::common::FilterKind::kIsNotNull: + if constexpr ( + kEncodingHasNulls && std::is_same_v) { + reader_.filterNulls(rows_, false, false); + } else { + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + } + break; + case velox::common::FilterKind::kBytesRange: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + case velox::common::FilterKind::kNegatedBytesRange: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + case velox::common::FilterKind::kBytesValues: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + case velox::common::FilterKind::kNegatedBytesValues: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + default: + readHelper( + filter, extractValues, std::forward(readWithVisitor)); + break; + } + } + + SelectiveColumnReader& reader_; + const RowSet rows_; +}; + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/FormatData.h b/velox/dwio/common/FormatData.h index 034860460646..1f0b5d4426bb 100644 --- a/velox/dwio/common/FormatData.h +++ b/velox/dwio/common/FormatData.h @@ -34,7 +34,7 @@ class FormatData { template T& as() { - return *reinterpret_cast(this); + return *static_cast(this); } /// Reads nulls if the format has nulls separate from the encoded diff --git a/velox/dwio/common/SelectiveByteRleColumnReader.h b/velox/dwio/common/SelectiveByteRleColumnReader.h index 06aae1c4986b..67537ea8d8b8 100644 --- a/velox/dwio/common/SelectiveByteRleColumnReader.h +++ b/velox/dwio/common/SelectiveByteRleColumnReader.h @@ -39,7 +39,11 @@ class SelectiveByteRleColumnReader : public SelectiveColumnReader { void getValues(RowSet rows, VectorPtr* result) override; - template + template < + typename Reader, + bool isDense, + bool kEncodingHasNulls, + typename ExtractValues> void processFilter( velox::common::Filter* filter, ExtractValues extractValues, @@ -58,7 +62,7 @@ class SelectiveByteRleColumnReader : public SelectiveColumnReader { RowSet rows, ExtractValues extractValues); - template + template void readCommon(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls); }; @@ -78,7 +82,11 @@ void SelectiveByteRleColumnReader::readHelper( *reinterpret_cast(filter), this, rows, extractValues)); } -template +template < + typename Reader, + bool isDense, + bool kEncodingHasNulls, + typename ExtractValues> void SelectiveByteRleColumnReader::processFilter( velox::common::Filter* filter, ExtractValues extractValues, @@ -90,13 +98,20 @@ void SelectiveByteRleColumnReader::processFilter( filter, rows, extractValues); break; case FilterKind::kIsNull: - filterNulls( - rows, - true, - !std::is_same_v); + if constexpr (kEncodingHasNulls) { + filterNulls( + rows, + true, + !std::is_same_v); + } else { + readHelper( + filter, rows, extractValues); + } break; case FilterKind::kIsNotNull: - if (std::is_same_v) { + if constexpr ( + kEncodingHasNulls && + std::is_same_v) { filterNulls(rows, false, false); } else { readHelper( @@ -148,7 +163,7 @@ void SelectiveByteRleColumnReader::processValueHook( } } -template +template void SelectiveByteRleColumnReader::readCommon( vector_size_t offset, RowSet rows, @@ -167,17 +182,19 @@ void SelectiveByteRleColumnReader::readCommon( return; } if (isDense) { - processFilter( + processFilter( filter, dwio::common::ExtractToReader(this), rows); } else { - processFilter( + processFilter( filter, dwio::common::ExtractToReader(this), rows); } } else { if (isDense) { - processFilter(filter, dwio::common::DropValues(), rows); + processFilter( + filter, dwio::common::DropValues(), rows); } else { - processFilter(filter, dwio::common::DropValues(), rows); + processFilter( + filter, dwio::common::DropValues(), rows); } } } diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index 4b97f4c6f465..1fdcbc2e2a76 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -294,9 +294,8 @@ class SelectiveColumnReader { template inline void addNull() { VELOX_DCHECK_NE(valueSize_, kNoValueSize); - VELOX_DCHECK_LE( - rawResultNulls_ && rawValues_ && (numValues_ + 1) * valueSize_, - values_->capacity()); + VELOX_DCHECK(rawResultNulls_ && rawValues_); + VELOX_DCHECK_LE((numValues_ + 1) * valueSize_, values_->capacity()); anyNulls_ = true; bits::setNull(rawResultNulls_, numValues_); @@ -441,12 +440,12 @@ class SelectiveColumnReader { isFlatMapValue_ = value; } - protected: // Filters 'rows' according to 'is_null'. Only applies to cases where // scanSpec_->readsNullsOnly() is true. template void filterNulls(RowSet rows, bool isNull, bool extractValues); + protected: template void prepareRead(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls); @@ -670,6 +669,8 @@ inline void SelectiveColumnReader::addValue(const folly::StringPiece value) { addStringValue(value); } +velox::common::AlwaysTrue& alwaysTrue(); + } // namespace facebook::velox::dwio::common namespace facebook::velox::dwio::common { diff --git a/velox/dwio/common/SelectiveColumnReaderInternal.h b/velox/dwio/common/SelectiveColumnReaderInternal.h index 61bcf2b5befa..ed38c6551fd5 100644 --- a/velox/dwio/common/SelectiveColumnReaderInternal.h +++ b/velox/dwio/common/SelectiveColumnReaderInternal.h @@ -31,8 +31,6 @@ namespace facebook::velox::dwio::common { -velox::common::AlwaysTrue& alwaysTrue(); - class Timer { public: Timer() : startClocks_{folly::hardware_timestamp()} {} diff --git a/velox/dwio/common/SelectiveFloatingPointColumnReader.h b/velox/dwio/common/SelectiveFloatingPointColumnReader.h index 61ccd7e4d8b6..ea2455afa0c8 100644 --- a/velox/dwio/common/SelectiveFloatingPointColumnReader.h +++ b/velox/dwio/common/SelectiveFloatingPointColumnReader.h @@ -40,7 +40,7 @@ class SelectiveFloatingPointColumnReader : public SelectiveColumnReader { return std::is_same_v; } - template + template void readCommon(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls); @@ -57,7 +57,11 @@ class SelectiveFloatingPointColumnReader : public SelectiveColumnReader { void readHelper(velox::common::Filter* filter, RowSet rows, ExtractValues values); - template + template < + typename Reader, + bool isDense, + bool kEncodingHasNulls, + typename ExtractValues> void processFilter( velox::common::Filter* filter, RowSet rows, @@ -84,7 +88,11 @@ void SelectiveFloatingPointColumnReader::readHelper( } template -template +template < + typename Reader, + bool isDense, + bool kEncodingHasNulls, + typename ExtractValues> void SelectiveFloatingPointColumnReader::processFilter( velox::common::Filter* filter, RowSet rows, @@ -101,11 +109,18 @@ void SelectiveFloatingPointColumnReader::processFilter( filter, rows, extractValues); break; case velox::common::FilterKind::kIsNull: - filterNulls( - rows, true, !std::is_same_v); + if constexpr (kEncodingHasNulls) { + filterNulls( + rows, true, !std::is_same_v); + } else { + readHelper( + filter, rows, extractValues); + } break; case velox::common::FilterKind::kIsNotNull: - if (std::is_same_v) { + if constexpr ( + kEncodingHasNulls && + std::is_same_v) { filterNulls(rows, false, false); } else { readHelper( @@ -163,7 +178,7 @@ void SelectiveFloatingPointColumnReader::processValueHook( } template -template +template void SelectiveFloatingPointColumnReader::readCommon( vector_size_t offset, RowSet rows, @@ -179,18 +194,20 @@ void SelectiveFloatingPointColumnReader::readCommon( } } else { if (isDense) { - processFilter( + processFilter( scanSpec_->filter(), rows, ExtractToReader(this)); } else { - processFilter( + processFilter( scanSpec_->filter(), rows, ExtractToReader(this)); } } } else { if (isDense) { - processFilter(scanSpec_->filter(), rows, DropValues()); + processFilter( + scanSpec_->filter(), rows, DropValues()); } else { - processFilter(scanSpec_->filter(), rows, DropValues()); + processFilter( + scanSpec_->filter(), rows, DropValues()); } } } diff --git a/velox/dwio/common/SelectiveIntegerColumnReader.h b/velox/dwio/common/SelectiveIntegerColumnReader.h index ba4b63e168ca..444f341e06b9 100644 --- a/velox/dwio/common/SelectiveIntegerColumnReader.h +++ b/velox/dwio/common/SelectiveIntegerColumnReader.h @@ -41,7 +41,11 @@ class SelectiveIntegerColumnReader : public SelectiveColumnReader { protected: // Switches based on filter type between different readHelper instantiations. - template + template < + typename Reader, + bool isDense, + bool kEncodingHasNulls, + typename ExtractValues> void processFilter( velox::common::Filter* filter, ExtractValues extractValues, @@ -66,7 +70,7 @@ class SelectiveIntegerColumnReader : public SelectiveColumnReader { // The common part of integer reading. calls the appropriate // instantiation of processValueHook or processFilter based on // possible value hook, filter and denseness. - template + template void readCommon(RowSet rows); }; @@ -113,7 +117,11 @@ void SelectiveIntegerColumnReader::readHelper( } } -template +template < + typename Reader, + bool isDense, + bool kEncodingHasNulls, + typename ExtractValues> void SelectiveIntegerColumnReader::processFilter( velox::common::Filter* filter, ExtractValues extractValues, @@ -130,11 +138,18 @@ void SelectiveIntegerColumnReader::processFilter( filter, rows, extractValues); break; case velox::common::FilterKind::kIsNull: - filterNulls( - rows, true, !std::is_same_v); + if constexpr (kEncodingHasNulls) { + filterNulls( + rows, true, !std::is_same_v); + } else { + readHelper( + filter, rows, extractValues); + } break; case velox::common::FilterKind::kIsNotNull: - if (std::is_same_v) { + if constexpr ( + kEncodingHasNulls && + std::is_same_v) { filterNulls(rows, false, false); } else { readHelper( @@ -211,7 +226,7 @@ void SelectiveIntegerColumnReader::processValueHook( } } -template +template void SelectiveIntegerColumnReader::readCommon(RowSet rows) { bool isDense = rows.back() == rows.size() - 1; velox::common::Filter* filter = @@ -225,16 +240,20 @@ void SelectiveIntegerColumnReader::readCommon(RowSet rows) { } } else { if (isDense) { - processFilter(filter, ExtractToReader(this), rows); + processFilter( + filter, ExtractToReader(this), rows); } else { - processFilter(filter, ExtractToReader(this), rows); + processFilter( + filter, ExtractToReader(this), rows); } } } else { if (isDense) { - processFilter(filter, DropValues(), rows); + processFilter( + filter, DropValues(), rows); } else { - processFilter(filter, DropValues(), rows); + processFilter( + filter, DropValues(), rows); } } } diff --git a/velox/dwio/common/TypeWithId.cpp b/velox/dwio/common/TypeWithId.cpp index 6b803a6249da..c4c9b4f8d36f 100644 --- a/velox/dwio/common/TypeWithId.cpp +++ b/velox/dwio/common/TypeWithId.cpp @@ -86,4 +86,37 @@ std::unique_ptr TypeWithId::create( type, std::move(children), myId, maxId, column); } +std::string TypeWithId::fullName() const { + std::vector path; + auto* child = this; + while (child->parent_) { + switch (child->parent_->type()->kind()) { + case TypeKind::ROW: + VELOX_CHECK( + child == child->parent_->children_.at(child->column_).get()); + path.push_back( + '.' + child->parent_->type()->asRow().nameOf(child->column_)); + break; + case TypeKind::ARRAY: + break; + case TypeKind::MAP: + if (child == child->parent_->children_.at(0).get()) { + path.push_back("."); + } else { + VELOX_CHECK(child == child->children_.at(1).get()); + path.push_back("."); + } + break; + default: + VELOX_UNREACHABLE(); + } + child = parent_; + } + std::string ans = ""; + for (int i = path.size() - 1; i >= 0; --i) { + ans += path[i]; + } + return ans; +} + } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/TypeWithId.h b/velox/dwio/common/TypeWithId.h index 39a988f9936a..5c5fbc5d070c 100644 --- a/velox/dwio/common/TypeWithId.h +++ b/velox/dwio/common/TypeWithId.h @@ -73,6 +73,8 @@ class TypeWithId : public velox::Tree> { return children_; } + std::string fullName() const; + private: static std::unique_ptr create( const std::shared_ptr& type, diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 483bde0be4a0..5ca3c2b041df 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -316,6 +316,7 @@ uint64_t DwrfRowReader::seekToRow(uint64_t rowNumber) { if (isEmptyFile()) { return 0; } + nextRowNumber_.reset(); // If we are reading only a portion of the file // (bounded by firstStripe_ and stripeCeiling_), @@ -586,6 +587,9 @@ void DwrfRowReader::readWithRowNumber( } int64_t DwrfRowReader::nextRowNumber() { + if (nextRowNumber_.has_value()) { + return *nextRowNumber_; + } auto strideSize = getReader().getFooter().rowIndexStride(); while (currentStripe_ < stripeCeiling_) { if (currentRowInStripe_ == 0) { @@ -604,20 +608,21 @@ int64_t DwrfRowReader::nextRowNumber() { } checkSkipStrides(strideSize); if (currentRowInStripe_ < rowsInCurrentStripe_) { - return firstRowOfStripe_[currentStripe_] + currentRowInStripe_; + nextRowNumber_ = firstRowOfStripe_[currentStripe_] + currentRowInStripe_; + return *nextRowNumber_; } advanceToNextStripe: ++currentStripe_; currentRowInStripe_ = 0; currentUnit_ = nullptr; } - atEnd_ = true; + nextRowNumber_ = kAtEnd; return kAtEnd; } int64_t DwrfRowReader::nextReadSize(uint64_t size) { VELOX_DCHECK_GT(size, 0); - if (atEnd_) { + if (nextRowNumber() == kAtEnd) { return kAtEnd; } auto rowsToRead = std::min(size, rowsInCurrentStripe_ - currentRowInStripe_); @@ -646,6 +651,7 @@ uint64_t DwrfRowReader::next( return 0; } auto rowsToRead = nextReadSize(size); + nextRowNumber_.reset(); previousRow_ = nextRow; // Record strideIndex for use by the columnReader_ which may delay actual // reading of the data. diff --git a/velox/dwio/dwrf/reader/DwrfReader.h b/velox/dwio/dwrf/reader/DwrfReader.h index acd496e414de..549746776fa1 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.h +++ b/velox/dwio/dwrf/reader/DwrfReader.h @@ -160,7 +160,7 @@ class DwrfRowReader : public StrideIndexProvider, dwio::common::ColumnReaderStatistics columnReaderStatistics_; - bool atEnd_{false}; + std::optional nextRowNumber_; std::unique_ptr unitLoader_; DwrfUnit* currentUnit_; diff --git a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h index 363082eccd64..8684842c6fa2 100644 --- a/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h @@ -79,7 +79,7 @@ class SelectiveByteRleColumnReader void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) override { - readCommon(offset, rows, incomingNulls); + readCommon(offset, rows, incomingNulls); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h b/velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h index 5498ea77e8c5..6cce3ad3ec75 100644 --- a/velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveFloatingPointColumnReader.h @@ -49,7 +49,7 @@ class SelectiveFloatingPointColumnReader void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) override { using T = SelectiveFloatingPointColumnReader; - this->template readCommon(offset, rows, incomingNulls); + this->template readCommon(offset, rows, incomingNulls); this->readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp index 649319a6f9da..eaf99cafb937 100644 --- a/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveIntegerDictionaryColumnReader.cpp @@ -105,7 +105,7 @@ void SelectiveIntegerDictionaryColumnReader::read( // lazy load dictionary only when it's needed ensureInitialized(); - readCommon(rows); + readCommon(rows); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.cpp index 57dc53090953..c7bd41bda3be 100644 --- a/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveIntegerDirectColumnReader.cpp @@ -34,7 +34,7 @@ void SelectiveIntegerDirectColumnReader::read( offset, rows, incomingNulls); - readCommon(rows); + readCommon(rows); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp index 76f17361e6a6..adc775d2d101 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp @@ -422,9 +422,7 @@ void SelectiveStringDirectColumnReader::readWithVisitor( int32_t current = visitor.start(); constexpr bool isExtract = std::is_same_v && - std::is_same_v< - typename TVisitor::Extract, - dwio::common::ExtractToReader>; + std::is_same_v; auto nulls = nullsInReadRange_ ? nullsInReadRange_->as() : nullptr; if (process::hasAvx2() && isExtract) { @@ -465,73 +463,11 @@ void SelectiveStringDirectColumnReader::readWithVisitor( } } -template -void SelectiveStringDirectColumnReader::readHelper( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues) { - readWithVisitor( - rows, - dwio::common:: - ColumnVisitor( - *reinterpret_cast(filter), this, rows, extractValues)); -} - -template -void SelectiveStringDirectColumnReader::processFilter( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues) { - if (filter == nullptr) { - readHelper( - &dwio::common::alwaysTrue(), rows, extractValues); - return; - } - - switch (filter->kind()) { - case common::FilterKind::kAlwaysTrue: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kIsNull: - filterNulls( - rows, - true, - !std::is_same_v); - break; - case common::FilterKind::kIsNotNull: - if (std::is_same_v) { - filterNulls(rows, false, false); - } else { - readHelper(filter, rows, extractValues); - } - break; - case common::FilterKind::kBytesRange: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kNegatedBytesRange: - readHelper( - filter, rows, extractValues); - break; - case common::FilterKind::kBytesValues: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kNegatedBytesValues: - readHelper( - filter, rows, extractValues); - break; - default: - readHelper(filter, rows, extractValues); - break; - } -} - void SelectiveStringDirectColumnReader::read( vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) { prepareRead(offset, rows, incomingNulls); - bool isDense = rows.back() == rows.size() - 1; - auto numRows = rows.back() + 1; auto numNulls = nullsInReadRange_ ? BaseVector::countNulls(nullsInReadRange_, 0, numRows) @@ -542,38 +478,8 @@ void SelectiveStringDirectColumnReader::read( lengths_->asMutable(), numRows - numNulls); rawLengths_ = lengths_->as(); lengthIndex_ = 0; - if (scanSpec_->keepValues()) { - if (scanSpec_->valueHook()) { - if (isDense) { - readHelper( - &dwio::common::alwaysTrue(), - rows, - dwio::common::ExtractToGenericHook(scanSpec_->valueHook())); - } else { - readHelper( - &dwio::common::alwaysTrue(), - rows, - dwio::common::ExtractToGenericHook(scanSpec_->valueHook())); - } - } else { - if (isDense) { - processFilter( - scanSpec_->filter(), rows, dwio::common::ExtractToReader(this)); - } else { - processFilter( - scanSpec_->filter(), rows, dwio::common::ExtractToReader(this)); - } - } - } else { - if (isDense) { - processFilter( - scanSpec_->filter(), rows, dwio::common::DropValues()); - } else { - processFilter( - scanSpec_->filter(), rows, dwio::common::DropValues()); - } - } - + dwio::common::StringColumnReadWithVisitorHelper( + *this, rows)([&](auto visitor) { readWithVisitor(rows, visitor); }); readOffset_ += numRows; } diff --git a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h index 21fe4a3a25e5..cfa8a7350136 100644 --- a/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h @@ -66,15 +66,6 @@ class SelectiveStringDirectColumnReader template void readWithVisitor(RowSet rows, TVisitor visitor); - template - void readHelper(common::Filter* filter, RowSet rows, ExtractValues values); - - template - void processFilter( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues); - void extractCrossBuffers( const int32_t* lengths, const int32_t* starts, diff --git a/velox/dwio/parquet/reader/BooleanColumnReader.h b/velox/dwio/parquet/reader/BooleanColumnReader.h index 41d3405abd54..73126f467988 100644 --- a/velox/dwio/parquet/reader/BooleanColumnReader.h +++ b/velox/dwio/parquet/reader/BooleanColumnReader.h @@ -49,7 +49,7 @@ class BooleanColumnReader : public dwio::common::SelectiveByteRleColumnReader { void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) override { - readCommon(offset, rows, incomingNulls); + readCommon(offset, rows, incomingNulls); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/reader/FloatingPointColumnReader.h b/velox/dwio/parquet/reader/FloatingPointColumnReader.h index be4c2cd84363..ed91e67a739f 100644 --- a/velox/dwio/parquet/reader/FloatingPointColumnReader.h +++ b/velox/dwio/parquet/reader/FloatingPointColumnReader.h @@ -48,7 +48,7 @@ class FloatingPointColumnReader void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) override { using T = FloatingPointColumnReader; - this->template readCommon(offset, rows, incomingNulls); + this->template readCommon(offset, rows, incomingNulls); this->readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/reader/IntegerColumnReader.h b/velox/dwio/parquet/reader/IntegerColumnReader.h index 59a9fc12bf91..d7b458c73953 100644 --- a/velox/dwio/parquet/reader/IntegerColumnReader.h +++ b/velox/dwio/parquet/reader/IntegerColumnReader.h @@ -75,7 +75,7 @@ class IntegerColumnReader : public dwio::common::SelectiveIntegerColumnReader { offset, rows, nullptr); - readCommon(rows); + readCommon(rows); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/reader/StringColumnReader.cpp b/velox/dwio/parquet/reader/StringColumnReader.cpp index 334c3c02a7e2..2dd0250159c3 100644 --- a/velox/dwio/parquet/reader/StringColumnReader.cpp +++ b/velox/dwio/parquet/reader/StringColumnReader.cpp @@ -31,104 +31,15 @@ uint64_t StringColumnReader::skip(uint64_t numValues) { return numValues; } -template -void StringColumnReader::readHelper( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues) { - formatData_->as().readWithVisitor( - dwio::common:: - ColumnVisitor( - *reinterpret_cast(filter), this, rows, extractValues)); -} - -template -void StringColumnReader::processFilter( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues) { - if (filter == nullptr) { - readHelper( - &dwio::common::alwaysTrue(), rows, extractValues); - return; - } - - switch (filter->kind()) { - case common::FilterKind::kAlwaysTrue: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kIsNull: - filterNulls( - rows, - true, - !std::is_same:: - value); - break; - case common::FilterKind::kIsNotNull: - if (std::is_same:: - value) { - filterNulls(rows, false, false); - } else { - readHelper(filter, rows, extractValues); - } - break; - case common::FilterKind::kBytesRange: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kNegatedBytesRange: - readHelper( - filter, rows, extractValues); - break; - case common::FilterKind::kBytesValues: - readHelper(filter, rows, extractValues); - break; - case common::FilterKind::kNegatedBytesValues: - readHelper( - filter, rows, extractValues); - break; - default: - readHelper(filter, rows, extractValues); - break; - } -} - void StringColumnReader::read( vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) { prepareRead(offset, rows, incomingNulls); - bool isDense = rows.back() == rows.size() - 1; - if (scanSpec_->keepValues()) { - if (scanSpec_->valueHook()) { - if (isDense) { - readHelper( - &dwio::common::alwaysTrue(), - rows, - dwio::common::ExtractToGenericHook(scanSpec_->valueHook())); - } else { - readHelper( - &dwio::common::alwaysTrue(), - rows, - dwio::common::ExtractToGenericHook(scanSpec_->valueHook())); - } - return; - } - if (isDense) { - processFilter( - scanSpec_->filter(), rows, dwio::common::ExtractToReader(this)); - } else { - processFilter( - scanSpec_->filter(), rows, dwio::common::ExtractToReader(this)); - } - } else { - if (isDense) { - processFilter( - scanSpec_->filter(), rows, dwio::common::DropValues()); - } else { - processFilter( - scanSpec_->filter(), rows, dwio::common::DropValues()); - } - } + dwio::common::StringColumnReadWithVisitorHelper( + *this, rows)([&](auto visitor) { + formatData_->as().readWithVisitor(visitor); + }); readOffset_ += rows.back() + 1; } diff --git a/velox/dwio/parquet/reader/StringColumnReader.h b/velox/dwio/parquet/reader/StringColumnReader.h index 23269fda8446..e9bedc2365fc 100644 --- a/velox/dwio/parquet/reader/StringColumnReader.h +++ b/velox/dwio/parquet/reader/StringColumnReader.h @@ -49,27 +49,6 @@ class StringColumnReader : public dwio::common::SelectiveColumnReader { void getValues(RowSet rows, VectorPtr* result) override; void dedictionarize() override; - - private: - template - void skipInDecode(int32_t numValues, int32_t current, const uint64_t* nulls); - - folly::StringPiece readValue(int32_t length); - - template - void decode(const uint64_t* nulls, Visitor visitor); - - template - void readWithVisitor(RowSet rows, TVisitor visitor); - - template - void readHelper(common::Filter* filter, RowSet rows, ExtractValues values); - - template - void processFilter( - common::Filter* filter, - RowSet rows, - ExtractValues extractValues); }; } // namespace facebook::velox::parquet