diff --git a/velox/dwio/common/Reader.cpp b/velox/dwio/common/Reader.cpp index e8b82b08ddbc4..a619d08a2cb0a 100644 --- a/velox/dwio/common/Reader.cpp +++ b/velox/dwio/common/Reader.cpp @@ -204,11 +204,16 @@ void RowReader::readWithRowNumber( std::vector()); flatRowNum = rowNumVector->asUnchecked>(); } - auto rowOffsets = columnReader->outputRows(); - VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); auto* rawRowNum = flatRowNum->mutableRawValues(); - for (int i = 0; i < rowOffsets.size(); ++i) { - rawRowNum[i] = previousRow + rowOffsets[i]; + if (numChildren == 0 && !mutation) { + VELOX_DCHECK_EQ(rowsToRead, result->size()); + std::iota(rawRowNum, rawRowNum + rowsToRead, previousRow); + } else { + auto rowOffsets = columnReader->outputRows(); + VELOX_DCHECK_EQ(rowOffsets.size(), result->size()); + for (int i = 0; i < rowOffsets.size(); ++i) { + rawRowNum[i] = previousRow + rowOffsets[i]; + } } rowVector = result->asUnchecked(); auto& rowType = rowVector->type()->asRow(); diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index 7e4f00244a04e..ff98f47840652 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -452,6 +452,14 @@ class SelectiveColumnReader { return std::nullopt; } + // Whether output rows should be filled when there is no column projected out + // and there is delete mutation. Used for row number generation. The case + // for no delete mutation is handled more efficiently outside column reader in + // `RowReader::readWithRowNumber'. + virtual void setFillMutatedOutputRows(bool /*value*/) { + VELOX_UNREACHABLE("Only struct reader supports this method"); + } + protected: template void diff --git a/velox/dwio/common/SelectiveStructColumnReader.cpp b/velox/dwio/common/SelectiveStructColumnReader.cpp index 07880fc7260bd..4d2314060e133 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.cpp +++ b/velox/dwio/common/SelectiveStructColumnReader.cpp @@ -53,6 +53,31 @@ uint64_t SelectiveStructColumnReaderBase::skip(uint64_t numValues) { return numValues; } +void SelectiveStructColumnReaderBase::fillOutputRowsFromMutation( + vector_size_t size) { + if (mutation_->deletedRows) { + bits::forEachUnsetBit(mutation_->deletedRows, 0, size, [&](auto i) { + if (!mutation_->randomSkip || mutation_->randomSkip->testOne()) { + addOutputRow(i); + } + }); + } else { + VELOX_CHECK(mutation_->randomSkip); + vector_size_t i = 0; + while (i < size) { + auto skip = mutation_->randomSkip->nextSkip(); + auto remaining = size - i; + if (skip >= remaining) { + mutation_->randomSkip->consume(remaining); + break; + } + i += skip; + addOutputRow(i++); + mutation_->randomSkip->consume(skip + 1); + } + } +} + void SelectiveStructColumnReaderBase::next( uint64_t numValues, VectorPtr& result, @@ -60,11 +85,16 @@ void SelectiveStructColumnReaderBase::next( process::TraceContext trace("SelectiveStructColumnReaderBase::next"); if (children_.empty()) { if (mutation) { - if (mutation->deletedRows) { - numValues -= bits::countBits(mutation->deletedRows, 0, numValues); - } - if (mutation->randomSkip) { - numValues *= mutation->randomSkip->sampleRate(); + if (fillMutatedOutputRows_) { + fillOutputRowsFromMutation(numValues); + numValues = outputRows_.size(); + } else { + if (mutation->deletedRows) { + numValues -= bits::countBits(mutation->deletedRows, 0, numValues); + } + if (mutation->randomSkip) { + numValues *= mutation->randomSkip->sampleRate(); + } } } @@ -110,27 +140,7 @@ void SelectiveStructColumnReaderBase::read( VELOX_DCHECK(!nullsInReadRange_, "Only top level can have mutation"); VELOX_DCHECK_EQ( rows.back(), rows.size() - 1, "Top level should have a dense row set"); - if (mutation_->deletedRows) { - bits::forEachUnsetBit( - mutation_->deletedRows, 0, rows.back() + 1, [&](auto i) { - if (!mutation_->randomSkip || mutation_->randomSkip->testOne()) { - addOutputRow(i); - } - }); - } else { - VELOX_CHECK(mutation_->randomSkip); - vector_size_t i = 0; - while (i <= rows.back()) { - auto skip = mutation_->randomSkip->nextSkip(); - if (skip > rows.back() - i) { - mutation_->randomSkip->consume(rows.back() - i + 1); - break; - } - i += skip; - addOutputRow(i++); - mutation_->randomSkip->consume(skip + 1); - } - } + fillOutputRowsFromMutation(rows.size()); if (outputRows_.empty()) { readOffset_ = offset + rows.back() + 1; return; diff --git a/velox/dwio/common/SelectiveStructColumnReader.h b/velox/dwio/common/SelectiveStructColumnReader.h index f7a734374dac4..7c1df6c169296 100644 --- a/velox/dwio/common/SelectiveStructColumnReader.h +++ b/velox/dwio/common/SelectiveStructColumnReader.h @@ -98,6 +98,10 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { return debugString_; } + void setFillMutatedOutputRows(bool value) final { + fillMutatedOutputRows_ = value; + } + protected: template friend class SelectiveFlatMapColumnReaderHelper; @@ -132,6 +136,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { // need to read it). bool isChildConstant(const velox::common::ScanSpec& childSpec) const; + void fillOutputRowsFromMutation(vector_size_t size); + const std::shared_ptr requestedType_; std::vector children_; @@ -151,6 +157,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader { // around for lazy columns. bool hasMutation_ = false; + bool fillMutatedOutputRows_ = false; + // Context information obtained from ExceptionContext. Stored here // so that LazyVector readers under this can add this to their // ExceptionContext. Allows contextualizing reader errors to split diff --git a/velox/dwio/dwrf/reader/DwrfReader.cpp b/velox/dwio/dwrf/reader/DwrfReader.cpp index 83c1258014bfd..c8eedc1b72cd2 100644 --- a/velox/dwio/dwrf/reader/DwrfReader.cpp +++ b/velox/dwio/dwrf/reader/DwrfReader.cpp @@ -166,6 +166,8 @@ void DwrfUnit::ensureDecoders() { flatMapContext, true); // isRoot selectiveColumnReader_->setIsTopLevel(); + selectiveColumnReader_->setFillMutatedOutputRows( + options_.getRowNumberColumnInfo().has_value()); } else { columnReader_ = ColumnReader::build( // enqueue streams requestedType, diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 818d5004c8ff2..bed685d9b5304 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -768,6 +768,8 @@ class ParquetRowReader::Impl { readerBase_->schemaWithId(), // Id is schema id params, *options_.getScanSpec()); + columnReader_->setFillMutatedOutputRows( + options_.getRowNumberColumnInfo().has_value()); filterRowGroups(); if (!rowGroupIds_.empty()) { diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index 6150535839caf..44887827ebf07 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -536,6 +536,11 @@ TEST_F(ParquetTableScanTest, rowIndex) { {"_tmp_metadata_row_index", "a"}, assignments, "SELECT _tmp_metadata_row_index, a FROM tmp"); + assertSelectWithAssignments( + {"_tmp_metadata_row_index"}, + assignments, + "SELECT _tmp_metadata_row_index FROM tmp"); + // case 2: file has `_tmp_metadata_row_index` column, then use user data // insteads of generating it. loadData(