From 36a9fb86fba500621314db03ced1a9d52c4e7784 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Mon, 15 Apr 2024 07:27:04 -0700 Subject: [PATCH] Optimize DWRF flatmap reader (#9486) Summary: 1. Avoid copying and replicating value scan spec 2. Access inMap and child values sequentially one at a time to avoid cache thrashing 3. Reuse column reader `nullsInReadRange`, `values_`, child value vector, and result vector 4. Use thread local instead of atomic for `MmapAllocator::numMallocBytes_` Differential Revision: D56124127 --- velox/common/memory/MmapAllocator.cpp | 4 +- velox/common/memory/MmapAllocator.h | 8 +- velox/dwio/common/ScanSpec.cpp | 12 - velox/dwio/common/ScanSpec.h | 4 - velox/dwio/common/SelectiveColumnReader.cpp | 12 - velox/dwio/common/SelectiveColumnReader.h | 16 +- .../common/SelectiveColumnReaderInternal.h | 71 ++- .../reader/SelectiveFlatMapColumnReader.cpp | 518 +++++------------- velox/vector/FlatVector.h | 9 + 9 files changed, 214 insertions(+), 440 deletions(-) diff --git a/velox/common/memory/MmapAllocator.cpp b/velox/common/memory/MmapAllocator.cpp index 65d5abe7aa5df..88839e6b26622 100644 --- a/velox/common/memory/MmapAllocator.cpp +++ b/velox/common/memory/MmapAllocator.cpp @@ -437,7 +437,7 @@ void* MmapAllocator::allocateBytesWithoutRetry( VELOX_MEM_LOG(ERROR) << "Failed to allocateBytes " << bytes << " bytes with " << alignment << " alignment"; } else { - numMallocBytes_.fetch_add(bytes); + numMallocBytes_ += bytes; } return result; } @@ -472,7 +472,7 @@ void* MmapAllocator::allocateBytesWithoutRetry( void MmapAllocator::freeBytes(void* p, uint64_t bytes) noexcept { if (useMalloc(bytes)) { ::free(p); // NOLINT - numMallocBytes_.fetch_sub(bytes); + numMallocBytes_ -= bytes; return; } diff --git a/velox/common/memory/MmapAllocator.h b/velox/common/memory/MmapAllocator.h index 5678a91f3008a..9013817c36a42 100644 --- a/velox/common/memory/MmapAllocator.h +++ b/velox/common/memory/MmapAllocator.h @@ -24,6 +24,8 @@ #include #include +#include + #include "velox/common/base/SimdUtil.h" #include "velox/common/memory/MemoryAllocator.h" #include "velox/common/memory/MemoryPool.h" @@ -130,7 +132,7 @@ class MmapAllocator : public MemoryAllocator { } size_t totalUsedBytes() const override { - return numMallocBytes_ + AllocationTraits::pageBytes(numAllocated_); + return numMallocBytes() + AllocationTraits::pageBytes(numAllocated_); } MachinePageCount numAllocated() const override { @@ -146,7 +148,7 @@ class MmapAllocator : public MemoryAllocator { } uint64_t numMallocBytes() const { - return numMallocBytes_; + return numMallocBytes_.readFull(); } Stats stats() const override { @@ -410,7 +412,7 @@ class MmapAllocator : public MemoryAllocator { std::atomic numAllocations_ = 0; std::atomic numAllocatedPages_ = 0; std::atomic numAdvisedPages_ = 0; - std::atomic numMallocBytes_ = 0; + folly::ThreadCachedInt numMallocBytes_; // Allocations that are larger than largest size classes will be delegated to // ManagedMmapArenas, to avoid calling mmap on every allocation. diff --git a/velox/dwio/common/ScanSpec.cpp b/velox/dwio/common/ScanSpec.cpp index 4845b3242a773..6708223711998 100644 --- a/velox/dwio/common/ScanSpec.cpp +++ b/velox/dwio/common/ScanSpec.cpp @@ -402,18 +402,6 @@ std::string ScanSpec::toString() const { return out.str(); } -std::shared_ptr ScanSpec::removeChild(const ScanSpec* child) { - for (auto it = children_.begin(); it != children_.end(); ++it) { - if (it->get() == child) { - auto removed = std::move(*it); - children_.erase(it); - childByFieldName_.erase(removed->fieldName()); - return removed; - } - } - return nullptr; -} - void ScanSpec::addFilter(const Filter& filter) { filter_ = filter_ ? filter_->mergeWith(&filter) : filter.clone(); } diff --git a/velox/dwio/common/ScanSpec.h b/velox/dwio/common/ScanSpec.h index a75ff65496ff5..3e242025fc3d2 100644 --- a/velox/dwio/common/ScanSpec.h +++ b/velox/dwio/common/ScanSpec.h @@ -216,10 +216,6 @@ class ScanSpec { return it->second; } - // Remove a child from this scan spec, returning the removed child. This is - // used for example to transform a flatmap scan spec into a struct scan spec. - std::shared_ptr removeChild(const ScanSpec* child); - SelectivityInfo& selectivity() { return selectivity_; } diff --git a/velox/dwio/common/SelectiveColumnReader.cpp b/velox/dwio/common/SelectiveColumnReader.cpp index cfc9973e2ebf2..bf316f770adbe 100644 --- a/velox/dwio/common/SelectiveColumnReader.cpp +++ b/velox/dwio/common/SelectiveColumnReader.cpp @@ -66,7 +66,6 @@ const std::vector& SelectiveColumnReader::children() } void SelectiveColumnReader::seekTo(vector_size_t offset, bool readsNullsOnly) { - VELOX_TRACE_HISTORY_PUSH("seekTo %d %d", offset, readsNullsOnly); if (offset == readOffset_) { return; } @@ -393,17 +392,6 @@ void SelectiveColumnReader::addStringValue(folly::StringPiece value) { StringView(copy, value.size()); } -bool SelectiveColumnReader::readsNullsOnly() const { - auto filter = scanSpec_->filter(); - if (filter) { - auto kind = filter->kind(); - return kind == velox::common::FilterKind::kIsNull || - (!scanSpec_->keepValues() && - kind == velox::common::FilterKind::kIsNotNull); - } - return false; -} - void SelectiveColumnReader::setNulls(BufferPtr resultNulls) { resultNulls_ = resultNulls; rawResultNulls_ = resultNulls ? resultNulls->asMutable() : nullptr; diff --git a/velox/dwio/common/SelectiveColumnReader.h b/velox/dwio/common/SelectiveColumnReader.h index 354c64185ee72..9365c5c858c0a 100644 --- a/velox/dwio/common/SelectiveColumnReader.h +++ b/velox/dwio/common/SelectiveColumnReader.h @@ -430,11 +430,6 @@ class SelectiveColumnReader { static constexpr int8_t kNoValueSize = -1; static constexpr uint32_t kRowGroupNotSet = ~0; - // True if we have an is null filter and optionally return column - // values or we have an is not null filter and do not return column - // values. This means that only null flags need be accessed. - bool readsNullsOnly() const; - template void ensureValuesCapacity(vector_size_t numRows); @@ -442,9 +437,13 @@ class SelectiveColumnReader { // 'extraSpace' bits worth of space in the nulls buffer. void prepareNulls(RowSet rows, bool hasNulls, int32_t extraRows = 0); + void setFlatMapValue(bool value) { + flatMapValue_ = value; + } + protected: // Filters 'rows' according to 'is_null'. Only applies to cases where - // readsNullsOnly() is true. + // scanSpec_->readsNullsOnly() is true. template void filterNulls(RowSet rows, bool isNull, bool extractValues); @@ -638,6 +637,11 @@ class SelectiveColumnReader { // Encoding-related state to keep between reads, e.g. dictionaries. ScanState scanState_; + + bool flatMapValue_ = false; + BufferPtr flatMapValueNullsInReadRange_; + VectorPtr flatMapValueFlatValues_; + VectorPtr flatMapValueConstantNullValues_; }; template <> diff --git a/velox/dwio/common/SelectiveColumnReaderInternal.h b/velox/dwio/common/SelectiveColumnReaderInternal.h index e4df3ebcb958b..a6b0d317e1c08 100644 --- a/velox/dwio/common/SelectiveColumnReaderInternal.h +++ b/velox/dwio/common/SelectiveColumnReaderInternal.h @@ -47,7 +47,7 @@ class Timer { template void SelectiveColumnReader::ensureValuesCapacity(vector_size_t numRows) { - if (values_ && values_->unique() && + if (values_ && (flatMapValue_ || values_->unique()) && values_->capacity() >= BaseVector::byteSize(numRows) + simd::kPadding) { return; @@ -62,15 +62,22 @@ void SelectiveColumnReader::prepareRead( vector_size_t offset, RowSet rows, const uint64_t* incomingNulls) { - seekTo(offset, scanSpec_->readsNullsOnly()); + const bool readsNullsOnly = scanSpec_->readsNullsOnly(); + seekTo(offset, readsNullsOnly); vector_size_t numRows = rows.back() + 1; - // Do not re-use unless singly-referenced. - if (nullsInReadRange_ && !nullsInReadRange_->unique()) { + if (flatMapValue_) { + if (!nullsInReadRange_) { + nullsInReadRange_ = std::move(flatMapValueNullsInReadRange_); + } + } else if (nullsInReadRange_ && !nullsInReadRange_->unique()) { nullsInReadRange_.reset(); } formatData_->readNulls( - numRows, incomingNulls, nullsInReadRange_, readsNullsOnly()); + numRows, incomingNulls, nullsInReadRange_, readsNullsOnly); + if (flatMapValue_ && nullsInReadRange_) { + flatMapValueNullsInReadRange_ = nullsInReadRange_; + } // We check for all nulls and no nulls. We expect both calls to // bits::isAllSet to fail early in the common case. We could do a // single traversal of null bits counting the bits and then compare @@ -116,14 +123,19 @@ void SelectiveColumnReader::getFlatValues( mayGetValues_ = false; } if (allNull_) { - *result = std::make_shared>( - &memoryPool_, - rows.size(), - true, - type, - T(), - SimpleVectorStats{}, - sizeof(TVector) * rows.size()); + if (flatMapValue_) { + if (flatMapValueConstantNullValues_) { + flatMapValueConstantNullValues_->resize(rows.size()); + } else { + flatMapValueConstantNullValues_ = + std::make_shared>( + &memoryPool_, rows.size(), true, type, T()); + } + *result = flatMapValueConstantNullValues_; + } else { + *result = std::make_shared>( + &memoryPool_, rows.size(), true, type, T()); + } return; } if (valueSize_ == sizeof(TVector)) { @@ -134,13 +146,32 @@ void SelectiveColumnReader::getFlatValues( upcastScalarValues(rows); } valueSize_ = sizeof(TVector); - *result = std::make_shared>( - &memoryPool_, - type, - resultNulls(), - numValues_, - values_, - std::move(stringBuffers_)); + if (flatMapValue_) { + if (flatMapValueFlatValues_) { + auto* flat = flatMapValueFlatValues_->asUnchecked>(); + flat->unsafeSetSize(numValues_); + flat->setNulls(resultNulls()); + flat->unsafeSetValues(values_); + flat->setStringBuffers(std::move(stringBuffers_)); + } else { + flatMapValueFlatValues_ = std::make_shared>( + &memoryPool_, + type, + resultNulls(), + numValues_, + values_, + std::move(stringBuffers_)); + } + *result = flatMapValueFlatValues_; + } else { + *result = std::make_shared>( + &memoryPool_, + type, + resultNulls(), + numValues_, + values_, + std::move(stringBuffers_)); + } } template <> diff --git a/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp index c25c6cae4beb6..3717583be1d43 100644 --- a/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveFlatMapColumnReader.cpp @@ -22,8 +22,6 @@ namespace facebook::velox::dwrf { -bool noFastPath = false; - namespace { template @@ -93,17 +91,18 @@ std::vector> getKeyNodes( auto& stripe = params.stripeStreams(); auto keyPredicate = prepareKeyPredicate(requestedType, stripe); - std::shared_ptr keysSpec; - std::shared_ptr valuesSpec; + common::ScanSpec* keysSpec = nullptr; + common::ScanSpec* valuesSpec = nullptr; if (!asStruct) { - if (auto keys = scanSpec.childByName(common::ScanSpec::kMapKeysFieldName)) { - keysSpec = scanSpec.removeChild(keys); - } - if (auto values = - scanSpec.childByName(common::ScanSpec::kMapValuesFieldName)) { - VELOX_CHECK(!values->hasFilter()); - valuesSpec = scanSpec.removeChild(values); - } + keysSpec = scanSpec.getOrCreateChild( + common::Subfield(common::ScanSpec::kMapKeysFieldName)); + valuesSpec = scanSpec.getOrCreateChild( + common::Subfield(common::ScanSpec::kMapValuesFieldName)); + VELOX_CHECK(!valuesSpec->hasFilter()); + keysSpec->setProjectOut(true); + keysSpec->setExtractValues(true); + valuesSpec->setProjectOut(true); + valuesSpec->setExtractValues(true); } std::unordered_map, common::ScanSpec*, KeyValueHash> @@ -147,14 +146,7 @@ std::vector> getKeyNodes( !common::applyFilter(*keysSpec->filter(), key.get())) { return; // Subfield pruning } - childSpec = - scanSpec.getOrCreateChild(common::Subfield(toString(key.get()))); - childSpec->setProjectOut(true); - childSpec->setExtractValues(true); - if (valuesSpec) { - *childSpec = *valuesSpec; - } - childSpecs[key] = childSpec; + childSpecs[key] = childSpec = valuesSpec; } auto labels = params.streamLabels().append(toString(key.get())); auto inMap = stripe.getStream( @@ -225,24 +217,18 @@ class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase { fileType, params, scanSpec), - // Copy the scan spec because we need to remove the children. - structScanSpec_(scanSpec) { - scanSpec_ = &structScanSpec_; - keyNodes_ = - getKeyNodes(requestedType, fileType, params, structScanSpec_, false); + keyNodes_( + getKeyNodes(requestedType, fileType, params, scanSpec, false)) { std::sort(keyNodes_.begin(), keyNodes_.end(), [](auto& x, auto& y) { return x.sequence < y.sequence; }); - childValues_.resize(keyNodes_.size()); - copyRanges_.resize(keyNodes_.size()); children_.resize(keyNodes_.size()); for (int i = 0; i < keyNodes_.size(); ++i) { children_[i] = keyNodes_[i].reader.get(); + children_[i]->setFlatMapValue(true); } if (auto type = requestedType_->type()->childAt(1); type->isRow()) { - for (auto& vec : childValues_) { - vec = BaseVector::create(type, 0, &memoryPool_); - } + childValues_ = BaseVector::create(type, 0, &memoryPool_); } } @@ -272,8 +258,11 @@ class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase { } activeRows = outputRows_; } + // Separate the loop to be cache friendly. for (auto* reader : children_) { advanceFieldReader(reader, offset); + } + for (auto* reader : children_) { reader->read(offset, activeRows, mapNulls); reader->addParentNulls(offset, mapNulls, rows); } @@ -282,59 +271,95 @@ class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase { } void getValues(RowSet rows, VectorPtr* result) override { - for (int k = 0; k < children_.size(); ++k) { - children_[k]->getValues(rows, &childValues_[k]); - copyRanges_[k].clear(); - } - auto offsets = - AlignedBuffer::allocate(rows.size(), &memoryPool_); - auto sizes = - AlignedBuffer::allocate(rows.size(), &memoryPool_); + auto& mapResult = prepareResult(*result, rows.size()); + auto* rawOffsets = mapResult.mutableOffsets(rows.size()) + ->template asMutable(); + auto* rawSizes = mapResult.mutableSizes(rows.size()) + ->template asMutable(); auto* nulls = nullsInReadRange_ ? nullsInReadRange_->as() : nullptr; + auto numNestedRows = calculateOffsets(rows, nulls, rawOffsets, rawSizes); + auto& keys = mapResult.mapKeys(); + auto& values = mapResult.mapValues(); + BaseVector::prepareForReuse(keys, numNestedRows); + BaseVector::prepareForReuse(values, numNestedRows); + auto* flatKeys = keys->template asFlatVector(); + VELOX_DYNAMIC_TYPE_DISPATCH( + copyValues, values->typeKind(), rows, flatKeys, rawOffsets, *values); + VELOX_CHECK_EQ(rawOffsets[rows.size() - 1], numNestedRows); + std::copy_backward( + rawOffsets, rawOffsets + rows.size() - 1, rawOffsets + rows.size()); + rawOffsets[0] = 0; + result->get()->setNulls(resultNulls()); + } - if (!noFastPath && fastPath(offsets, sizes, rows, nulls, result)) { - return; + private: + MapVector& prepareResult(VectorPtr& result, vector_size_t size) { + if (result && result->encoding() == VectorEncoding::Simple::MAP && + result.unique()) { + result->resetDataDependentFlags(nullptr); + result->resize(size); + } else { + VLOG(1) << "Reallocating result MAP vector of size " << size; + result = BaseVector::create(requestedType_->type(), size, &memoryPool_); } - auto* rawOffsets = offsets->template asMutable(); - auto* rawSizes = sizes->template asMutable(); - vector_size_t totalSize = 0; + return *result->asUnchecked(); + } + + vector_size_t calculateOffsets( + RowSet rows, + const uint64_t* nulls, + vector_size_t* offsets, + vector_size_t* sizes) { + inMaps_.resize(children_.size()); + for (int k = 0; k < children_.size(); ++k) { + auto& data = static_cast(children_[k]->formatData()); + inMaps_[k] = data.inMap(); + if (!inMaps_[k]) { + inMaps_[k] = nulls; + } + } + columnRowBits_.resize(bits::nwords(children_.size() * rows.size())); + std::fill(columnRowBits_.begin(), columnRowBits_.end(), 0); + std::fill(sizes, sizes + rows.size(), 0); + for (int k = 0; k < children_.size(); ++k) { + if (inMaps_[k]) { + for (vector_size_t i = 0; i < rows.size(); ++i) { + if (bits::isBitSet(inMaps_[k], rows[i])) { + bits::setBit(columnRowBits_.data(), i + k * rows.size()); + ++sizes[i]; + } + } + } else { + for (vector_size_t i = 0; i < rows.size(); ++i) { + bits::setBit(columnRowBits_.data(), i + k * rows.size()); + ++sizes[i]; + } + } + } + vector_size_t numNestedRows = 0; for (vector_size_t i = 0; i < rows.size(); ++i) { if (nulls && bits::isBitNull(nulls, rows[i])) { - rawSizes[i] = 0; - if (!returnReaderNulls_) { bits::setNull(rawResultNulls_, i); } - anyNulls_ = true; - - continue; - } - int currentRowSize = 0; - for (int k = 0; k < children_.size(); ++k) { - auto& data = static_cast(children_[k]->formatData()); - auto* inMap = data.inMap(); - if (inMap && bits::isBitNull(inMap, rows[i])) { - continue; - } - copyRanges_[k].push_back({ - .sourceIndex = i, - .targetIndex = totalSize + currentRowSize, - .count = 1, - }); - ++currentRowSize; } - rawOffsets[i] = totalSize; - rawSizes[i] = currentRowSize; - totalSize += currentRowSize; + offsets[i] = numNestedRows; + numNestedRows += sizes[i]; } - auto& mapType = requestedType_->type()->asMap(); - VectorPtr keys = - BaseVector::create(mapType.keyType(), totalSize, &memoryPool_); - VectorPtr values = - BaseVector::create(mapType.valueType(), totalSize, &memoryPool_); - auto* flatKeys = keys->asFlatVector(); + return numNestedRows; + } + + template + void copyValues( + RowSet rows, + FlatVector* flatKeys, + vector_size_t* rawOffsets, + BaseVector& values) { + constexpr bool kDirectCopy = + TypeKind::TINYINT <= kKind && kKind <= TypeKind::DOUBLE; + using ValueType = typename TypeTraits::NativeType; T* rawKeys = flatKeys->mutableRawValues(); [[maybe_unused]] size_t strKeySize; [[maybe_unused]] char* rawStrKeyBuffer; @@ -360,6 +385,14 @@ class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase { strKeySize = 0; } } + [[maybe_unused]] ValueType* targetValues; + [[maybe_unused]] uint64_t* targetNulls; + if constexpr (kDirectCopy) { + VELOX_CHECK(values.isFlatEncoding()); + auto* flat = values.asUnchecked>(); + targetValues = flat->mutableRawValues(); + targetNulls = flat->mutableRawNulls(); + } for (int k = 0; k < children_.size(); ++k) { [[maybe_unused]] StringView strKey; if constexpr (std::is_same_v) { @@ -371,326 +404,49 @@ class SelectiveFlatMapReader : public SelectiveStructColumnReaderBase { strKeySize += strKey.size(); } } - for (auto& r : copyRanges_[k]) { - if constexpr (std::is_same_v) { - rawKeys[r.targetIndex] = strKey; - } else { - rawKeys[r.targetIndex] = keyNodes_[k].key.get(); - } + children_[k]->getValues(rows, &childValues_); + if constexpr (kDirectCopy) { + decodedChildValues_.decode(*childValues_); } - values->copyRanges(childValues_[k].get(), copyRanges_[k]); - } - *result = std::make_shared( - &memoryPool_, - requestedType_->type(), - resultNulls(), - rows.size(), - std::move(offsets), - std::move(sizes), - std::move(keys), - std::move(values)); - } - - private: - // Sets the bits for present and selected positions in 'rowColumnBits_' for - // 'columnIdx' for the 64 rows selected by the inMap and selected bitmaps. - // 'baseRow' is the number of selectd rows below the range covered by 'inMap' - // and 'selected' - void setRowBits( - uint64_t inMap, - uint64_t selected, - int32_t baseRow, - int32_t columnIdx) { - auto pitch = children_.size(); - auto rowColumns = rowColumnBits_.data(); - auto selectedPresent = selected & inMap; - while (selectedPresent) { - int32_t row = __builtin_ctzll(selectedPresent); - auto nthRow = __builtin_popcountll(selected & bits::lowMask(row)); - bits::setBit(rowColumns, (nthRow + baseRow) * pitch + columnIdx, true); - selectedPresent &= selectedPresent - 1; - } - } - - // Returns the count of selected rows that have a value in inMap. Sets the - // corresponding bits in 'rowColumnBits_'. - int32_t countInMap( - const uint64_t* inMap, - const uint64_t* selected, - int32_t numRows, - int32_t columnIdx) { - int32_t numSelected = 0; - int32_t count = 0; - bits::forEachWord( - 0, - numRows, - [&](int32_t idx, uint64_t mask) { - count += __builtin_popcountll(inMap[idx] & selected[idx] & mask); - setRowBits(inMap[idx], selected[idx] & mask, numSelected, columnIdx); - numSelected += __builtin_popcountll(selected[idx] & mask); - }, - [&](int32_t idx) { - count += __builtin_popcountll(inMap[idx] & selected[idx]); - setRowBits(inMap[idx], selected[idx], numSelected, columnIdx); - numSelected += __builtin_popcountll(selected[idx]); - }); - return count; - } - - template - void fillValues( - const BufferPtr& offsets, - const BufferPtr& sizes, - const uint64_t* mapNulls, - RowSet rows, - int32_t totalChildValues, - T* rawKeys) { - using V = typename TypeTraits::NativeType; - auto rawOffsets = offsets->asMutable(); - auto rawSizes = sizes->asMutable(); - int32_t pitch = children_.size(); - auto values = AlignedBuffer::allocate(totalChildValues, &memoryPool_); - auto rawValues = values->template asMutable(); - BufferPtr valueNulls; - uint64_t* valueRawNulls = nullptr; - if (nullsInChildValues_) { - valueNulls = AlignedBuffer::allocate( - totalChildValues, &memoryPool_, bits::kNotNull); - valueRawNulls = valueNulls->asMutable(); - } - bool mayHaveDict = !childIndices_.empty(); - int32_t startBit = 0; - int32_t fill = 0; - for (int32_t rowIndex = 0; rowIndex < rows.size(); ++rowIndex) { - int32_t row = rows[rowIndex]; - if (mapNulls && bits::isBitNull(mapNulls, row)) { - rawSizes[rowIndex] = 0; - rawOffsets[rowIndex] = 0; - if (!returnReaderNulls_) { - if (!rawResultNulls_) { - mutableNulls(rows.size()); - } - bits::setNull(rawResultNulls_, rowIndex); - anyNulls_ = true; - } - // A row which is null for the whole map will only have not presents in - // inMap. - startBit += pitch; - continue; - } - rawOffsets[rowIndex] = fill; + const auto begin = k * rows.size(); bits::forEachSetBit( - rowColumnBits_.data(), - startBit, - startBit + pitch, - [&](int32_t index) { - auto column = index - startBit; - rawKeys[fill] = keyValues_[column]; - if (childRawNulls_[column] && - bits::isBitNull(childRawNulls_[column], rowIndex)) { - bits::setNull(valueRawNulls, fill); + columnRowBits_.data(), + begin, + begin + rows.size(), + [&](vector_size_t i) { + i -= begin; + if constexpr (std::is_same_v) { + rawKeys[rawOffsets[i]] = strKey; + } else { + rawKeys[rawOffsets[i]] = keyNodes_[k].key.get(); + } + if constexpr (kDirectCopy) { + targetValues[rawOffsets[i]] = + decodedChildValues_.valueAt(i); + bits::setNull( + targetNulls, rawOffsets[i], decodedChildValues_.isNullAt(i)); } else { - auto valueIndex = rowIndex; - if (mayHaveDict && childIndices_[column]) { - valueIndex = childIndices_[column][rowIndex]; - } - rawValues[fill] = reinterpret_cast( - childRawValues_[column])[valueIndex]; + copyRanges_.push_back({ + .sourceIndex = i, + .targetIndex = rawOffsets[i], + .count = 1, + }); } - ++fill; + ++rawOffsets[i]; }); - rawSizes[rowIndex] = fill - rawOffsets[rowIndex]; - startBit += pitch; - } - VELOX_CHECK_EQ(fill, totalChildValues); - std::vector allStrings; - if constexpr (std::is_same_v) { - for (auto i = 0; i < childValues_.size(); ++i) { - std::vector strings = vectorStrings(*childValues_[i]); - allStrings.insert(allStrings.end(), strings.begin(), strings.end()); - } - } - - valueVector_ = std::make_shared>( - &memoryPool_, - requestedType_->type()->childAt(1), - std::move(valueNulls), - totalChildValues, - std::move(values), - std::move(allStrings)); - } - - static std::vector vectorStrings(BaseVector& vector) { - if (vector.encoding() == VectorEncoding::Simple::CONSTANT) { - auto buffer = - vector.asUnchecked>()->getStringBuffer(); - if (!buffer) { - return {}; - } - return {buffer}; - } else if (vector.encoding() == VectorEncoding::Simple::FLAT) { - return vector.asUnchecked>()->stringBuffers(); - } else if (vector.encoding() == VectorEncoding::Simple::DICTIONARY) { - return vector.valueVector() - ->asUnchecked>() - ->stringBuffers(); - } else { - VELOX_FAIL("String value is is neither flat, dictionary nor constant"); - } - } - - bool fastPath( - BufferPtr& offsets, - BufferPtr& sizes, - RowSet rows, - const uint64_t* nulls, - VectorPtr* result) { - auto& valueType = requestedType_->type()->childAt(1); - auto valueKind = valueType->kind(); - if (valueKind == TypeKind::MAP || valueKind == TypeKind::ROW || - valueKind == TypeKind::ARRAY) { - return false; - } - initKeyValues(); - assert(!children_.empty()); - childRawNulls_.resize(children_.size()); - childRawValues_.resize(children_.size()); - inMap_.resize(children_.size()); - if (!childIndices_.empty()) { - std::fill(childIndices_.begin(), childIndices_.end(), nullptr); - } - SelectivityVector selectedInMap(rows.back() + 1, false); - for (auto row : rows) { - selectedInMap.setValid(row, true); - } - int32_t totalChildValues = 0; - rowColumnBits_.resize(bits::nwords(rows.size() * children_.size())); - std::fill(rowColumnBits_.begin(), rowColumnBits_.end(), 0); - nullsInChildValues_ = false; - for (auto i = 0; i < children_.size(); ++i) { - auto& data = static_cast(children_[i]->formatData()); - inMap_[i] = data.inMap(); - auto& child = childValues_[i]; - childRawNulls_[i] = child->rawNulls(); - if (childRawNulls_[i]) { - nullsInChildValues_ = true; - } - if (child->encoding() == VectorEncoding::Simple::DICTIONARY) { - childIndices_.resize(children_.size()); - childIndices_[i] = child->wrapInfo()->template as(); - childRawValues_[i] = child->valueVector()->valuesAsVoid(); - } else if (child->encoding() == VectorEncoding::Simple::FLAT) { - childRawValues_[i] = childValues_[i]->valuesAsVoid(); - } else if (child->encoding() == VectorEncoding::Simple::CONSTANT) { - if (zeros_.size() < rows.size()) { - zeros_.resize(rows.size()); - } - childRawValues_[i] = child->valuesAsVoid(); - if (childValues_[i]->isNullAt(0)) { - // There are at least rows worth of zero words, so this can serve as - // an all null bitmap. - childRawNulls_[i] = reinterpret_cast(zeros_.data()); - } else { - childRawNulls_[i] = nullptr; - } - childIndices_.resize(children_.size()); - // Every null is redirected to row 0, which is valuesAsVoid of constant - // vector. - childIndices_[i] = zeros_.data(); - } else { - VELOX_FAIL( - "Flat map columns must be flat or single level dictionaries"); - } - totalChildValues += countInMap( - inMap_[i], selectedInMap.asRange().bits(), selectedInMap.size(), i); - } - BufferPtr keyBuffer = - AlignedBuffer::allocate(totalChildValues, &memoryPool_); - VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( - fillValues, - valueKind, - offsets, - sizes, - nulls, - rows, - totalChildValues, - keyBuffer->template asMutable()); - - std::vector keyStrings; - if (std::is_same_v && keyStrings_) { - keyStrings.push_back(keyStrings_); - } - auto keyVector = std::make_shared>( - &memoryPool_, - requestedType_->type()->childAt(0), - BufferPtr(nullptr), - totalChildValues, - std::move(keyBuffer), - std::move(keyStrings)); - *result = std::make_shared( - &memoryPool_, - requestedType_->type(), - resultNulls(), - rows.size(), - std::move(offsets), - std::move(sizes), - std::move(keyVector), - std::move(valueVector_)); - return true; - } - - void initKeyValues() { - if (!keyValues_.empty()) { - return; - } - assert(!children_.empty()); - keyValues_.resize(children_.size()); - if constexpr (std::is_same_v) { - int32_t strKeySize = 0; - for (int k = 0; k < children_.size(); ++k) { - if (!keyNodes_[k].key.get().isInline()) { - strKeySize += keyNodes_[k].key.get().size(); - } - } - char* rawStrKeyBuffer = nullptr; - if (strKeySize > 0) { - keyStrings_ = AlignedBuffer::allocate(strKeySize, &memoryPool_); - rawStrKeyBuffer = keyStrings_->template asMutable(); - strKeySize = 0; - } - for (int k = 0; k < children_.size(); ++k) { - auto& s = keyNodes_[k].key.get(); - if (!s.isInline()) { - memcpy(&rawStrKeyBuffer[strKeySize], s.data(), s.size()); - *reinterpret_cast(&keyValues_[k]) = - StringView(&rawStrKeyBuffer[strKeySize], s.size()); - strKeySize += s.size(); - } else { - keyValues_[k] = s; - } - } - } else { - for (auto i = 0; i < children_.size(); ++i) { - keyValues_[i] = keyNodes_[i].key.get(); + if constexpr (!kDirectCopy) { + values.copyRanges(childValues_.get(), copyRanges_); + copyRanges_.clear(); } } } - common::ScanSpec structScanSpec_; std::vector> keyNodes_; - std::vector childValues_; - std::vector> copyRanges_; - std::vector inMap_; - std::vector childRawNulls_; - // if a child is dictionary encoded, these are indices of non-null values. - std::vector childIndices_; - std::vector childRawValues_; - std::vector rowColumnBits_; - std::vector keyValues_; - BufferPtr keyStrings_; - bool nullsInChildValues_{false}; - VectorPtr valueVector_; - std::vector zeros_; + VectorPtr childValues_; + DecodedVector decodedChildValues_; + std::vector inMaps_; + std::vector columnRowBits_; + std::vector copyRanges_; }; template diff --git a/velox/vector/FlatVector.h b/velox/vector/FlatVector.h index b9ca51af67b91..6a71803318121 100644 --- a/velox/vector/FlatVector.h +++ b/velox/vector/FlatVector.h @@ -495,6 +495,15 @@ class FlatVector final : public SimpleVector { } } + void unsafeSetSize(vector_size_t newSize) { + this->length_ = newSize; + } + + void unsafeSetValues(BufferPtr values) { + values_ = std::move(values); + rawValues_ = values_ ? const_cast(values_->as()) : nullptr; + } + private: void ensureValues() { if (rawValues_ == nullptr) {