Selective flat map column reader (facebookincubator#9790)
Summary:
X-link: facebookincubator/nimble#55

Pull Request resolved: facebookincubator#9790

- Refactor the existing DWRF selective flat map reader code so it can be reused
- Add a Nimble selective flat map column reader (the flat map layout is sketched below)
- Optimize `ChunkedBoolsDecoder::next` (it showed up as >15% of CPU in a query)
- Reuse reader nulls when possible in the array/map/flat map column readers
- Add `estimatedRowSize` based on encoding information
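For context, a flat map stores each distinct key as its own value column plus an "in map" bitmap that says which rows contain that key; reading a row means collecting the (key, value) pairs whose in-map bit is set. Below is a minimal standalone sketch of that assembly in plain C++ (illustrative only, not Velox code; the struct and function names are made up). The helper added in this diff performs the same assembly over Velox vectors in `calculateOffsets` and `copyValues`.

#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// One flattened key: an "in map" flag per top-level row, plus the values,
// stored densely for the rows where the flag is set.
struct FlatMapKeyColumn {
  std::string key;
  std::vector<bool> inMap;
  std::vector<int64_t> values;
};

// Rebuild the logical map of every row from the per-key columns.
std::vector<std::map<std::string, int64_t>> assembleMaps(
    const std::vector<FlatMapKeyColumn>& columns,
    std::size_t numRows) {
  std::vector<std::map<std::string, int64_t>> result(numRows);
  for (const auto& column : columns) {
    std::size_t valueIndex = 0;
    for (std::size_t row = 0; row < numRows; ++row) {
      if (column.inMap[row]) {
        result[row][column.key] = column.values[valueIndex++];
      }
    }
  }
  return result;
}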

bypass-github-export-checks

Reviewed By: oerling

Differential Revision: D57276011

fbshipit-source-id: 9bc80a4e79e9091ed1763f2c04a420f2d1f5c5aa
Yuhta authored and facebook-github-bot committed May 15, 2024
1 parent 660f17d commit 92c5911
Showing 6 changed files with 305 additions and 259 deletions.
16 changes: 7 additions & 9 deletions velox/dwio/common/SelectiveColumnReader.cpp
@@ -113,17 +113,15 @@ void SelectiveColumnReader::prepareNulls(
  returnReaderNulls_ = false;
  if (resultNulls_ && resultNulls_->unique() &&
      resultNulls_->capacity() >= bits::nbytes(numRows) + simd::kPadding) {
-    // Clear whole capacity because future uses could hit
-    // uncleared data between capacity() and 'numBytes'.
-    simd::memset(rawResultNulls_, bits::kNotNullByte, resultNulls_->capacity());
-    anyNulls_ = false;
-    return;
+    resultNulls_->setSize(bits::nbytes(numRows));
+  } else {
+    resultNulls_ = AlignedBuffer::allocate<bool>(
+        numRows + (simd::kPadding * 8), &memoryPool_);
+    rawResultNulls_ = resultNulls_->asMutable<uint64_t>();
  }
-
  anyNulls_ = false;
-  resultNulls_ = AlignedBuffer::allocate<bool>(
-      numRows + (simd::kPadding * 8), &memoryPool_);
-  rawResultNulls_ = resultNulls_->asMutable<uint64_t>();
+  // Clear whole capacity because future uses could hit uncleared data between
+  // capacity() and 'numBytes'.
  simd::memset(rawResultNulls_, bits::kNotNullByte, resultNulls_->capacity());
}
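The hunk above applies a reuse-if-possible pattern to the nulls buffer: keep `resultNulls_` when the reader is its sole owner and it has enough capacity, otherwise reallocate, and in either case clear the whole capacity so a later batch never sees stale bits. A minimal standalone sketch of the same pattern with standard containers (illustrative only; `NullsBuffer` and `prepareNullsSketch` are made-up names, not Velox API):

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

using NullsBuffer = std::shared_ptr<std::vector<uint8_t>>;

// Keep 'buffer' if we are the only owner and it already has enough capacity;
// otherwise allocate a fresh one. Then mark every byte "not null" so stale
// bits can never leak into a later batch.
void prepareNullsSketch(NullsBuffer& buffer, std::size_t numBytes) {
  if (buffer && buffer.use_count() == 1 && buffer->capacity() >= numBytes) {
    buffer->resize(numBytes); // Analogous to resultNulls_->setSize(...).
  } else {
    buffer = std::make_shared<std::vector<uint8_t>>(numBytes);
  }
  std::memset(buffer->data(), 0xff, buffer->size()); // 0xff ~ kNotNullByte.
}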

9 changes: 8 additions & 1 deletion velox/dwio/common/SelectiveColumnReader.h
@@ -379,7 +379,7 @@
  // discard nulls. This is used at read prepare time. useFastPath()
  // in DecoderUtil.h is used at read time and is expected to produce
  // the same result.
-  virtual bool useBulkPath() const {
+  bool useBulkPath() const {
    auto filter = scanSpec_->filter();
    return hasBulkPath() && process::hasAvx2() &&
        (!filter ||
@@ -445,6 +445,13 @@
  template <typename T>
  void filterNulls(RowSet rows, bool isNull, bool extractValues);

+  // Temporary method for estimate in-memory row size (number of bits) of this
+  // column for Nimble. Will be removed once column statistics are added for
+  // Nimble.
+  virtual std::optional<size_t> estimatedRowBitSize() const {
+    return std::nullopt;
+  }
+
 protected:
  template <typename T>
  void
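The new `estimatedRowBitSize` hook defaults to "unknown". As a hedged illustration of how a format-specific reader might override it (the classes below are hypothetical stand-ins, not part of this commit), a fixed-width BIGINT column could report its value width plus one bit for the null flag:

#include <cstddef>
#include <optional>

// Stand-in for the relevant slice of the reader interface.
struct ColumnReaderSketch {
  virtual ~ColumnReaderSketch() = default;
  // Unknown by default, mirroring the base class in the diff above.
  virtual std::optional<std::size_t> estimatedRowBitSize() const {
    return std::nullopt;
  }
};

// Hypothetical reader for a BIGINT column: 64 value bits plus 1 null bit.
struct BigintReaderSketch : ColumnReaderSketch {
  std::optional<std::size_t> estimatedRowBitSize() const override {
    return 64 + 1;
  }
};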
17 changes: 5 additions & 12 deletions velox/dwio/common/SelectiveRepeatedColumnReader.cpp
@@ -153,7 +153,9 @@ void SelectiveRepeatedColumnReader::makeOffsetsAndSizes(
    rawOffsets[i] = nestedRowIndex;
    if (nulls && bits::isBitNull(nulls, row)) {
      rawSizes[i] = 0;
-      bits::setNull(rawResultNulls_, i);
+      if (!returnReaderNulls_) {
+        bits::setNull(rawResultNulls_, i);
+      }
      anyNulls_ = true;
    } else {
      currentOffset += allLengths_[row];
@@ -186,15 +188,6 @@ RowSet SelectiveRepeatedColumnReader::applyFilter(RowSet rows) {
  return outputRows_;
}

-void SelectiveRepeatedColumnReader::setResultNulls(BaseVector& result) {
-  if (anyNulls_) {
-    resultNulls_->setSize(bits::nbytes(result.size()));
-    result.setNulls(resultNulls_);
-  } else {
-    result.resetNulls();
-  }
-}
-
SelectiveListColumnReader::SelectiveListColumnReader(
    const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
    const std::shared_ptr<const dwio::common::TypeWithId>& fileType,
@@ -251,7 +244,7 @@ void SelectiveListColumnReader::getValues(RowSet rows, VectorPtr* result) {
  prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_);
  auto* resultArray = result->get()->asUnchecked<ArrayVector>();
  makeOffsetsAndSizes(rows, *resultArray);
-  setResultNulls(**result);
+  result->get()->setNulls(resultNulls());
  if (child_ && !nestedRows_.empty()) {
    auto& elements = resultArray->elements();
    prepareStructResult(requestedType_->type()->childAt(0), &elements);
@@ -332,7 +325,7 @@ void SelectiveMapColumnReader::getValues(RowSet rows, VectorPtr* result) {
  prepareResult(*result, requestedType_->type(), rows.size(), &memoryPool_);
  auto* resultMap = result->get()->asUnchecked<MapVector>();
  makeOffsetsAndSizes(rows, *resultMap);
-  setResultNulls(**result);
+  result->get()->setNulls(resultNulls());
  VELOX_CHECK(
      keyReader_ && elementReader_,
      "keyReader_ and elementReaer_ must exist in "
6 changes: 0 additions & 6 deletions velox/dwio/common/SelectiveRepeatedColumnReader.h
@@ -24,10 +24,6 @@ namespace facebook::velox::dwio::common {
// logic for dealing with mapping between enclosing and nested rows.
class SelectiveRepeatedColumnReader : public SelectiveColumnReader {
 public:
-  bool useBulkPath() const override {
-    return false;
-  }
-
  const std::vector<SelectiveColumnReader*>& children() const override {
    return children_;
  }
@@ -73,8 +69,6 @@ class SelectiveRepeatedColumnReader : public SelectiveColumnReader {
  // in subclasses.
  RowSet applyFilter(RowSet rows);

-  void setResultNulls(BaseVector& result);
-
  BufferPtr allLengthsHolder_;
  vector_size_t* allLengths_;
  RowSet nestedRows_;
266 changes: 266 additions & 0 deletions velox/dwio/common/SelectiveStructColumnReader.h
@@ -20,6 +20,9 @@

namespace facebook::velox::dwio::common {

+template <typename T, typename KeyNode, typename FormatData>
+class SelectiveFlatMapColumnReaderHelper;
+
class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
public:
void resetFilterCaches() override {
@@ -96,6 +99,9 @@ class SelectiveStructColumnReaderBase : public SelectiveColumnReader {
  }

 protected:
+  template <typename T, typename KeyNode, typename FormatData>
+  friend class SelectiveFlatMapColumnReaderHelper;
+
// The subscript of childSpecs will be set to this value if the column is
// constant (either explicitly or because it's missing).
static constexpr int32_t kConstantChildSpecSubscript = -1;
@@ -171,4 +177,264 @@ struct SelectiveStructColumnReader : SelectiveStructColumnReaderBase {
std::vector<std::unique_ptr<SelectiveColumnReader>> childrenOwned_;
};

template <typename T, typename KeyNode, typename FormatData>
class SelectiveFlatMapColumnReaderHelper {
public:
SelectiveFlatMapColumnReaderHelper(
SelectiveStructColumnReaderBase& reader,
std::vector<KeyNode>&& keyNodes)
: reader_(reader), keyNodes_(std::move(keyNodes)) {
reader_.children_.resize(keyNodes_.size());
for (int i = 0; i < keyNodes_.size(); ++i) {
reader_.children_[i] = keyNodes_[i].reader.get();
reader_.children_[i]->setIsFlatMapValue(true);
}
if (auto type = reader_.requestedType_->type()->childAt(1); type->isRow()) {
childValues_ = BaseVector::create(type, 0, &reader_.memoryPool_);
}
}

void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls);

void getValues(RowSet rows, VectorPtr* result);

private:
MapVector& prepareResult(VectorPtr& result, vector_size_t size) {
if (result && result->encoding() == VectorEncoding::Simple::MAP &&
result.unique()) {
result->resetDataDependentFlags(nullptr);
result->resize(size);
} else {
VLOG(1) << "Reallocating result MAP vector of size " << size;
result = BaseVector::create(
reader_.requestedType_->type(), size, &reader_.memoryPool_);
}
return *result->asUnchecked<MapVector>();
}

vector_size_t
calculateOffsets(RowSet rows, vector_size_t* offsets, vector_size_t* sizes);

template <TypeKind kKind>
void copyValues(
RowSet rows,
FlatVector<T>* flatKeys,
vector_size_t* rawOffsets,
BaseVector& values);

SelectiveStructColumnReaderBase& reader_;
std::vector<KeyNode> keyNodes_;
VectorPtr childValues_;
DecodedVector decodedChildValues_;
std::vector<const uint64_t*> inMaps_;
std::vector<uint64_t> columnRowBits_;
std::vector<BaseVector::CopyRange> copyRanges_;
};

template <typename T, typename KeyNode, typename FormatData>
void SelectiveFlatMapColumnReaderHelper<T, KeyNode, FormatData>::read(
vector_size_t offset,
RowSet rows,
const uint64_t* incomingNulls) {
reader_.numReads_ = reader_.scanSpec_->newRead();
reader_.prepareRead<char>(offset, rows, incomingNulls);
VELOX_DCHECK(!reader_.hasMutation());
auto activeRows = rows;
auto* mapNulls = reader_.nullsInReadRange_
? reader_.nullsInReadRange_->as<uint64_t>()
: nullptr;
if (reader_.scanSpec_->filter()) {
auto kind = reader_.scanSpec_->filter()->kind();
VELOX_CHECK(
kind == velox::common::FilterKind::kIsNull ||
kind == velox::common::FilterKind::kIsNotNull);
reader_.filterNulls<int32_t>(
rows, kind == velox::common::FilterKind::kIsNull, false);
if (reader_.outputRows_.empty()) {
for (auto* child : reader_.children_) {
child->addParentNulls(offset, mapNulls, rows);
}
return;
}
activeRows = reader_.outputRows_;
}
// Separate the loop to be cache friendly.
for (auto* child : reader_.children_) {
reader_.advanceFieldReader(child, offset);
}
for (auto* child : reader_.children_) {
child->read(offset, activeRows, mapNulls);
child->addParentNulls(offset, mapNulls, rows);
}
reader_.lazyVectorReadOffset_ = offset;
reader_.readOffset_ = offset + rows.back() + 1;
}

template <typename T, typename KeyNode, typename FormatData>
vector_size_t
SelectiveFlatMapColumnReaderHelper<T, KeyNode, FormatData>::calculateOffsets(
RowSet rows,
vector_size_t* offsets,
vector_size_t* sizes) {
auto* nulls = reader_.nullsInReadRange_
? reader_.nullsInReadRange_->as<uint64_t>()
: nullptr;
inMaps_.resize(reader_.children_.size());
for (int k = 0; k < reader_.children_.size(); ++k) {
auto& data =
static_cast<const FormatData&>(reader_.children_[k]->formatData());
inMaps_[k] = data.inMap();
if (!inMaps_[k]) {
inMaps_[k] = nulls;
}
}
columnRowBits_.resize(bits::nwords(reader_.children_.size() * rows.size()));
std::fill(columnRowBits_.begin(), columnRowBits_.end(), 0);
std::fill(sizes, sizes + rows.size(), 0);
for (int k = 0; k < reader_.children_.size(); ++k) {
if (inMaps_[k]) {
for (vector_size_t i = 0; i < rows.size(); ++i) {
if (bits::isBitSet(inMaps_[k], rows[i])) {
bits::setBit(columnRowBits_.data(), i + k * rows.size());
++sizes[i];
}
}
} else {
bits::fillBits(
columnRowBits_.data(), k * rows.size(), (k + 1) * rows.size(), true);
for (vector_size_t i = 0; i < rows.size(); ++i) {
++sizes[i];
}
}
}
vector_size_t numNestedRows = 0;
for (vector_size_t i = 0; i < rows.size(); ++i) {
if (!reader_.returnReaderNulls_ && nulls &&
bits::isBitNull(nulls, rows[i])) {
bits::setNull(reader_.rawResultNulls_, i);
reader_.anyNulls_ = true;
}
offsets[i] = numNestedRows;
numNestedRows += sizes[i];
}
return numNestedRows;
}
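`columnRowBits_`, built in `calculateOffsets` above, is a column-major presence matrix: bit `k * rows.size() + i` is set when key column `k` has an entry in the `i`-th selected row. A standalone sketch of that indexing (illustrative only, not Velox code; `isColumnPresentInRow` is a made-up name):

#include <cstdint>
#include <vector>

// Returns true when key column 'column' has an entry in output row 'row',
// given the column-major bit matrix described above.
inline bool isColumnPresentInRow(
    const std::vector<uint64_t>& columnRowBits,
    int column,
    int row,
    int numRows) {
  const uint64_t bit =
      static_cast<uint64_t>(column) * static_cast<uint64_t>(numRows) + row;
  return (columnRowBits[bit / 64] >> (bit % 64)) & 1;
}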

template <typename T, typename KeyNode, typename FormatData>
template <TypeKind kKind>
void SelectiveFlatMapColumnReaderHelper<T, KeyNode, FormatData>::copyValues(
RowSet rows,
FlatVector<T>* flatKeys,
vector_size_t* rawOffsets,
BaseVector& values) {
// String values are not copied directly because currently we don't have
// them in production so no need to optimize.
constexpr bool kDirectCopy =
TypeKind::TINYINT <= kKind && kKind <= TypeKind::DOUBLE;
using ValueType = typename TypeTraits<kKind>::NativeType;
T* rawKeys = flatKeys->mutableRawValues();
[[maybe_unused]] size_t strKeySize;
[[maybe_unused]] char* rawStrKeyBuffer;
if constexpr (std::is_same_v<T, StringView>) {
strKeySize = 0;
for (int k = 0; k < reader_.children_.size(); ++k) {
if (!keyNodes_[k].key.get().isInline()) {
strKeySize += keyNodes_[k].key.get().size();
}
}
if (strKeySize > 0) {
auto buf =
AlignedBuffer::allocate<char>(strKeySize, &reader_.memoryPool_);
rawStrKeyBuffer = buf->template asMutable<char>();
flatKeys->addStringBuffer(buf);
strKeySize = 0;
for (int k = 0; k < reader_.children_.size(); ++k) {
auto& s = keyNodes_[k].key.get();
if (!s.isInline()) {
memcpy(&rawStrKeyBuffer[strKeySize], s.data(), s.size());
strKeySize += s.size();
}
}
strKeySize = 0;
}
}
[[maybe_unused]] ValueType* targetValues;
[[maybe_unused]] uint64_t* targetNulls;
if constexpr (kDirectCopy) {
VELOX_CHECK(values.isFlatEncoding());
auto* flat = values.asUnchecked<FlatVector<ValueType>>();
targetValues = flat->mutableRawValues();
targetNulls = flat->mutableRawNulls();
}
for (int k = 0; k < reader_.children_.size(); ++k) {
[[maybe_unused]] StringView strKey;
if constexpr (std::is_same_v<T, StringView>) {
strKey = keyNodes_[k].key.get();
if (!strKey.isInline()) {
strKey = {
&rawStrKeyBuffer[strKeySize], static_cast<int32_t>(strKey.size())};
strKeySize += strKey.size();
}
}
reader_.children_[k]->getValues(rows, &childValues_);
if constexpr (kDirectCopy) {
decodedChildValues_.decode(*childValues_);
}
const auto begin = k * rows.size();
bits::forEachSetBit(
columnRowBits_.data(),
begin,
begin + rows.size(),
[&](vector_size_t i) {
i -= begin;
if constexpr (std::is_same_v<T, StringView>) {
rawKeys[rawOffsets[i]] = strKey;
} else {
rawKeys[rawOffsets[i]] = keyNodes_[k].key.get();
}
if constexpr (kDirectCopy) {
targetValues[rawOffsets[i]] =
decodedChildValues_.valueAt<ValueType>(i);
bits::setNull(
targetNulls, rawOffsets[i], decodedChildValues_.isNullAt(i));
} else {
copyRanges_.push_back({
.sourceIndex = i,
.targetIndex = rawOffsets[i],
.count = 1,
});
}
++rawOffsets[i];
});
if constexpr (!kDirectCopy) {
values.copyRanges(childValues_.get(), copyRanges_);
copyRanges_.clear();
}
}
}
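The string-key path in `copyValues` above packs the characters of all non-inline keys into one buffer owned by the keys vector, then builds views into it on a second pass, reusing `strKeySize` as a running offset. A simplified standalone sketch of the pack-then-view idea, using `std::string_view` in place of Velox's `StringView` and copying every key for brevity (illustrative only; `packKeys` is a made-up name):

#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Copy all keys into one backing buffer, then build views into it. The views
// are built in a second pass because appending may reallocate the buffer.
std::vector<std::string_view> packKeys(
    const std::vector<std::string>& keys,
    std::string& backingBuffer) {
  backingBuffer.clear();
  std::vector<std::size_t> starts;
  starts.reserve(keys.size());
  for (const auto& key : keys) {
    starts.push_back(backingBuffer.size());
    backingBuffer += key;
  }
  std::vector<std::string_view> views;
  views.reserve(keys.size());
  for (std::size_t i = 0; i < keys.size(); ++i) {
    views.emplace_back(backingBuffer.data() + starts[i], keys[i].size());
  }
  return views;
}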

template <typename T, typename KeyNode, typename FormatData>
void SelectiveFlatMapColumnReaderHelper<T, KeyNode, FormatData>::getValues(
RowSet rows,
VectorPtr* result) {
auto& mapResult = prepareResult(*result, rows.size());
auto* rawOffsets = mapResult.mutableOffsets(rows.size())
->template asMutable<vector_size_t>();
auto* rawSizes =
mapResult.mutableSizes(rows.size())->template asMutable<vector_size_t>();
auto numNestedRows = calculateOffsets(rows, rawOffsets, rawSizes);
auto& keys = mapResult.mapKeys();
auto& values = mapResult.mapValues();
BaseVector::prepareForReuse(keys, numNestedRows);
BaseVector::prepareForReuse(values, numNestedRows);
auto* flatKeys = keys->template asFlatVector<T>();
VELOX_DYNAMIC_TYPE_DISPATCH(
copyValues, values->typeKind(), rows, flatKeys, rawOffsets, *values);
VELOX_CHECK_EQ(rawOffsets[rows.size() - 1], numNestedRows);
std::copy_backward(
rawOffsets, rawOffsets + rows.size() - 1, rawOffsets + rows.size());
rawOffsets[0] = 0;
result->get()->setNulls(reader_.resultNulls());
}
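The `std::copy_backward` at the end of `getValues` above undoes the offset advancing done by `copyValues`: after copying, `rawOffsets[i]` holds the end of row `i`'s entries, so shifting the array right by one slot and writing 0 at the front turns the end offsets back into begin offsets. A tiny standalone illustration (not Velox code):

#include <algorithm>
#include <cassert>
#include <vector>

int main() {
  // Three rows with sizes 2, 0, 3: after filling, each slot holds the END
  // offset of its row.
  std::vector<int> offsets = {2, 2, 5};
  std::copy_backward(offsets.begin(), offsets.end() - 1, offsets.end());
  offsets[0] = 0;
  // Each slot now holds the BEGIN offset of its row.
  assert((offsets == std::vector<int>{0, 2, 2}));
  return 0;
}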

} // namespace facebook::velox::dwio::common