diff --git a/velox/common/base/BitUtil.h b/velox/common/base/BitUtil.h index 75b118ce094f..3ef7856ed554 100644 --- a/velox/common/base/BitUtil.h +++ b/velox/common/base/BitUtil.h @@ -18,6 +18,8 @@ #include "velox/common/base/Exceptions.h" +#include + #include #include #include @@ -28,6 +30,20 @@ #include #endif +// Remove once we upgrade folly. +#ifndef FOLLY_BUILTIN_MEMCPY +#if FOLLY_HAS_BUILTIN(__builtin_memcpy_inline) +#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \ + void(__builtin_memcpy_inline((dest), (src), (size))) +#elif FOLLY_HAS_BUILTIN(__builtin_memcpy) +#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \ + void(__builtin_memcpy((dest), (src), (size))) +#else +#define FOLLY_BUILTIN_MEMCPY(dest, src, size) \ + void(::std::memcpy((dest), (src), (size))) +#endif +#endif + namespace facebook { namespace velox { namespace bits { diff --git a/velox/common/base/IndexedPriorityQueue.h b/velox/common/base/IndexedPriorityQueue.h index 302cbf7aae8f..348915045c30 100644 --- a/velox/common/base/IndexedPriorityQueue.h +++ b/velox/common/base/IndexedPriorityQueue.h @@ -16,178 +16,223 @@ #pragma once -#include -#include -#include - #include "velox/common/base/Exceptions.h" +#include + namespace facebook::velox { + /// A priority queue with values of type 'T'. Each value has assigned priority -/// on insertion which determines the value's position in the quue. It supports +/// on insertion which determines the value's position in the queue. It supports /// to update the priority of the existing values, which adjusts the value's -/// position in the queue accordingly. Ties are broken by insertion order. If -/// 'MaxQueue' is true, it is a max priority queue, otherwise a min priority -/// queue. +/// position in the queue accordingly. Ties are broken by insertion and update +/// order. If 'kMaxQueue' is true, it is a max priority queue, otherwise a min +/// priority queue. /// /// NOTE: this class is not thread-safe. -template +template < + typename T, + bool kMaxQueue, + typename Allocator = std::allocator, + typename Hash = std::hash, + typename EqualTo = std::equal_to> class IndexedPriorityQueue { public: - IndexedPriorityQueue() = default; + explicit IndexedPriorityQueue(const Allocator& allocator = {}) + : values_(allocator), + priorities_(RebindAlloc(allocator)), + generations_(RebindAlloc(allocator)), + heap_(RebindAlloc(allocator)), + valueIndices_(RebindAlloc>(allocator)), + heapIndices_(RebindAlloc(allocator)) {} ~IndexedPriorityQueue() { - VELOX_CHECK_EQ(queue_.size(), index_.size()); + validate(); } - size_t size() const { - return queue_.size(); + int32_t size() const { + return values_.size(); } bool empty() const { - return queue_.empty(); + return values_.empty(); } /// Inserts 'value' into the queue with specified 'priority'. If 'value' /// already exists, then update its priority and the corresponding position in /// the queue. - bool addOrUpdate(T value, uint64_t priority) { - auto it = index_.find(value); - if (it != index_.end()) { - if (it->second->priority() == priority) { - return false; - } - queue_.erase(it->second.get()); - - it->second->updatePriority(priority); - queue_.insert(it->second.get()); + bool addOrUpdate(const T& value, int64_t priority) { + auto it = valueIndices_.find(value); + if (it == valueIndices_.end()) { + addNewValue(value, priority); + return true; + } + auto i = it->second; + if (priorities_[i] == priority) { return false; } - - auto newEntry = std::make_unique(value, priority, generation_++); - queue_.insert(newEntry.get()); - index_.emplace(value, std::move(newEntry)); + updatePriority(i, priority); return true; } - std::optional pop() { - VELOX_CHECK_EQ(queue_.size(), index_.size()); - if (queue_.empty()) { - return std::nullopt; - } - - auto it = queue_.begin(); - Entry* removedEntry = *it; - const auto value = removedEntry->value(); - queue_.erase(it); - VELOX_CHECK(index_.contains(removedEntry->value())); - index_.erase(removedEntry->value()); - return value; - } - - /// Describes the state of an inserted 'value' in the queue. - class Entry { - public: - Entry(T value, uint64_t priority, uint64_t generation) - : value_(std::move(value)), - priority_(priority), - generation_(generation) {} - - const T& value() const { - return value_; - } + const T& top() const { + VELOX_DCHECK(!heap_.empty()); + return values_[heap_[0]]; + } - uint64_t priority() const { - return priority_; - } + int64_t topPriority() const { + VELOX_DCHECK(!heap_.empty()); + return priorities_[heap_[0]]; + } - void updatePriority(uint64_t priority) { - priority_ = priority; + T pop() { + VELOX_DCHECK(!heap_.empty()); + auto i = heap_[0]; + heap_[0] = heap_.back(); + heapIndices_[heap_.back()] = 0; + heap_.pop_back(); + if (!heap_.empty()) { + percolateDown(0); } - - uint64_t generation() const { - return generation_; + auto result = values_[i]; + valueIndices_.erase(values_[i]); + if (i != size() - 1) { + valueIndices_[values_.back()] = i; + values_[i] = values_.back(); + priorities_[i] = priorities_.back(); + generations_[i] = generations_.back(); + heap_[heapIndices_.back()] = i; + heapIndices_[i] = heapIndices_.back(); } + values_.pop_back(); + priorities_.pop_back(); + generations_.pop_back(); + heapIndices_.pop_back(); + return result; + } - private: - const T value_; - uint64_t priority_; - const uint64_t generation_; - }; - - /// Used to iterate through the queue in priority order. - class Iterator { - public: - Iterator( - typename std::set::const_iterator cur, - typename std::set::const_iterator end) - : end_{end}, cur_{cur}, val_{0} { - if (cur_ != end_) { - val_ = (*cur_)->value(); - } - } + const T* values() const { + return values_.data(); + } - bool operator==(const Iterator& other) const { - return std::tie(cur_, end_) == std::tie(other.cur_, other.end_); - } + const int64_t* priorities() const { + return priorities_.data(); + } - bool operator!=(const Iterator& other) const { - return !operator==(other); + std::optional getValueIndex(const T& value) const { + auto it = valueIndices_.find(value); + if (it != valueIndices_.end()) { + return it->second; } + return std::nullopt; + } - Iterator& operator++() { - VELOX_DCHECK(cur_ != end_); - ++cur_; - if (cur_ != end_) { - val_ = (*cur_)->value(); - } - return *this; + void updatePriority(int index, int64_t priority) { + bool up = priority < priorities_[index]; + if constexpr (kMaxQueue) { + up = !up; } - - const T& operator*() const { - VELOX_DCHECK(cur_ != end_); - return val_; + priorities_[index] = priority; + generations_[index] = ++generation_; + if (up) { + percolateUp(heapIndices_[index]); + } else { + percolateDown(heapIndices_[index]); } + } - private: - const typename std::set::const_iterator end_; - typename std::set::const_iterator cur_; - T val_; - }; + void addNewValue(const T& value, int64_t priority) { + VELOX_DCHECK_LT(size(), std::numeric_limits::max()); + auto i = size(); + values_.push_back(value); + priorities_.push_back(priority); + generations_.push_back(++generation_); + VELOX_CHECK(valueIndices_.emplace(value, i).second); + heapIndices_.push_back(i); + heap_.push_back(i); + percolateUp(i); + } - Iterator begin() const { - return Iterator{queue_.cbegin(), queue_.cend()}; + void replaceTop(const T& value, int64_t priority) { + VELOX_DCHECK(!heap_.empty()); + int i = heap_[0]; + valueIndices_.erase(values_[i]); + values_[i] = value; + priorities_[i] = priority; + generations_[i] = ++generation_; + VELOX_CHECK(valueIndices_.emplace(value, i).second); + percolateDown(0); } - Iterator end() const { - return Iterator{queue_.cend(), queue_.cend()}; + /// Return negative number if value at i should be ordered before j; positive + /// number if j should be ordered before i. Otherwise return 0. + int64_t compare(int i, int j) const { + int64_t result = priorities_[i] - priorities_[j]; + if constexpr (kMaxQueue) { + result = -result; + } + return result != 0 ? result : generations_[i] - generations_[j]; } private: - struct EntrySetComparator { - bool operator()(Entry* lhs, Entry* rhs) const { - if (MaxQueue) { - if (lhs->priority() > rhs->priority()) { - return true; - } - if (lhs->priority() < rhs->priority()) { - return false; - } - } else { - if (lhs->priority() > rhs->priority()) { - return false; - } - if (lhs->priority() < rhs->priority()) { - return true; - } + template + using RebindAlloc = + typename std::allocator_traits::template rebind_alloc; + + void validate() const { + VELOX_DCHECK_EQ(values_.size(), priorities_.size()); + VELOX_DCHECK_EQ(values_.size(), generations_.size()); + VELOX_DCHECK_EQ(values_.size(), heap_.size()); + VELOX_DCHECK_EQ(values_.size(), valueIndices_.size()); + VELOX_DCHECK_EQ(values_.size(), heapIndices_.size()); + } + + void percolateUp(int pos) { + while (pos > 0) { + int parent = (pos - 1) / 2; + if (compare(heap_[pos], heap_[parent]) >= 0) { + break; } - return lhs->generation() < rhs->generation(); + std::swap(heap_[pos], heap_[parent]); + heapIndices_[heap_[pos]] = pos; + pos = parent; } - }; + heapIndices_[heap_[pos]] = pos; + } + + void percolateDown(int pos) { + for (;;) { + int left = 2 * pos + 1; + if (left >= heap_.size()) { + break; + } + int right = left + 1; + int next = right < heap_.size() && compare(heap_[right], heap_[left]) < 0 + ? right + : left; + if (compare(heap_[pos], heap_[next]) <= 0) { + break; + } + std::swap(heap_[pos], heap_[next]); + heapIndices_[heap_[pos]] = pos; + pos = next; + } + heapIndices_[heap_[pos]] = pos; + } - uint64_t generation_{0}; - folly::F14FastMap> index_; - std::set queue_; + int64_t generation_{0}; + std::vector values_; + std::vector> priorities_; + std::vector> generations_; + std::vector> heap_; + folly::F14FastMap< + T, + int32_t, + Hash, + EqualTo, + RebindAlloc>> + valueIndices_; + std::vector> heapIndices_; }; } // namespace facebook::velox diff --git a/velox/common/base/SkewedPartitionBalancer.cpp b/velox/common/base/SkewedPartitionBalancer.cpp index 1d2880f3c35b..24cb1511469e 100644 --- a/velox/common/base/SkewedPartitionBalancer.cpp +++ b/velox/common/base/SkewedPartitionBalancer.cpp @@ -149,12 +149,8 @@ void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness( std::vector>& taskMaxPartitions) { std::unordered_set scaledPartitions; - while (true) { - const auto maxTaskIdOpt = maxTasks.pop(); - if (!maxTaskIdOpt.has_value()) { - break; - } - const uint32_t maxTaskId = maxTaskIdOpt.value(); + while (!maxTasks.empty()) { + const auto maxTaskId = maxTasks.pop(); auto& maxPartitions = taskMaxPartitions[maxTaskId]; if (maxPartitions.empty()) { @@ -167,12 +163,8 @@ void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness( break; } - while (true) { - const auto maxPartitionOpt = maxPartitions.pop(); - if (!maxPartitionOpt.has_value()) { - break; - } - const uint32_t maxPartition = maxPartitionOpt.value(); + while (!maxPartitions.empty()) { + const auto maxPartition = maxPartitions.pop(); // Rebalance partition only once in a single cycle to avoid aggressive // rebalancing. @@ -267,9 +259,10 @@ void SkewedPartitionRebalancer::calculatePartitionProcessedBytes() { std::vector SkewedPartitionRebalancer::findSkewedMinTasks( uint32_t maxTaskId, - const IndexedPriorityQueue& minTasks) const { - std::vector minSkewdTaskIds; - for (uint32_t minTaskId : minTasks) { + IndexedPriorityQueue& minTasks) const { + std::vector minSkewedTaskIds; + while (!minTasks.empty()) { + const auto minTaskId = minTasks.top(); if (minTaskId == maxTaskId) { break; } @@ -280,9 +273,15 @@ std::vector SkewedPartitionRebalancer::findSkewedMinTasks( if (skewness <= kTaskSkewnessThreshod_ || std::isnan(skewness)) { break; } - minSkewdTaskIds.push_back(minTaskId); + + minTasks.pop(); + minSkewedTaskIds.push_back(minTaskId); + } + + for (auto taskId : minSkewedTaskIds) { + minTasks.addOrUpdate(taskId, estimatedTaskBytesSinceLastRebalance_[taskId]); } - return minSkewdTaskIds; + return minSkewedTaskIds; } std::string SkewedPartitionRebalancer::Stats::toString() const { diff --git a/velox/common/base/SkewedPartitionBalancer.h b/velox/common/base/SkewedPartitionBalancer.h index c81709a31777..3c998920c1ee 100644 --- a/velox/common/base/SkewedPartitionBalancer.h +++ b/velox/common/base/SkewedPartitionBalancer.h @@ -113,9 +113,9 @@ class SkewedPartitionRebalancer { uint64_t calculateTaskDataSizeSinceLastRebalance( const IndexedPriorityQueue& maxPartitions) { uint64_t estimatedDataBytesSinceLastRebalance{0}; - for (uint32_t partition : maxPartitions) { + for (int i = 0; i < maxPartitions.size(); ++i) { estimatedDataBytesSinceLastRebalance += - partitionBytesSinceLastRebalancePerTask_[partition]; + partitionBytesSinceLastRebalancePerTask_[maxPartitions.values()[i]]; } return estimatedDataBytesSinceLastRebalance; } @@ -132,7 +132,7 @@ class SkewedPartitionRebalancer { // 'maxTaskId'. std::vector findSkewedMinTasks( uint32_t maxTaskId, - const IndexedPriorityQueue& minTasks) const; + IndexedPriorityQueue& minTasks) const; // Tries to assign 'targetTaskId' to 'rebalancePartition' for rebalancing. // Returns true if rebalanced, otherwise false. diff --git a/velox/common/base/benchmarks/CMakeLists.txt b/velox/common/base/benchmarks/CMakeLists.txt index 72fe153b52c8..b175e021e69f 100644 --- a/velox/common/base/benchmarks/CMakeLists.txt +++ b/velox/common/base/benchmarks/CMakeLists.txt @@ -24,3 +24,11 @@ target_link_libraries( velox_common_stringsearch_benchmarks PUBLIC Folly::follybenchmark PRIVATE velox_common_base Folly::folly) + +add_executable(velox_common_indexed_priority_queue_benchmark + IndexedPriorityQueueBenchmark.cpp) + +target_link_libraries( + velox_common_indexed_priority_queue_benchmark + PUBLIC Folly::follybenchmark + PRIVATE velox_common_base Folly::folly) diff --git a/velox/common/base/benchmarks/IndexedPriorityQueueBenchmark.cpp b/velox/common/base/benchmarks/IndexedPriorityQueueBenchmark.cpp new file mode 100644 index 000000000000..a7566ce9df18 --- /dev/null +++ b/velox/common/base/benchmarks/IndexedPriorityQueueBenchmark.cpp @@ -0,0 +1,106 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/base/IndexedPriorityQueue.h" + +#include +#include +#include + +#define VELOX_BENCHMARK(_type, _name, ...) \ + [[maybe_unused]] _type _name(FOLLY_PP_STRINGIZE(_name), __VA_ARGS__) + +namespace facebook::velox { +namespace { + +template +class IndexedPriorityQueueBenchmark { + public: + IndexedPriorityQueueBenchmark(const char* name, int numValues) + : values_(numValues), priorities_(2 * numValues) { + generateValues(); + for (int i = 0; i < 2 * numValues; ++i) { + priorities_[i] = folly::Random::rand32(); + } + folly::addBenchmark(__FILE__, fmt::format("{}_add", name), [this] { + IndexedPriorityQueue queue; + return add(queue); + }); + folly::addBenchmark(__FILE__, fmt::format("{}_update", name), [this] { + IndexedPriorityQueue queue; + BENCHMARK_SUSPEND { + add(queue); + } + return update(queue); + }); + folly::addBenchmark(__FILE__, fmt::format("{}_pop", name), [this] { + IndexedPriorityQueue queue; + BENCHMARK_SUSPEND { + add(queue); + update(queue); + } + return pop(queue); + }); + } + + private: + void generateValues(); + + unsigned add(IndexedPriorityQueue& queue) const { + for (int i = 0; i < values_.size(); ++i) { + queue.addOrUpdate(values_[i], priorities_[i]); + } + return values_.size(); + } + + unsigned update(IndexedPriorityQueue& queue) const { + for (int i = 0; i < values_.size(); ++i) { + queue.addOrUpdate(values_[i], priorities_[i + values_.size()]); + } + return values_.size(); + } + + unsigned pop(IndexedPriorityQueue& queue) const { + while (!queue.empty()) { + queue.pop(); + } + return values_.size(); + } + + std::vector values_; + std::vector priorities_; +}; + +template <> +void IndexedPriorityQueueBenchmark::generateValues() { + std::iota(values_.begin(), values_.end(), 0); +} + +} // namespace +} // namespace facebook::velox + +int main(int argc, char* argv[]) { + using namespace facebook::velox; + folly::Init follyInit(&argc, &argv); + VELOX_BENCHMARK(IndexedPriorityQueueBenchmark, int64_1000, 1000); + VELOX_BENCHMARK(IndexedPriorityQueueBenchmark, int64_10000, 10'000); + VELOX_BENCHMARK( + IndexedPriorityQueueBenchmark, int64_100000, 100'000); + VELOX_BENCHMARK( + IndexedPriorityQueueBenchmark, int64_1000000, 1'000'000); + folly::runBenchmarks(); + return 0; +} diff --git a/velox/common/base/tests/IndexedPriorityQueueTest.cpp b/velox/common/base/tests/IndexedPriorityQueueTest.cpp index 0836b699b3d7..66f1403046f5 100644 --- a/velox/common/base/tests/IndexedPriorityQueueTest.cpp +++ b/velox/common/base/tests/IndexedPriorityQueueTest.cpp @@ -30,22 +30,8 @@ class IndexedPriorityQueueTest : public testing::Test { void verify( const IndexedPriorityQueue& queue, const std::vector& expectedValues) { - int i{0}; - for (auto value : queue) { - ASSERT_EQ(value, expectedValues[i++]); - } - } - - template - void verifyWithIterate( - const IndexedPriorityQueue& queue, - const std::vector& expectedValues) { - int i{0}; - auto it = queue.begin(); - while (it != queue.end()) { - ASSERT_EQ(*it, expectedValues[i++]); - ++it; - } + auto clone = queue; + verifyAndRemove(expectedValues, clone); } template @@ -55,7 +41,7 @@ class IndexedPriorityQueueTest : public testing::Test { ASSERT_EQ(expectedValues.size(), queue.size()); int i{0}; while (!queue.empty()) { - ASSERT_EQ(queue.pop().value(), expectedValues[i++]); + ASSERT_EQ(queue.pop(), expectedValues[i++]); } ASSERT_TRUE(queue.empty()); } @@ -74,9 +60,11 @@ class IndexedPriorityQueueTest : public testing::Test { queue.addOrUpdate(value, priority); } ASSERT_EQ(queue.size(), valuePriorities.size()); + auto size = queue.size(); std::unordered_set queuedValues; uint64_t prev = MaxQueue ? std::numeric_limits::max() : 0; - for (auto value : queue) { + while (!queue.empty()) { + auto value = queue.pop(); queuedValues.insert(value); ASSERT_TRUE(valuePriorities.find(value) != valuePriorities.end()); if (MaxQueue) { @@ -87,8 +75,8 @@ class IndexedPriorityQueueTest : public testing::Test { prev = valuePriorities[value]; } } - ASSERT_EQ(queuedValues.size(), queue.size()); - ASSERT_EQ(queue.size(), valuePriorities.size()); + ASSERT_EQ(queuedValues.size(), size); + ASSERT_EQ(size, valuePriorities.size()); } }; @@ -104,8 +92,6 @@ TEST_F(IndexedPriorityQueueTest, insertOnly) { ASSERT_EQ(minQueue.size(), 0); ASSERT_TRUE(maxQueue.empty()); ASSERT_TRUE(minQueue.empty()); - ASSERT_FALSE(maxQueue.pop().has_value()); - ASSERT_FALSE(minQueue.pop().has_value()); for (int value = 0; value < numValues; ++value) { maxQueue.addOrUpdate(value, priorities[value]); @@ -113,15 +99,6 @@ TEST_F(IndexedPriorityQueueTest, insertOnly) { } ASSERT_EQ(maxQueue.size(), numValues); ASSERT_EQ(minQueue.size(), numValues); - verify(maxQueue, expectedMaxValues); - verify(minQueue, expectedMinValues); - - ASSERT_EQ(maxQueue.size(), numValues); - ASSERT_EQ(minQueue.size(), numValues); - - verifyWithIterate(maxQueue, expectedMaxValues); - verifyWithIterate(minQueue, expectedMinValues); - verifyAndRemove(expectedMaxValues, maxQueue); verifyAndRemove(expectedMinValues, minQueue); } @@ -209,12 +186,12 @@ TEST_F(IndexedPriorityQueueTest, remove) { maxQueue.addOrUpdate(value2, /*priority=*/2); maxQueue.addOrUpdate(value3, /*priority=*/3); verify(maxQueue, {33, 32, 31}); - ASSERT_EQ(maxQueue.pop().value(), 33); + ASSERT_EQ(maxQueue.pop(), 33); verify(maxQueue, {32, 31}); maxQueue.addOrUpdate(value2, 0); - ASSERT_EQ(maxQueue.pop().value(), 31); + ASSERT_EQ(maxQueue.pop(), 31); verify(maxQueue, {32}); - ASSERT_EQ(maxQueue.pop().value(), 32); + ASSERT_EQ(maxQueue.pop(), 32); ASSERT_TRUE(maxQueue.empty()); IndexedPriorityQueue minQueue; @@ -222,12 +199,12 @@ TEST_F(IndexedPriorityQueueTest, remove) { minQueue.addOrUpdate(value2, /*priority=*/2); minQueue.addOrUpdate(value3, /*priority=*/3); verify(minQueue, {31, 32, 33}); - ASSERT_EQ(minQueue.pop().value(), 31); + ASSERT_EQ(minQueue.pop(), 31); verify(minQueue, {32, 33}); minQueue.addOrUpdate(value2, 20); - ASSERT_EQ(minQueue.pop().value(), 33); + ASSERT_EQ(minQueue.pop(), 33); verify(minQueue, {32}); - ASSERT_EQ(minQueue.pop().value(), 32); + ASSERT_EQ(minQueue.pop(), 32); ASSERT_TRUE(minQueue.empty()); } diff --git a/velox/functions/lib/ApproxMostFrequentStreamSummary.h b/velox/functions/lib/ApproxMostFrequentStreamSummary.h index dd15c7f77f4e..b9aff626bf03 100644 --- a/velox/functions/lib/ApproxMostFrequentStreamSummary.h +++ b/velox/functions/lib/ApproxMostFrequentStreamSummary.h @@ -16,13 +16,13 @@ #pragma once -#include - -#include - +#include "velox/common/base/BitUtil.h" #include "velox/common/base/Exceptions.h" +#include "velox/common/base/IndexedPriorityQueue.h" #include "velox/type/StringView.h" +#include + namespace facebook::velox::functions { /// Data structure to approximately compute the top frequent values from a large @@ -70,58 +70,27 @@ struct ApproxMostFrequentStreamSummary { /// Return the pointer to values data. The number of values equals to size(). const T* values() const { - return values_.data(); + return queue_.values(); } /// Return the pointer to counts data. The number of counts equals to size(). const int64_t* counts() const { - return counts_.data(); + return queue_.priorities(); } bool contains(T value) const { - return indices_.count(value) > 0; + return queue_.getValueIndex(value).has_value(); } private: - template - using RebindAlloc = - typename std::allocator_traits::template rebind_alloc; - - int heapCompare(int i, int j) const; - void percolateUp(int position); - void percolateDown(int position); - - int capacity() const { - return capacity_; - } - int capacity_ = 0; - int64_t currentGeneration_ = 0; - std::vector values_; - std::vector> counts_; - std::vector> generations_; - std::vector> heap_; - - folly::F14FastMap< - T, - int32_t, - std::hash, - std::equal_to, - RebindAlloc>> - indices_; - - std::vector> heapIndices_; + IndexedPriorityQueue queue_; }; template ApproxMostFrequentStreamSummary::ApproxMostFrequentStreamSummary( const A& allocator) - : values_(allocator), - counts_(RebindAlloc(allocator)), - generations_(RebindAlloc(allocator)), - heap_(RebindAlloc(allocator)), - indices_(RebindAlloc>(allocator)), - heapIndices_(RebindAlloc(allocator)) {} + : queue_(allocator) {} template void ApproxMostFrequentStreamSummary::setCapacity(int capacity) { @@ -135,92 +104,21 @@ void ApproxMostFrequentStreamSummary::setCapacity(int capacity) { template int ApproxMostFrequentStreamSummary::size() const { - VELOX_DCHECK_EQ(values_.size(), counts_.size()); - VELOX_DCHECK_EQ(values_.size(), generations_.size()); - VELOX_DCHECK_EQ(values_.size(), heap_.size()); - VELOX_DCHECK_EQ(values_.size(), heapIndices_.size()); - return values_.size(); -} - -template -int ApproxMostFrequentStreamSummary::heapCompare(int i, int j) const { - if (int ans = counts_[i] - counts_[j]; ans != 0) { - return ans; - } - // When the counts are same, we want to consider the previously generated - // value as minimum to prefer it over newly generated value with same count - // when we need to remove min. - return generations_[i] - generations_[j]; + return queue_.size(); } template void ApproxMostFrequentStreamSummary::insert(T value, int64_t count) { - if (auto it = indices_.find(value); it != indices_.end()) { - // The value to be counted is currently being tracked, we just need to - // increase the counter. - int i = it->second; - counts_[i] += count; - generations_[i] = ++currentGeneration_; - percolateDown(heapIndices_[i]); - return; - } - if (size() < capacity()) { - // There is still room available, just insert the value. - int i = size(); - values_.push_back(value); - counts_.push_back(count); - generations_.push_back(++currentGeneration_); - indices_.emplace(value, i); - heapIndices_.push_back(i); - heap_.push_back(i); - percolateUp(i); - return; - } - // Replace the element with least hits. - VELOX_DCHECK(!heap_.empty()); - int i = heap_[0]; - indices_.erase(values_[i]); - values_[i] = value; - counts_[i] += count; - generations_[i] = ++currentGeneration_; - indices_.emplace(value, i); - percolateDown(0); -} - -template -void ApproxMostFrequentStreamSummary::percolateUp(int pos) { - while (pos > 0) { - int parent = (pos - 1) / 2; - if (heapCompare(heap_[pos], heap_[parent]) >= 0) { - break; - } - std::swap(heap_[pos], heap_[parent]); - heapIndices_[heap_[pos]] = pos; - pos = parent; - } - heapIndices_[heap_[pos]] = pos; -} - -template -void ApproxMostFrequentStreamSummary::percolateDown(int pos) { - for (;;) { - int left = 2 * pos + 1; - if (left >= size()) { - break; - } - int child = left; - if (int right = left + 1; - right < size() && heapCompare(heap_[right], heap_[left]) < 0) { - child = right; - } - if (heapCompare(heap_[pos], heap_[child]) <= 0) { - break; - } - std::swap(heap_[pos], heap_[child]); - heapIndices_[heap_[pos]] = pos; - pos = child; + auto index = queue_.getValueIndex(value); + if (index.has_value()) { + auto oldCount = queue_.priorities()[*index]; + queue_.updatePriority(*index, oldCount + count); + } else if (size() < capacity_) { + queue_.addNewValue(value, count); + } else { + auto oldCount = queue_.topPriority(); + queue_.replaceTop(value, oldCount + count); } - heapIndices_[heap_[pos]] = pos; } template @@ -237,12 +135,12 @@ void ApproxMostFrequentStreamSummary::topK( // elements. auto posEnd = reinterpret_cast(counts + k); auto posBeg = posEnd - k; - auto gt = [&](auto i, auto j) { return heapCompare(i, j) > 0; }; + auto gt = [&](auto i, auto j) { return queue_.compare(i, j) > 0; }; for (int i = 0; i < size(); ++i) { if (i < k) { posBeg[i] = i; std::push_heap(posBeg, posBeg + i + 1, gt); - } else if (heapCompare(i, *posBeg) > 0) { + } else if (queue_.compare(i, *posBeg) > 0) { std::pop_heap(posBeg, posEnd, gt); posBeg[k - 1] = i; std::push_heap(posBeg, posEnd, gt); @@ -251,8 +149,8 @@ void ApproxMostFrequentStreamSummary::topK( std::sort(posBeg, posEnd, gt); for (auto it = posBeg; it != posEnd; ++it) { auto i = *it; - *values++ = values_[i]; - *counts++ = counts_[i]; + *values++ = queue_.values()[i]; + *counts++ = queue_.priorities()[i]; } } @@ -275,7 +173,8 @@ template size_t ApproxMostFrequentStreamSummary::serializedByteSize() const { size_t ans = sizeof(int32_t) + sizeof(T) * size() + sizeof(int64_t) * size(); if constexpr (std::is_same_v) { - for (auto& v : values_) { + for (int i = 0; i < size(); ++i) { + auto& v = queue_.values()[i]; if (!v.isInline()) { ans += v.size(); } @@ -291,17 +190,18 @@ size_t ApproxMostFrequentStreamSummary::serializedByteSize() const { // 4. If the value type is StringView, the actual non-inlined string data template void ApproxMostFrequentStreamSummary::serialize(char* out) const { - auto cur = out; - *reinterpret_cast(cur) = size(); + auto* cur = out; + folly::storeUnaligned(cur, size()); cur += sizeof(int32_t); auto byteSize = sizeof(T) * size(); - memcpy(cur, values_.data(), byteSize); + memcpy(cur, queue_.values(), byteSize); cur += byteSize; byteSize = sizeof(int64_t) * size(); - memcpy(cur, counts_.data(), byteSize); + memcpy(cur, queue_.priorities(), byteSize); cur += byteSize; if constexpr (std::is_same_v) { - for (auto& v : values_) { + for (int i = 0; i < size(); ++i) { + auto& v = queue_.values()[i]; if (!v.isInline()) { memcpy(cur, v.data(), v.size()); cur += v.size(); @@ -313,23 +213,24 @@ void ApproxMostFrequentStreamSummary::serialize(char* out) const { template void ApproxMostFrequentStreamSummary::mergeSerialized(const char* other) { - auto size = *reinterpret_cast(other); + auto size = folly::loadUnaligned(other); other += sizeof size; - auto values = reinterpret_cast(other); + auto* values = other; other += sizeof(T) * size; - auto counts = reinterpret_cast(other); + auto* counts = other; if constexpr (std::is_same_v) { other += sizeof(int64_t) * size; } + T v; for (int i = 0; i < size; ++i) { - auto v = values[i]; + FOLLY_BUILTIN_MEMCPY(&v, values + i * sizeof(T), sizeof(T)); if constexpr (std::is_same_v) { if (!v.isInline()) { v = {other, static_cast(v.size())}; other += v.size(); } } - insert(v, counts[i]); + insert(v, folly::loadUnaligned(counts + i * sizeof(int64_t))); } }