From 7cafbef53f475b966c1984dce395a940c6cc93f5 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Wed, 12 Aug 2020 15:49:52 -0700 Subject: [PATCH 1/3] rewritten quantile calculator --- kll/include/kll_quantile_calculator.hpp | 37 +++-- kll/include/kll_quantile_calculator_impl.hpp | 161 ++++++++----------- 2 files changed, 93 insertions(+), 105 deletions(-) diff --git a/kll/include/kll_quantile_calculator.hpp b/kll/include/kll_quantile_calculator.hpp index f77071e9..bc60f266 100644 --- a/kll/include/kll_quantile_calculator.hpp +++ b/kll/include/kll_quantile_calculator.hpp @@ -26,31 +26,38 @@ namespace datasketches { template class kll_quantile_calculator { - typedef typename std::allocator_traits::template rebind_alloc AllocU32; - typedef typename std::allocator_traits::template rebind_alloc AllocU64; public: // assumes that all levels are sorted including level 0 kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n); - ~kll_quantile_calculator(); T get_quantile(double fraction) const; private: + using AllocU32 = typename std::allocator_traits::template rebind_alloc; + using vector_u32 = std::vector; + using Entry = std::pair; + using AllocEntry = typename std::allocator_traits::template rebind_alloc; + using Container = std::vector; uint64_t n_; - T* items_; - uint64_t* weights_; - uint32_t* levels_; - uint8_t levels_size_; - uint8_t num_levels_; + vector_u32 levels_; + Container entries_; - void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels); + void populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels); T approximately_answer_positional_query(uint64_t pos) const; - static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size); + void convert_to_preceding_cummulative(); + uint32_t chunk_containing_pos(uint64_t pos) const; + uint32_t search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const; + static void merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items); + static void merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels); + static void merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels); static uint64_t pos_of_phi(double phi, uint64_t n); - static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos); - static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r); - static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels); - static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels); - static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2); + + template + struct compare_pair_by_first { + template + bool operator()(Entry1&& a, Entry2&& b) const { + return Comparator()(std::forward(a).first, std::forward(b).first); + } + }; }; } /* namespace datasketches */ diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp index d4f4c04a..77aeb051 100644 --- a/kll/include/kll_quantile_calculator_impl.hpp +++ b/kll/include/kll_quantile_calculator_impl.hpp @@ -22,31 +22,24 @@ #include #include +#include + +#include #include "kll_helper.hpp" namespace datasketches { template -kll_quantile_calculator::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) { - n_ = n; +kll_quantile_calculator::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n): +n_(n), levels_(num_levels + 1) +{ const uint32_t num_items = levels[num_levels] - levels[0]; - items_ = A().allocate(num_items); - weights_ = AllocU64().allocate(num_items + 1); // one more is intentional - levels_size_ = num_levels + 1; - levels_ = AllocU32().allocate(levels_size_); - populate_from_sketch(items, num_items, levels, num_levels); - blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_); - convert_to_preceding_cummulative(weights_, num_items + 1); -} - -template -kll_quantile_calculator::~kll_quantile_calculator() { - const uint32_t num_items = levels_[num_levels_] - levels_[0]; - for (uint32_t i = 0; i < num_items; i++) items_[i].~T(); - A().deallocate(items_, num_items); - AllocU64().deallocate(weights_, num_items + 1); - AllocU32().deallocate(levels_, levels_size_); + entries_.reserve(num_items); + populate_from_sketch(items, levels, num_levels); + merge_sorted_blocks(entries_, levels_.data(), levels_.size() - 1, num_items); + if (!is_sorted(entries_.begin(), entries_.end(), compare_pair_by_first())) throw std::logic_error("entries must be sorted"); + convert_to_preceding_cummulative(); } template @@ -55,17 +48,18 @@ T kll_quantile_calculator::get_quantile(double fraction) const { } template -void kll_quantile_calculator::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) { - kll_helper::copy_construct(items, levels[0], levels[num_levels], items_, 0); - uint8_t src_level = 0; - uint8_t dst_level = 0; +void kll_quantile_calculator::populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels) { + size_t src_level = 0; + size_t dst_level = 0; uint64_t weight = 1; uint32_t offset = levels[0]; while (src_level < num_levels) { const uint32_t from_index(levels[src_level] - offset); const uint32_t to_index(levels[src_level + 1] - offset); // exclusive if (from_index < to_index) { // skip empty levels - std::fill(&weights_[from_index], &weights_[to_index], weight); + for (uint32_t i = from_index; i < to_index; ++i) { + entries_.push_back(Entry(items[i + offset], weight)); + } levels_[dst_level] = from_index; levels_[dst_level + 1] = to_index; dst_level++; @@ -73,24 +67,24 @@ void kll_quantile_calculator::populate_from_sketch(const T* items, uint src_level++; weight *= 2; } - weights_[num_items] = 0; - num_levels_ = dst_level; + if (levels_.size() > static_cast(dst_level + 1)) levels_.resize(dst_level + 1); } template T kll_quantile_calculator::approximately_answer_positional_query(uint64_t pos) const { if (pos >= n_) throw std::logic_error("position out of range"); - const uint32_t weights_size(levels_[num_levels_] + 1); - const uint32_t index = chunk_containing_pos(weights_, weights_size, pos); - return items_[index]; + const uint32_t num_items = levels_[levels_.size() - 1]; + if (pos > entries_[num_items - 1].second) return entries_[num_items - 1].first; + const uint32_t index = chunk_containing_pos(pos); + return entries_[index].first; } template -void kll_quantile_calculator::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) { - uint64_t subtotal(0); - for (uint32_t i = 0; i < weights_size; i++) { - const uint32_t new_subtotal = subtotal + weights[i]; - weights[i] = subtotal; +void kll_quantile_calculator::convert_to_preceding_cummulative() { + uint64_t subtotal = 0; + for (auto& entry: entries_) { + const uint32_t new_subtotal = subtotal + entry.second; + entry.second = subtotal; subtotal = new_subtotal; } } @@ -102,87 +96,74 @@ uint64_t kll_quantile_calculator::pos_of_phi(double phi, uint64_t n) { } template -uint32_t kll_quantile_calculator::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) { - if (weights_size <= 1) throw std::logic_error("weights array too short"); // weights_ contains an "extra" position - const uint32_t nominal_length(weights_size - 1); - if (pos < weights[0]) throw std::logic_error("position too small"); - if (pos >= weights[nominal_length]) throw std::logic_error("position too large"); - return search_for_chunk_containing_pos(weights, pos, 0, nominal_length); +uint32_t kll_quantile_calculator::chunk_containing_pos(uint64_t pos) const { + if (entries_.size() < 1) throw std::logic_error("array too short"); + if (pos < entries_[0].second) throw std::logic_error("position too small"); + if (pos > entries_[entries_.size() - 1].second) throw std::logic_error("position too large"); + return search_for_chunk_containing_pos(pos, 0, entries_.size()); } template -uint32_t kll_quantile_calculator::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) { +uint32_t kll_quantile_calculator::search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const { if (l + 1 == r) { return l; } const uint32_t m(l + (r - l) / 2); - if (arr[m] <= pos) { - return search_for_chunk_containing_pos(arr, pos, m, r); + if (entries_[m].second <= pos) { + return search_for_chunk_containing_pos(pos, m, r); } - return search_for_chunk_containing_pos(arr, pos, l, m); + return search_for_chunk_containing_pos(pos, l, m); } template -void kll_quantile_calculator::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) { +void kll_quantile_calculator::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) { if (num_levels == 1) return; - - // move the input in preparation for the "ping-pong" reduction strategy - auto tmp_items_deleter = [num_items](T* ptr) { - for (uint32_t i = 0; i < num_items; i++) ptr[i].~T(); - A().deallocate(ptr, num_items); - }; - std::unique_ptr tmp_items(A().allocate(num_items), tmp_items_deleter); - kll_helper::move_construct(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back - auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); }; - std::unique_ptr tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here - std::copy(weights, &weights[num_items], tmp_weights.get()); - blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels); + Container temporary; + temporary.reserve(num_items); + merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels); } template -void kll_quantile_calculator::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) { +void kll_quantile_calculator::merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels, + uint8_t starting_level, uint8_t num_levels) { if (num_levels == 1) return; const uint8_t num_levels_1 = num_levels / 2; const uint8_t num_levels_2 = num_levels - num_levels_1; - if (num_levels_1 < 1) throw std::logic_error("level above 0 expected"); - if (num_levels_2 < num_levels_1) throw std::logic_error("wrong order of levels"); const uint8_t starting_level_1 = starting_level; const uint8_t starting_level_2 = starting_level + num_levels_1; - // swap roles of src and dst - blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1); - blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2); - tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2); + const auto chunk_begin = temp.begin() + temp.size(); + merge_sorted_blocks_reversed(orig, temp, levels, starting_level_1, num_levels_1); + merge_sorted_blocks_reversed(orig, temp, levels, starting_level_2, num_levels_2); + const uint32_t num_items_1 = levels[starting_level_1 + num_levels_1] - levels[starting_level_1]; + std::merge( + std::make_move_iterator(chunk_begin), std::make_move_iterator(chunk_begin + num_items_1), + std::make_move_iterator(chunk_begin + num_items_1), std::make_move_iterator(temp.end()), + orig.begin() + levels[starting_level], compare_pair_by_first() + ); + temp.erase(chunk_begin, temp.end()); } template -void kll_quantile_calculator::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) { - const auto from_index_1 = levels[starting_level_1]; - const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive - const auto from_index_2 = levels[starting_level_2]; - const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive - auto i_src_1 = from_index_1; - auto i_src_2 = from_index_2; - auto i_dst = from_index_1; - - while ((i_src_1 < to_index_1) && (i_src_2 < to_index_2)) { - if (C()(items_src[i_src_1], items_src[i_src_2])) { - items_dst[i_dst] = std::move(items_src[i_src_1]); - weights_dst[i_dst] = weights_src[i_src_1]; - i_src_1++; - } else { - items_dst[i_dst] = std::move(items_src[i_src_2]); - weights_dst[i_dst] = weights_src[i_src_2]; - i_src_2++; - } - i_dst++; - } - if (i_src_1 < to_index_1) { - std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]); - std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]); - } else if (i_src_2 < to_index_2) { - std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]); - std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]); +void kll_quantile_calculator::merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels, + uint8_t starting_level, uint8_t num_levels) { + if (num_levels == 1) { + std::move(orig.begin() + levels[starting_level], orig.begin() + levels[starting_level + 1], std::back_inserter(temp)); + return; } + const uint8_t num_levels_1 = num_levels / 2; + const uint8_t num_levels_2 = num_levels - num_levels_1; + const uint8_t starting_level_1 = starting_level; + const uint8_t starting_level_2 = starting_level + num_levels_1; + merge_sorted_blocks_direct(orig, temp, levels, starting_level_1, num_levels_1); + merge_sorted_blocks_direct(orig, temp, levels, starting_level_2, num_levels_2); + std::merge( + std::make_move_iterator(orig.begin() + levels[starting_level_1]), + std::make_move_iterator(orig.begin() + levels[starting_level_1 + num_levels_1]), + std::make_move_iterator(orig.begin() + levels[starting_level_2]), + std::make_move_iterator(orig.begin() + levels[starting_level_2 + num_levels_2]), + std::back_inserter(temp), + compare_pair_by_first() + ); } } /* namespace datasketches */ From 3135dc4a71af05150bd8864aaf0ff803532a554f Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Thu, 13 Aug 2020 10:39:37 -0700 Subject: [PATCH 2/3] avoid warnings, unified style --- kll/include/kll_sketch_impl.hpp | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp index 9b6c68e1..d0511356 100644 --- a/kll/include/kll_sketch_impl.hpp +++ b/kll/include/kll_sketch_impl.hpp @@ -312,9 +312,9 @@ std::vector kll_sketch::get_quantiles(size_t num) const { template double kll_sketch::get_rank(const T& value) const { if (is_empty()) return std::numeric_limits::quiet_NaN(); - uint8_t level(0); - uint64_t weight(1); - uint64_t total(0); + uint8_t level = 0; + uint64_t weight = 1; + uint64_t total = 0; while (level < num_levels_) { const auto from_index(levels_[level]); const auto to_index(levels_[level + 1]); // exclusive @@ -391,7 +391,7 @@ void kll_sketch::serialize(std::ostream& os) const { os.write(reinterpret_cast(&flags_byte), sizeof(flags_byte)); os.write((char*)&k_, sizeof(k_)); os.write((char*)&m_, sizeof(m_)); - const uint8_t unused(0); + const uint8_t unused = 0; os.write(reinterpret_cast(&unused), sizeof(unused)); if (is_empty()) return; if (!is_single_item) { @@ -412,7 +412,7 @@ vector_u8 kll_sketch::serialize(unsigned header_size_bytes) const const size_t size = header_size_bytes + get_serialized_size_bytes(); vector_u8 bytes(size); uint8_t* ptr = bytes.data() + header_size_bytes; - uint8_t* end_ptr = ptr + size; + const uint8_t* end_ptr = ptr + size; const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL); ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints)); const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1); @@ -427,7 +427,7 @@ vector_u8 kll_sketch::serialize(unsigned header_size_bytes) const ptr += copy_to_mem(&flags_byte, ptr, sizeof(flags_byte)); ptr += copy_to_mem(&k_, ptr, sizeof(k_)); ptr += copy_to_mem(&m_, ptr, sizeof(m_)); - const uint8_t unused(0); + const uint8_t unused = 0; ptr += copy_to_mem(&unused, ptr, sizeof(unused)); if (!is_empty()) { if (!is_single_item) { @@ -776,8 +776,8 @@ vector_d kll_sketch::get_PMF_or_CDF(const T* split_points, uint32 if (is_empty()) return vector_d(); kll_helper::validate_values(split_points, size); vector_d buckets(size + 1, 0); - uint8_t level(0); - uint64_t weight(1); + uint8_t level = 0; + uint64_t weight = 1; while (level < num_levels_) { const auto from_index = levels_[level]; const auto to_index = levels_[level + 1]; // exclusive @@ -871,8 +871,9 @@ void kll_sketch::merge_higher_levels(O&& other, uint64_t final_n) { const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items; kll_helper::move_construct(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true); - if (levels_.size() < (result.final_num_levels + 1)) { - levels_.resize(result.final_num_levels + 1); + const size_t new_levels_size = result.final_num_levels + 1; + if (levels_.size() < new_levels_size) { + levels_.resize(new_levels_size); } const uint32_t offset = free_space_at_bottom - outlevels[0]; for (uint8_t lvl = 0; lvl < levels_.size(); lvl++) { // includes the "extra" index @@ -1027,7 +1028,7 @@ string kll_sketch::to_string(bool print_levels, bool print_items) if (print_items) { os << "### KLL sketch data:" << std::endl; - uint8_t level(0); + uint8_t level = 0; while (level < num_levels_) { const uint32_t from_index = levels_[level]; const uint32_t to_index = levels_[level + 1]; // exclusive From 608a741a4d0db9078feddc165cb080f5bb43e351 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Fri, 14 Aug 2020 10:00:17 -0700 Subject: [PATCH 3/3] removed unused include --- kll/include/kll_quantile_calculator_impl.hpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp index 77aeb051..cb926741 100644 --- a/kll/include/kll_quantile_calculator_impl.hpp +++ b/kll/include/kll_quantile_calculator_impl.hpp @@ -24,8 +24,6 @@ #include #include -#include - #include "kll_helper.hpp" namespace datasketches {