Skip to content

Commit

Permalink
Optimize DWRF flatmap reader (facebookincubator#9486)
Browse files Browse the repository at this point in the history
Summary:

1. Avoid copying and replicating value scan spec
2. Access inMap and child values sequentially one at a time to avoid cache thrashing
3. Reuse column reader `nullsInReadRange`, `values_`, child value vector, and result vector
4. Use thread local instead of atomic for `MmapAllocator::numMallocBytes_`

Differential Revision: D56124127
  • Loading branch information
Yuhta authored and facebook-github-bot committed Apr 15, 2024
1 parent d796cfc commit 36a9fb8
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 440 deletions.
4 changes: 2 additions & 2 deletions velox/common/memory/MmapAllocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ void* MmapAllocator::allocateBytesWithoutRetry(
VELOX_MEM_LOG(ERROR) << "Failed to allocateBytes " << bytes
<< " bytes with " << alignment << " alignment";
} else {
numMallocBytes_.fetch_add(bytes);
numMallocBytes_ += bytes;
}
return result;
}
Expand Down Expand Up @@ -472,7 +472,7 @@ void* MmapAllocator::allocateBytesWithoutRetry(
void MmapAllocator::freeBytes(void* p, uint64_t bytes) noexcept {
if (useMalloc(bytes)) {
::free(p); // NOLINT
numMallocBytes_.fetch_sub(bytes);
numMallocBytes_ -= bytes;
return;
}

Expand Down
8 changes: 5 additions & 3 deletions velox/common/memory/MmapAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <mutex>
#include <unordered_set>

#include <folly/ThreadCachedInt.h>

#include "velox/common/base/SimdUtil.h"
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/common/memory/MemoryPool.h"
Expand Down Expand Up @@ -130,7 +132,7 @@ class MmapAllocator : public MemoryAllocator {
}

size_t totalUsedBytes() const override {
return numMallocBytes_ + AllocationTraits::pageBytes(numAllocated_);
return numMallocBytes() + AllocationTraits::pageBytes(numAllocated_);
}

MachinePageCount numAllocated() const override {
Expand All @@ -146,7 +148,7 @@ class MmapAllocator : public MemoryAllocator {
}

uint64_t numMallocBytes() const {
return numMallocBytes_;
return numMallocBytes_.readFull();
}

Stats stats() const override {
Expand Down Expand Up @@ -410,7 +412,7 @@ class MmapAllocator : public MemoryAllocator {
std::atomic<uint64_t> numAllocations_ = 0;
std::atomic<uint64_t> numAllocatedPages_ = 0;
std::atomic<uint64_t> numAdvisedPages_ = 0;
std::atomic<uint64_t> numMallocBytes_ = 0;
folly::ThreadCachedInt<int64_t, MmapAllocator> numMallocBytes_;

// Allocations that are larger than largest size classes will be delegated to
// ManagedMmapArenas, to avoid calling mmap on every allocation.
Expand Down
12 changes: 0 additions & 12 deletions velox/dwio/common/ScanSpec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,18 +402,6 @@ std::string ScanSpec::toString() const {
return out.str();
}

// Detaches 'child' from this scan spec, keeping the field-name index in
// sync, and hands ownership back to the caller. Returns nullptr when
// 'child' is not one of this spec's children.
std::shared_ptr<ScanSpec> ScanSpec::removeChild(const ScanSpec* child) {
  auto it = children_.begin();
  while (it != children_.end()) {
    if (it->get() != child) {
      ++it;
      continue;
    }
    auto detached = std::move(*it);
    children_.erase(it);
    // Drop the by-name lookup entry so the two indices stay consistent.
    childByFieldName_.erase(detached->fieldName());
    return detached;
  }
  return nullptr;
}

// Installs 'filter' on this spec. If a filter is already present the two
// are merged (conjunction); otherwise a private clone of 'filter' is taken
// so the spec owns its filter.
void ScanSpec::addFilter(const Filter& filter) {
  if (filter_) {
    filter_ = filter_->mergeWith(&filter);
  } else {
    filter_ = filter.clone();
  }
}
Expand Down
4 changes: 0 additions & 4 deletions velox/dwio/common/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,6 @@ class ScanSpec {
return it->second;
}

// Remove a child from this scan spec, returning the removed child. This is
// used for example to transform a flatmap scan spec into a struct scan spec.
std::shared_ptr<ScanSpec> removeChild(const ScanSpec* child);

SelectivityInfo& selectivity() {
return selectivity_;
}
Expand Down
12 changes: 0 additions & 12 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ const std::vector<SelectiveColumnReader*>& SelectiveColumnReader::children()
}

void SelectiveColumnReader::seekTo(vector_size_t offset, bool readsNullsOnly) {
VELOX_TRACE_HISTORY_PUSH("seekTo %d %d", offset, readsNullsOnly);
if (offset == readOffset_) {
return;
}
Expand Down Expand Up @@ -393,17 +392,6 @@ void SelectiveColumnReader::addStringValue(folly::StringPiece value) {
StringView(copy, value.size());
}

// True when only the null flags of the column need to be read: either the
// filter is IS NULL (values are never needed to evaluate it), or the filter
// is IS NOT NULL and the scan does not keep the column values.
bool SelectiveColumnReader::readsNullsOnly() const {
  const auto* filter = scanSpec_->filter();
  if (!filter) {
    return false;
  }
  switch (filter->kind()) {
    case velox::common::FilterKind::kIsNull:
      return true;
    case velox::common::FilterKind::kIsNotNull:
      return !scanSpec_->keepValues();
    default:
      return false;
  }
}

void SelectiveColumnReader::setNulls(BufferPtr resultNulls) {
resultNulls_ = resultNulls;
rawResultNulls_ = resultNulls ? resultNulls->asMutable<uint64_t>() : nullptr;
Expand Down
16 changes: 10 additions & 6 deletions velox/dwio/common/SelectiveColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -430,21 +430,20 @@ class SelectiveColumnReader {
static constexpr int8_t kNoValueSize = -1;
static constexpr uint32_t kRowGroupNotSet = ~0;

// True if we have an is null filter and optionally return column
// values or we have an is not null filter and do not return column
// values. This means that only null flags need be accessed.
bool readsNullsOnly() const;

template <typename T>
void ensureValuesCapacity(vector_size_t numRows);

// Prepares the result buffer for nulls for reading 'rows'. Leaves
// 'extraSpace' bits worth of space in the nulls buffer.
void prepareNulls(RowSet rows, bool hasNulls, int32_t extraRows = 0);

// Marks this reader as reading the value column of a flat map key. When
// set, the reader reuses per-reader state across reads instead of
// reallocating it: the nulls buffer (flatMapValueNullsInReadRange_), the
// values buffer (the uniqueness check in ensureValuesCapacity is skipped),
// and the cached flat / constant-null result vectors in getFlatValues.
void setFlatMapValue(bool value) {
flatMapValue_ = value;
}

protected:
// Filters 'rows' according to 'is_null'. Only applies to cases where
// readsNullsOnly() is true.
// scanSpec_->readsNullsOnly() is true.
template <typename T>
void filterNulls(RowSet rows, bool isNull, bool extractValues);

Expand Down Expand Up @@ -638,6 +637,11 @@ class SelectiveColumnReader {

// Encoding-related state to keep between reads, e.g. dictionaries.
ScanState scanState_;

bool flatMapValue_ = false;
BufferPtr flatMapValueNullsInReadRange_;
VectorPtr flatMapValueFlatValues_;
VectorPtr flatMapValueConstantNullValues_;
};

template <>
Expand Down
71 changes: 51 additions & 20 deletions velox/dwio/common/SelectiveColumnReaderInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Timer {

template <typename T>
void SelectiveColumnReader::ensureValuesCapacity(vector_size_t numRows) {
if (values_ && values_->unique() &&
if (values_ && (flatMapValue_ || values_->unique()) &&
values_->capacity() >=
BaseVector::byteSize<T>(numRows) + simd::kPadding) {
return;
Expand All @@ -62,15 +62,22 @@ void SelectiveColumnReader::prepareRead(
vector_size_t offset,
RowSet rows,
const uint64_t* incomingNulls) {
seekTo(offset, scanSpec_->readsNullsOnly());
const bool readsNullsOnly = scanSpec_->readsNullsOnly();
seekTo(offset, readsNullsOnly);
vector_size_t numRows = rows.back() + 1;

// Do not re-use unless singly-referenced.
if (nullsInReadRange_ && !nullsInReadRange_->unique()) {
if (flatMapValue_) {
if (!nullsInReadRange_) {
nullsInReadRange_ = std::move(flatMapValueNullsInReadRange_);
}
} else if (nullsInReadRange_ && !nullsInReadRange_->unique()) {
nullsInReadRange_.reset();
}
formatData_->readNulls(
numRows, incomingNulls, nullsInReadRange_, readsNullsOnly());
numRows, incomingNulls, nullsInReadRange_, readsNullsOnly);
if (flatMapValue_ && nullsInReadRange_) {
flatMapValueNullsInReadRange_ = nullsInReadRange_;
}
// We check for all nulls and no nulls. We expect both calls to
// bits::isAllSet to fail early in the common case. We could do a
// single traversal of null bits counting the bits and then compare
Expand Down Expand Up @@ -116,14 +123,19 @@ void SelectiveColumnReader::getFlatValues(
mayGetValues_ = false;
}
if (allNull_) {
*result = std::make_shared<ConstantVector<TVector>>(
&memoryPool_,
rows.size(),
true,
type,
T(),
SimpleVectorStats<TVector>{},
sizeof(TVector) * rows.size());
if (flatMapValue_) {
if (flatMapValueConstantNullValues_) {
flatMapValueConstantNullValues_->resize(rows.size());
} else {
flatMapValueConstantNullValues_ =
std::make_shared<ConstantVector<TVector>>(
&memoryPool_, rows.size(), true, type, T());
}
*result = flatMapValueConstantNullValues_;
} else {
*result = std::make_shared<ConstantVector<TVector>>(
&memoryPool_, rows.size(), true, type, T());
}
return;
}
if (valueSize_ == sizeof(TVector)) {
Expand All @@ -134,13 +146,32 @@ void SelectiveColumnReader::getFlatValues(
upcastScalarValues<T, TVector>(rows);
}
valueSize_ = sizeof(TVector);
*result = std::make_shared<FlatVector<TVector>>(
&memoryPool_,
type,
resultNulls(),
numValues_,
values_,
std::move(stringBuffers_));
if (flatMapValue_) {
if (flatMapValueFlatValues_) {
auto* flat = flatMapValueFlatValues_->asUnchecked<FlatVector<TVector>>();
flat->unsafeSetSize(numValues_);
flat->setNulls(resultNulls());
flat->unsafeSetValues(values_);
flat->setStringBuffers(std::move(stringBuffers_));
} else {
flatMapValueFlatValues_ = std::make_shared<FlatVector<TVector>>(
&memoryPool_,
type,
resultNulls(),
numValues_,
values_,
std::move(stringBuffers_));
}
*result = flatMapValueFlatValues_;
} else {
*result = std::make_shared<FlatVector<TVector>>(
&memoryPool_,
type,
resultNulls(),
numValues_,
values_,
std::move(stringBuffers_));
}
}

template <>
Expand Down
Loading

0 comments on commit 36a9fb8

Please sign in to comment.