Skip to content

Commit

Permalink
Parquet LazyVector support (facebookincubator#11010)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#11010

Parquet reader was not generating `LazyVector` at all.  Fix this and
address the bugs along the way:

1. Fix the `rowIndex` passed to hook by considering `numValuesBias_`.
2. Add the missing `setNumValues` call in Parquet `StringDecoder`.
3. Fix the Parquet string dictionary column visitor call and reuse the same code
   in DWRF string dictionary column reader.
4. Fix write over boundary in `VectorLoader::load`.

Fix facebookincubator#9563

Differential Revision: D62724551
  • Loading branch information
Yuhta authored and facebook-github-bot committed Sep 16, 2024
1 parent af2513b commit e72cc33
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 204 deletions.
9 changes: 5 additions & 4 deletions velox/common/base/RawVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ bool initializeIota() {
}
} // namespace

const int32_t* iota(int32_t size, raw_vector<int32_t>& storage) {
if (iotaData.size() < size) {
const int32_t*
iota(int32_t size, raw_vector<int32_t>& storage, int32_t offset) {
if (iotaData.size() < offset + size) {
storage.resize(size);
std::iota(&storage[0], &storage[storage.size()], 0);
std::iota(storage.begin(), storage.end(), offset);
return storage.data();
}
return iotaData.data();
return iotaData.data() + offset;
}

static bool FB_ANONYMOUS_VARIABLE(g_iotaConstants) = initializeIota();
Expand Down
3 changes: 2 additions & 1 deletion velox/common/base/RawVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class raw_vector {
// SIMD width. Typically returns preallocated memory but if this is
// not large enough,resizes and initializes 'storage' to the requested
// size and returns storage.data().
const int32_t* iota(int32_t size, raw_vector<int32_t>& storage);
const int32_t*
iota(int32_t size, raw_vector<int32_t>& storage, int32_t offset = 0);

} // namespace facebook::velox
93 changes: 20 additions & 73 deletions velox/dwio/common/ColumnVisitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,10 @@ class ColumnVisitor {
return reader_->mutableOutputRows(size);
}

int32_t numValuesBias() const {
return numValuesBias_;
}

void setNumValuesBias(int32_t bias) {
numValuesBias_ = bias;
}
Expand Down Expand Up @@ -515,12 +519,12 @@ ColumnVisitor<T, TFilter, ExtractValues, isDense>::filterFailed() {
template <typename T, typename TFilter, typename ExtractValues, bool isDense>
inline void ColumnVisitor<T, TFilter, ExtractValues, isDense>::addResult(
T value) {
values_.addValue(rowIndex_, value);
values_.addValue(rowIndex_ + numValuesBias_, value);
}

template <typename T, typename TFilter, typename ExtractValues, bool isDense>
inline void ColumnVisitor<T, TFilter, ExtractValues, isDense>::addNull() {
values_.template addNull<T>(rowIndex_);
values_.template addNull<T>(rowIndex_ + numValuesBias_);
}

template <typename T, typename TFilter, typename ExtractValues, bool isDense>
Expand Down Expand Up @@ -819,7 +823,10 @@ class DictionaryColumnVisitor
translateByDict(input, numInput, values);
super::values_.hook().addValues(
scatter ? scatterRows + super::rowIndex_
: velox::iota(super::numRows_, super::innerNonNullRows()) +
: velox::iota(
super::numRows_,
super::innerNonNullRows(),
super::numValuesBias_) +
super::rowIndex_,
values,
numInput);
Expand Down Expand Up @@ -1174,7 +1181,7 @@ class StringDictionaryColumnVisitor
super::filterFailed();
} else {
if (velox::common::applyFilter(
super::filter_, valueInDictionary(value, inStrideDict))) {
super::filter_, valueInDictionary(index))) {
super::filterPassed(index);
if (TFilter::deterministic) {
DictSuper::filterCache()[index] = FilterResult::kSuccess;
Expand Down Expand Up @@ -1217,11 +1224,10 @@ class StringDictionaryColumnVisitor
if constexpr (!DictSuper::hasFilter()) {
if (hasHook) {
for (auto i = 0; i < numInput; ++i) {
auto value = input[i];
super::values_.addValue(
scatterRows ? scatterRows[super::rowIndex_ + i]
: super::rowIndex_ + i,
value);
valueInDictionary(input[i]));
}
}
if constexpr (std::is_same_v<TFilter, velox::common::IsNotNull>) {
Expand Down Expand Up @@ -1266,16 +1272,7 @@ class StringDictionaryColumnVisitor
while (bits) {
int index = bits::getAndClearLastSetBit(bits);
int32_t value = input[i + index];
bool result;
if (value >= DictSuper::dictionarySize()) {
result = applyFilter(
super::filter_,
valueInDictionary(value - DictSuper::dictionarySize(), true));
} else {
result =
applyFilter(super::filter_, valueInDictionary(value, false));
}
if (result) {
if (applyFilter(super::filter_, valueInDictionary(value))) {
DictSuper::filterCache()[value] = FilterResult::kSuccess;
passed |= 1 << index;
} else {
Expand Down Expand Up @@ -1355,65 +1352,15 @@ class StringDictionaryColumnVisitor
}
}

folly::StringPiece valueInDictionary(int64_t index, bool inStrideDict) {
if (inStrideDict) {
return folly::StringPiece(reinterpret_cast<const StringView*>(
DictSuper::state_.dictionary2.values)[index]);
}
return folly::StringPiece(reinterpret_cast<const StringView*>(
DictSuper::state_.dictionary.values)[index]);
}
};

class ExtractStringDictionaryToGenericHook {
public:
static constexpr bool kSkipNulls = true;
using HookType = ValueHook;

ExtractStringDictionaryToGenericHook(
ValueHook* hook,
RowSet rows,
RawScanState state)

: hook_(hook), rows_(rows), state_(state) {}

bool acceptsNulls() {
return hook_->acceptsNulls();
}

template <typename T>
void addNull(vector_size_t rowIndex) {
hook_->addNull(rowIndex);
}

void addValue(vector_size_t rowIndex, int32_t value) {
// We take the string from the stripe or stride dictionary
// according to the index. Stride dictionary indices are offset up
// by the stripe dict size.
if (value < dictionarySize()) {
auto* strings =
reinterpret_cast<const StringView*>(state_.dictionary.values);
hook_->addValue(rowIndex, strings[value]);
} else {
VELOX_DCHECK(state_.inDictionary);
auto* strings =
reinterpret_cast<const StringView*>(state_.dictionary2.values);
hook_->addValue(rowIndex, strings[value - dictionarySize()]);
folly::StringPiece valueInDictionary(int64_t index) {
auto stripeDictSize = DictSuper::state_.dictionary.numValues;
if (index < stripeDictSize) {
return reinterpret_cast<const StringView*>(
DictSuper::state_.dictionary.values)[index];
}
return reinterpret_cast<const StringView*>(
DictSuper::state_.dictionary2.values)[index - stripeDictSize];
}

ValueHook& hook() {
return *hook_;
}

private:
int32_t dictionarySize() const {
return state_.dictionary.numValues;
}

ValueHook* const hook_;
RowSet const rows_;
RawScanState state_;
};

template <typename T, typename TFilter, typename ExtractValues, bool isDense>
Expand Down
7 changes: 3 additions & 4 deletions velox/dwio/common/DecoderUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,9 @@ void fixedWidthScan(
}
if (!hasFilter) {
if (hasHook) {
hook.addValues(
scatterRows + rowIndex,
reinterpret_cast<T*>(&values),
width);
T values2[values.size];
values.store_unaligned(values2);
hook.addValues(scatterRows + rowIndex, values2, width);
} else {
if (scatter) {
scatterDense<T>(
Expand Down
15 changes: 13 additions & 2 deletions velox/dwio/common/DirectDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ class DirectDecoder : public IntDecoder<isSigned> {
return;
}
}
if (hasHook && visitor.numValuesBias() > 0) {
for (auto& row : *outerVector) {
row += visitor.numValuesBias();
}
}
if (super::useVInts_) {
if (Visitor::dense) {
super::bulkRead(numNonNull, data);
Expand Down Expand Up @@ -244,7 +249,10 @@ class DirectDecoder : public IntDecoder<isSigned> {
rowsAsRange,
0,
rowsAsRange.size(),
hasHook ? velox::iota(numRows, visitor.innerNonNullRows())
hasHook ? velox::iota(
numRows,
visitor.innerNonNullRows(),
visitor.numValuesBias())
: nullptr,
visitor.rawValues(numRows),
hasFilter ? visitor.outputRows(numRows) : nullptr,
Expand All @@ -254,7 +262,10 @@ class DirectDecoder : public IntDecoder<isSigned> {
} else {
dwio::common::fixedWidthScan<T, filterOnly, false>(
rowsAsRange,
hasHook ? velox::iota(numRows, visitor.innerNonNullRows())
hasHook ? velox::iota(
numRows,
visitor.innerNonNullRows(),
visitor.numValuesBias())
: nullptr,
visitor.rawValues(numRows),
hasFilter ? visitor.outputRows(numRows) : nullptr,
Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/common/tests/utils/E2EFilterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class E2EFilterTestBase : public testing::Test {

static bool typeKindSupportsValueHook(TypeKind kind) {
return kind != TypeKind::TIMESTAMP && kind != TypeKind::ARRAY &&
kind != TypeKind::ROW && kind != TypeKind::MAP;
kind != TypeKind::ROW && kind != TypeKind::MAP &&
kind != TypeKind::HUGEINT;
}

std::vector<RowVectorPtr> makeDataset(
Expand Down Expand Up @@ -257,7 +258,7 @@ class E2EFilterTestBase : public testing::Test {
for (int32_t i = 0; i < 5 && i < batch->size(); ++i) {
rows.push_back(i);
}
for (int32_t i = 5; i < 5 && i < batch->size(); i += 2) {
for (int32_t i = 5; i < batch->size(); i += 2) {
rows.push_back(i);
}
auto result = std::static_pointer_cast<FlatVector<T>>(
Expand Down
35 changes: 4 additions & 31 deletions velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,37 +228,10 @@ void SelectiveStringDictionaryColumnReader::read(
loadStrideDictionary();
}

if (scanSpec_->keepValues()) {
if (scanSpec_->valueHook()) {
if (isDense) {
readHelper<common::AlwaysTrue, true>(
&alwaysTrue(),
rows,
ExtractStringDictionaryToGenericHook(
scanSpec_->valueHook(), rows, scanState_.rawState));
} else {
readHelper<common::AlwaysTrue, false>(
&alwaysTrue(),
rows,
ExtractStringDictionaryToGenericHook(
scanSpec_->valueHook(), rows, scanState_.rawState));
}
} else {
if (isDense) {
processFilter<true>(scanSpec_->filter(), rows, ExtractToReader(this));
} else {
processFilter<false>(scanSpec_->filter(), rows, ExtractToReader(this));
}
}
} else {
if (isDense) {
processFilter<true>(
scanSpec_->filter(), rows, dwio::common::DropValues());
} else {
processFilter<false>(
scanSpec_->filter(), rows, dwio::common::DropValues());
}
}
dwio::common::StringColumnReadWithVisitorHelper<true>(
*this, rows)([&](auto visitor) {
readWithVisitor(visitor.toStringDictionaryColumnVisitor());
});

readOffset_ += rows.back() + 1;
numRowsScanned_ = readOffset_ - offset;
Expand Down
76 changes: 2 additions & 74 deletions velox/dwio/dwrf/reader/SelectiveStringDictionaryColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,7 @@ class SelectiveStringDictionaryColumnReader
void makeDictionaryBaseVector();

template <typename TVisitor>
void readWithVisitor(const RowSet& rows, TVisitor visitor);

template <typename TFilter, bool isDense, typename ExtractValues>
void
readHelper(common::Filter* filter, const RowSet& rows, ExtractValues values);

template <bool isDense, typename ExtractValues>
void processFilter(
common::Filter* filter,
const RowSet& rows,
ExtractValues extractValues);
void readWithVisitor(TVisitor visitor);

// Fills 'values' from 'data' and 'lengthDecoder'. The count of
// values is in 'values.numValues'.
Expand Down Expand Up @@ -118,9 +108,7 @@ class SelectiveStringDictionaryColumnReader
};

template <typename TVisitor>
void SelectiveStringDictionaryColumnReader::readWithVisitor(
const RowSet& /*rows*/,
TVisitor visitor) {
void SelectiveStringDictionaryColumnReader::readWithVisitor(TVisitor visitor) {
if (version_ == velox::dwrf::RleVersion_1) {
decodeWithVisitor<velox::dwrf::RleDecoderV1<false>>(
dictIndex_.get(), visitor);
Expand All @@ -130,64 +118,4 @@ void SelectiveStringDictionaryColumnReader::readWithVisitor(
}
}

template <typename TFilter, bool isDense, typename ExtractValues>
void SelectiveStringDictionaryColumnReader::readHelper(
common::Filter* filter,
const RowSet& rows,
ExtractValues values) {
readWithVisitor(
rows,
dwio::common::
StringDictionaryColumnVisitor<TFilter, ExtractValues, isDense>(
*reinterpret_cast<TFilter*>(filter), this, rows, values));
}

template <bool isDense, typename ExtractValues>
void SelectiveStringDictionaryColumnReader::processFilter(
common::Filter* filter,
const RowSet& rows,
ExtractValues extractValues) {
if (filter == nullptr) {
readHelper<common::AlwaysTrue, isDense>(
&dwio::common::alwaysTrue(), rows, extractValues);
return;
}

switch (filter->kind()) {
case common::FilterKind::kAlwaysTrue:
readHelper<common::AlwaysTrue, isDense>(filter, rows, extractValues);
break;
case common::FilterKind::kIsNull:
filterNulls<int32_t>(
rows,
true,
!std::is_same_v<decltype(extractValues), dwio::common::DropValues>);
break;
case common::FilterKind::kIsNotNull:
if (std::is_same_v<decltype(extractValues), dwio::common::DropValues>) {
filterNulls<int32_t>(rows, false, false);
} else {
readHelper<common::IsNotNull, isDense>(filter, rows, extractValues);
}
break;
case common::FilterKind::kBytesRange:
readHelper<common::BytesRange, isDense>(filter, rows, extractValues);
break;
case common::FilterKind::kNegatedBytesRange:
readHelper<common::NegatedBytesRange, isDense>(
filter, rows, extractValues);
break;
case common::FilterKind::kBytesValues:
readHelper<common::BytesValues, isDense>(filter, rows, extractValues);
break;
case common::FilterKind::kNegatedBytesValues:
readHelper<common::NegatedBytesValues, isDense>(
filter, rows, extractValues);
break;
default:
readHelper<common::Filter, isDense>(filter, rows, extractValues);
break;
}
}

} // namespace facebook::velox::dwrf
1 change: 1 addition & 0 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,7 @@ class ParquetRowReader::Impl {
*options_.scanSpec());
columnReader_->setFillMutatedOutputRows(
options_.rowNumberColumnInfo().has_value());
columnReader_->setIsTopLevel();

filterRowGroups();
if (!rowGroupIds_.empty()) {
Expand Down
Loading

0 comments on commit e72cc33

Please sign in to comment.