Skip to content

Commit

Permalink
Fix row number column when it is the only column projected
Browse files Browse the repository at this point in the history
Summary: Fix facebookincubator#9943

Differential Revision: D57842305
  • Loading branch information
Yuhta authored and facebook-github-bot committed May 27, 2024
1 parent abaf323 commit dbe5969
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 30 deletions.
13 changes: 9 additions & 4 deletions velox/dwio/common/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,16 @@ void RowReader::readWithRowNumber(
std::vector<BufferPtr>());
flatRowNum = rowNumVector->asUnchecked<FlatVector<int64_t>>();
}
auto rowOffsets = columnReader->outputRows();
VELOX_DCHECK_EQ(rowOffsets.size(), result->size());
auto* rawRowNum = flatRowNum->mutableRawValues();
for (int i = 0; i < rowOffsets.size(); ++i) {
rawRowNum[i] = previousRow + rowOffsets[i];
if (numChildren == 0 && !mutation) {
VELOX_DCHECK_EQ(rowsToRead, result->size());
std::iota(rawRowNum, rawRowNum + rowsToRead, previousRow);
} else {
auto rowOffsets = columnReader->outputRows();
VELOX_DCHECK_EQ(rowOffsets.size(), result->size());
for (int i = 0; i < rowOffsets.size(); ++i) {
rawRowNum[i] = previousRow + rowOffsets[i];
}
}
rowVector = result->asUnchecked<RowVector>();
auto& rowType = rowVector->type()->asRow();
Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,14 @@ class SelectiveColumnReader {
return std::nullopt;
}

// Whether output rows should be filled when there is no column projected out
// and there is delete mutation. Used for row number generation. The case
// for no delete mutation is handled more efficiently outside column reader in
// `RowReader::readWithRowNumber'.
virtual void setFillMutatedOutputRows(bool /*value*/) {
VELOX_UNREACHABLE("Only struct reader supports this method");
}

protected:
template <typename T>
void
Expand Down
62 changes: 36 additions & 26 deletions velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,48 @@ uint64_t SelectiveStructColumnReaderBase::skip(uint64_t numValues) {
return numValues;
}

void SelectiveStructColumnReaderBase::fillOutputRowsFromMutation(
vector_size_t size) {
if (mutation_->deletedRows) {
bits::forEachUnsetBit(mutation_->deletedRows, 0, size, [&](auto i) {
if (!mutation_->randomSkip || mutation_->randomSkip->testOne()) {
addOutputRow(i);
}
});
} else {
VELOX_CHECK(mutation_->randomSkip);
vector_size_t i = 0;
while (i < size) {
auto skip = mutation_->randomSkip->nextSkip();
auto remaining = size - i;
if (skip >= remaining) {
mutation_->randomSkip->consume(remaining);
break;
}
i += skip;
addOutputRow(i++);
mutation_->randomSkip->consume(skip + 1);
}
}
}

void SelectiveStructColumnReaderBase::next(
uint64_t numValues,
VectorPtr& result,
const Mutation* mutation) {
process::TraceContext trace("SelectiveStructColumnReaderBase::next");
if (children_.empty()) {
if (mutation) {
if (mutation->deletedRows) {
numValues -= bits::countBits(mutation->deletedRows, 0, numValues);
}
if (mutation->randomSkip) {
numValues *= mutation->randomSkip->sampleRate();
if (fillMutatedOutputRows_) {
fillOutputRowsFromMutation(numValues);
numValues = outputRows_.size();
} else {
if (mutation->deletedRows) {
numValues -= bits::countBits(mutation->deletedRows, 0, numValues);
}
if (mutation->randomSkip) {
numValues *= mutation->randomSkip->sampleRate();
}
}
}

Expand Down Expand Up @@ -110,27 +140,7 @@ void SelectiveStructColumnReaderBase::read(
VELOX_DCHECK(!nullsInReadRange_, "Only top level can have mutation");
VELOX_DCHECK_EQ(
rows.back(), rows.size() - 1, "Top level should have a dense row set");
if (mutation_->deletedRows) {
bits::forEachUnsetBit(
mutation_->deletedRows, 0, rows.back() + 1, [&](auto i) {
if (!mutation_->randomSkip || mutation_->randomSkip->testOne()) {
addOutputRow(i);
}
});
} else {
VELOX_CHECK(mutation_->randomSkip);
vector_size_t i = 0;
while (i <= rows.back()) {
auto skip = mutation_->randomSkip->nextSkip();
if (skip > rows.back() - i) {
mutation_->randomSkip->consume(rows.back() - i + 1);
break;
}
i += skip;
addOutputRow(i++);
mutation_->randomSkip->consume(skip + 1);
}
}
fillOutputRowsFromMutation(rows.size());
if (outputRows_.empty()) {
readOffset_ = offset + rows.back() + 1;
return;
Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/common/SelectiveStructColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
return debugString_;
}

void setFillMutatedOutputRows(bool value) final {
fillMutatedOutputRows_ = value;
}

protected:
template <typename T, typename KeyNode, typename FormatData>
friend class SelectiveFlatMapColumnReaderHelper;
Expand Down Expand Up @@ -132,6 +136,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
// need to read it).
bool isChildConstant(const velox::common::ScanSpec& childSpec) const;

void fillOutputRowsFromMutation(vector_size_t size);

const std::shared_ptr<const dwio::common::TypeWithId> requestedType_;

std::vector<SelectiveColumnReader*> children_;
Expand All @@ -151,6 +157,8 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
// around for lazy columns.
bool hasMutation_ = false;

bool fillMutatedOutputRows_ = false;

// Context information obtained from ExceptionContext. Stored here
// so that LazyVector readers under this can add this to their
// ExceptionContext. Allows contextualizing reader errors to split
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/dwrf/reader/DwrfReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ void DwrfUnit::ensureDecoders() {
flatMapContext,
true); // isRoot
selectiveColumnReader_->setIsTopLevel();
selectiveColumnReader_->setFillMutatedOutputRows(
options_.getRowNumberColumnInfo().has_value());
} else {
columnReader_ = ColumnReader::build( // enqueue streams
requestedType,
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,8 @@ class ParquetRowReader::Impl {
readerBase_->schemaWithId(), // Id is schema id
params,
*options_.getScanSpec());
columnReader_->setFillMutatedOutputRows(
options_.getRowNumberColumnInfo().has_value());

filterRowGroups();
if (!rowGroupIds_.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,11 @@ TEST_F(ParquetTableScanTest, rowIndex) {
{"_tmp_metadata_row_index", "a"},
assignments,
"SELECT _tmp_metadata_row_index, a FROM tmp");
assertSelectWithAssignments(
{"_tmp_metadata_row_index"},
assignments,
"SELECT _tmp_metadata_row_index FROM tmp");

// case 2: file has `_tmp_metadata_row_index` column, then use user data
// insteads of generating it.
loadData(
Expand Down

0 comments on commit dbe5969

Please sign in to comment.