diff --git a/velox/common/testutil/CMakeLists.txt b/velox/common/testutil/CMakeLists.txt index f33bb98be06f2..6e8e05b849fff 100644 --- a/velox/common/testutil/CMakeLists.txt +++ b/velox/common/testutil/CMakeLists.txt @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -velox_add_library(velox_test_util ScopedTestTime.cpp TestValue.cpp) -velox_link_libraries(velox_test_util PUBLIC velox_exception) +velox_add_library(velox_test_util ScopedTestTime.cpp TestValue.cpp + RandomSeed.cpp) + +velox_link_libraries( + velox_test_util + PUBLIC velox_exception + PRIVATE glog::glog Folly::folly) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) diff --git a/velox/common/testutil/RandomSeed.cpp b/velox/common/testutil/RandomSeed.cpp new file mode 100644 index 0000000000000..8c27f684572cb --- /dev/null +++ b/velox/common/testutil/RandomSeed.cpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/common/testutil/RandomSeed.h" + +#include +#include +#include + +#include + +namespace facebook::velox::common::testutil { + +unsigned getRandomSeed(unsigned fixedValue) { + const char* env = getenv("VELOX_TEST_USE_RANDOM_SEED"); + if (!(env && folly::to(env))) { + return fixedValue; + } + auto seed = folly::Random::secureRand32(); + LOG(INFO) << "Random seed: " << seed; + return seed; +} + +} // namespace facebook::velox::common::testutil diff --git a/velox/common/testutil/RandomSeed.h b/velox/common/testutil/RandomSeed.h new file mode 100644 index 0000000000000..c6fa02ad1e11f --- /dev/null +++ b/velox/common/testutil/RandomSeed.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::common::testutil { + +/// Get a truely random seed and log it for future reproducing if +/// VELOX_TEST_USE_RANDOM_SEED is set. Otherwise return a fixed value so test +/// runs are deterministic. We use environment variable because `buck test` +/// does not allow pass in command line arguments. +unsigned getRandomSeed(unsigned fixedValue); + +} // namespace facebook::velox::common::testutil diff --git a/velox/dwio/common/tests/utils/E2EFilterTestBase.h b/velox/dwio/common/tests/utils/E2EFilterTestBase.h index a39fc8d1b64d1..2ad299b67984c 100644 --- a/velox/dwio/common/tests/utils/E2EFilterTestBase.h +++ b/velox/dwio/common/tests/utils/E2EFilterTestBase.h @@ -16,6 +16,7 @@ #pragma once +#include "velox/common/testutil/RandomSeed.h" #include "velox/common/time/Timer.h" #include "velox/dwio/common/BufferedInput.h" #include "velox/dwio/common/FileSink.h" @@ -102,20 +103,10 @@ class E2EFilterTestBase : public testing::Test { memory::MemoryManager::testingSetInstance({}); } - static bool useRandomSeed() { - // Check environment variable because `buck test` does not allow pass in - // command line arguments. - const char* env = getenv("VELOX_TEST_USE_RANDOM_SEED"); - return !env ? false : folly::to(env); - } - void SetUp() override { rootPool_ = memory::memoryManager()->addRootPool("E2EFilterTestBase"); leafPool_ = rootPool_->addLeafChild("E2EFilterTestBase"); - if (useRandomSeed()) { - seed_ = folly::Random::secureRand32(); - LOG(INFO) << "Random seed: " << seed_; - } + seed_ = common::testutil::getRandomSeed(seed_); } static bool typeKindSupportsValueHook(TypeKind kind) { diff --git a/velox/functions/lib/TDigest.h b/velox/functions/lib/TDigest.h new file mode 100644 index 0000000000000..b5295968d8406 --- /dev/null +++ b/velox/functions/lib/TDigest.h @@ -0,0 +1,483 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/Portability.h" + +#include + +#include + +namespace facebook::velox::functions { + +namespace tdigest { +constexpr double kDefaultCompression = 100; +} + +/// Implementation of T-Digest that matches Presto Java behavior. It has the +/// same error bound as Java version and the serialization format is same as +/// Java. +/// +/// There are some improvements on runtime performance compared to Java version: +/// +/// 1. The memory footprint is largely reduced compared to Java. When we merge +/// new values, we keep the already merged values and unmerged values in the +/// same buffer and do the reordering and merging in-place, instead of keeping +/// the merged values in separate buffers like Java. We also do not keep the +/// positions buffer inside this class, because these are temporary scratch +/// memory that can be reused across different objects and should not be stored +/// in row container. +/// +/// 2. When we merging the deserialized digests, if the centroids are already +/// sorted (highly likely so), we no longer need to re-sort them and can +/// directly start merging the sorted centroids. +/// +/// Java implementation can be found at +/// https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/tdigest/TDigest.java +template > +class TDigest { + public: + explicit TDigest(const Allocator& allocator = Allocator()); + + /// Set the compression parameter of the T-Digest. The value should be + /// between 10 and 1000. The larger the value, the more accurate this digest + /// will be. Default to tdigest::kDefaultCompression if this method is not + /// called. + void setCompression(double compression); + + /// Add a new value or multiple same values to the digest. + /// + /// @param positions Scratch memory used to keep the ordered positions of + /// centroids. This buffer can and should be reused across different groups + /// of accumulators in an aggregate function. + /// @param value The new value to be added. Cannot be NaN. + /// @param weight A positive number indicating how many copies of `value' to + /// be added. + void add(std::vector& positions, double value, int64_t weight = 1); + + /// Compress the buffered values according to the compression parameter + /// provided. Must be called before doing any estimation or serialization. + /// + /// @param positions Scratch memory used to keep the ordered positions of + /// centroids. This buffer can and should be reused across different groups + /// of accumulators in an aggregate function. + void compress(std::vector& positions); + + /// Estimate the value of the given quantile. + /// @param quantile Quantile in [0, 1] to be estimated. + double estimateQuantile(double quantile) const; + + /// Calculate the size needed for serialization. + int64_t serializedByteSize() const; + + /// Serialize the digest into bytes. The serialzation is versioned, and newer + /// version of code should be able to read all previous versions. Presto Java + /// can read output of this function. + /// + /// @param out Pre-allocated memory at least serializedByteSize() in size. + void serialize(char* out) const; + + /// Merge this digest with values from another deserialized digest. + /// Serialization produced by Presto Java can be used as input. + /// + /// @param positions Scratch memory used to keep the ordered positions of + /// centroids. This buffer can and should be reused across different groups + /// of accumulators in an aggregate function. + /// @param input The input serialization. + void mergeDeserialized(std::vector& positions, const char* input); + + /// Returns the total sum of all values added to this digest. + double sum() const; + + /// Returns the compression parameter. + double compression() const { + return compression_; + } + + private: + static constexpr int8_t kSerializationVersion = 1; + static constexpr double kEpsilon = 1e-3; + + void mergeNewValues(std::vector& positions, double compression); + + void merge( + double compression, + const double* weights, + const double* means, + int count); + + template + void mergeImpl( + double compression, + const double* weights, + const double* means, + int count); + + static double + weightedAverageSorted(double x1, double w1, double x2, double w2) { + VELOX_DCHECK_LE(x1, x2); + double x = (x1 * w1 + x2 * w2) / (w1 + w2); + return std::max(x1, std::min(x, x2)); + } + + std::vector weights_; + std::vector means_; + double compression_; + int maxBufferSize_; + int32_t numMerged_ = 0; + double min_ = INFINITY; + double max_ = -INFINITY; + bool reverseCompress_ = false; +}; + +template +TDigest::TDigest(const A& allocator) + : weights_(allocator), means_(allocator) { + setCompression(tdigest::kDefaultCompression); +} + +template +void TDigest::setCompression(double compression) { + VELOX_CHECK_GE(compression, 10); + VELOX_CHECK_LE(compression, 1000); + VELOX_CHECK(weights_.empty()); + compression_ = compression; + maxBufferSize_ = 5 * std::ceil(2 * compression_ + 30); +} + +template +void TDigest::add( + std::vector& positions, + double value, + int64_t weight) { + VELOX_CHECK(!std::isnan(value)); + VELOX_CHECK_GT(weight, 0); + min_ = std::min(min_, value); + max_ = std::max(max_, value); + weights_.push_back(weight); + means_.push_back(value); + if (weights_.size() >= maxBufferSize_) { + mergeNewValues(positions, 2 * compression_); + } +} + +template +void TDigest::compress(std::vector& positions) { + if (!weights_.empty()) { + mergeNewValues(positions, compression_); + } +} + +template +void TDigest::mergeNewValues( + std::vector& positions, + double compression) { + if (numMerged_ < weights_.size()) { + VELOX_CHECK_LE(weights_.size(), std::numeric_limits::max()); + positions.resize(weights_.size()); + std::iota(positions.begin(), positions.end(), 0); + auto newBegin = positions.begin() + numMerged_; + auto compare = [this](auto i, auto j) { return means_[i] < means_[j]; }; + if (!std::is_sorted(means_.begin() + numMerged_, means_.end())) { + std::sort(newBegin, positions.end(), compare); + } + std::inplace_merge(positions.begin(), newBegin, positions.end(), compare); + for (int i = 0; i < positions.size(); ++i) { + if (i == positions[i]) { + continue; + } + auto wi = weights_[i]; + auto mi = means_[i]; + auto j = i; + for (;;) { + auto k = positions[j]; + if (k == i) { + break; + } + weights_[j] = weights_[k]; + means_[j] = means_[k]; + positions[j] = j; + j = k; + } + weights_[j] = wi; + means_[j] = mi; + positions[j] = j; + } + VELOX_DCHECK(std::is_sorted(means_.begin(), means_.end())); + } + merge(compression, weights_.data(), means_.data(), weights_.size()); +} + +template +void TDigest::merge( + double compression, + const double* weights, + const double* means, + int count) { + VELOX_CHECK_GT(count, 0); + VELOX_CHECK_GE(weights_.size(), count); + if (reverseCompress_) { + // Run the merge in reverse every other merge to avoid left-to-right + // bias. + mergeImpl(compression, weights, means, count); + } else { + mergeImpl(compression, weights, means, count); + } + reverseCompress_ = !reverseCompress_; +} + +template +template +void TDigest::mergeImpl( + double compression, + const double* weights, + const double* means, + int count) { + const auto totalWeight = std::accumulate(weights, weights + count, 0.0); + const auto invTotalWeight = 1 / totalWeight; + const auto normalizer = + (4 * std::log(totalWeight / compression) + 24) / compression; + auto maxSize = [normalizer](double q) { return q * (1 - q) * normalizer; }; + double weightSoFar = 0; + numMerged_ = 0; + const int begin = kReverse ? count - 1 : 0; + auto notEnd = [&](auto i) INLINE_LAMBDA { + if constexpr (kReverse) { + return i >= 0; + } else { + return i < count; + } + }; + constexpr int kStep = kReverse ? -1 : 1; + int j = kReverse ? count - 1 : 0; + weights_[j] = weights[begin]; + means_[j] = means[begin]; + for (int i = begin + kStep; notEnd(i); i += kStep) { + auto proposedWeight = weights_[j] + weights[i]; + auto q0 = weightSoFar * invTotalWeight; + auto q2 = (weightSoFar + proposedWeight) * invTotalWeight; + if (proposedWeight <= totalWeight * std::min(maxSize(q0), maxSize(q2))) { + weights_[j] += weights[i]; + means_[j] += (means[i] - means_[j]) * weights[i] / weights_[j]; + } else { + weightSoFar += weights_[j]; + ++numMerged_; + j += kStep; + weights_[j] = weights[i]; + means_[j] = means[i]; + } + } + weightSoFar += weights_[j]; + ++numMerged_; + VELOX_CHECK_LT(std::abs(weightSoFar - totalWeight), kEpsilon); + if constexpr (kReverse) { + std::copy(weights_.begin() + j, weights_.end(), weights_.begin()); + std::copy(means_.begin() + j, means_.end(), means_.begin()); + } + weights_.resize(numMerged_); + means_.resize(numMerged_); + min_ = std::min(min_, means_.front()); + max_ = std::max(max_, means_.back()); +} + +template +double TDigest::estimateQuantile(double quantile) const { + VELOX_CHECK(0 <= quantile && quantile <= 1); + VELOX_CHECK_EQ(numMerged_, weights_.size()); + if (numMerged_ == 0) { + return NAN; + } + if (numMerged_ == 1) { + return means_[0]; + } + auto totalWeight = std::accumulate(weights_.begin(), weights_.end(), 0.0); + const auto index = quantile * totalWeight; + if (index < 1) { + return min_; + } + if (weights_.front() > 1 && index < weights_.front() / 2) { + return min_ + + (index - 1) / (weights_.front() / 2 - 1) * (means_.front() - min_); + } + if (index > totalWeight - 1) { + return max_; + } + if (weights_.back() > 1 && totalWeight - index <= weights_.back() / 2) { + return max_ - + (totalWeight - index - 1) / (weights_.back() / 2 - 1) * + (max_ - means_.back()); + } + auto weightSoFar = weights_[0] / 2; + for (int i = 1; i < numMerged_; ++i) { + auto dw = (weights_[i - 1] + weights_[i]) / 2; + if (weightSoFar + dw <= index) { + weightSoFar += dw; + continue; + } + double leftUnit = 0; + if (weights_[i - 1] == 1) { + if (index - weightSoFar < 0.5) { + return means_[i - 1]; + } + leftUnit = 0.5; + } + double rightUnit = 0; + if (weights_[i] == 1) { + if (weightSoFar + dw - index <= 0.5) { + return means_[i]; + } + rightUnit = 0.5; + } + auto z1 = index - weightSoFar - leftUnit; + auto z2 = weightSoFar + dw - index - rightUnit; + return weightedAverageSorted(means_[i - 1], z2, means_[i], z1); + } + VELOX_CHECK_GT(weights_.back(), 1); + VELOX_CHECK_LE(index, totalWeight); + VELOX_CHECK_GE(index, totalWeight - weights_.back() / 2); + auto z1 = index - totalWeight - weights_.back() / 2; + auto z2 = weights_.back() / 2 - z1; + return weightedAverageSorted(means_.back(), z1, max_, z2); +} + +namespace tdigest::detail { + +static_assert(folly::kIsLittleEndian); + +template +void write(T value, char*& out) { + folly::storeUnaligned(out, value); + out += sizeof(T); +} + +template +void write(const T* values, int count, char*& out) { + auto size = sizeof(T) * count; + memcpy(out, values, size); + out += size; +} + +template +void read(const char*& input, T& value) { + value = folly::loadUnaligned(input); + input += sizeof(T); +} + +template +void read(const char*& input, T* values, int count) { + auto size = sizeof(T) * count; + memcpy(values, input, size); + input += size; +} + +} // namespace tdigest::detail + +template +int64_t TDigest::serializedByteSize() const { + VELOX_CHECK_EQ(numMerged_, weights_.size()); + return sizeof(kSerializationVersion) + 1 /*data type*/ + sizeof(min_) + + sizeof(max_) + sizeof(double) /*sum*/ + sizeof(compression_) + + sizeof(double) /*total weight*/ + sizeof(numMerged_) + + 2 * numMerged_ * sizeof(double); +} + +template +void TDigest::serialize(char* out) const { + VELOX_CHECK_EQ(numMerged_, weights_.size()); + auto totalWeight = std::accumulate(weights_.begin(), weights_.end(), 0.0); + const char* oldOut = out; + tdigest::detail::write(kSerializationVersion, out); + tdigest::detail::write(0, out); + tdigest::detail::write(min_, out); + tdigest::detail::write(max_, out); + tdigest::detail::write(sum(), out); + tdigest::detail::write(compression_, out); + tdigest::detail::write(totalWeight, out); + tdigest::detail::write(numMerged_, out); + if (numMerged_ > 0) { + tdigest::detail::write(weights_.data(), numMerged_, out); + tdigest::detail::write(means_.data(), numMerged_, out); + } + VELOX_CHECK_EQ(out - oldOut, serializedByteSize()); +} + +template +void TDigest::mergeDeserialized( + std::vector& positions, + const char* input) { + int8_t version; + tdigest::detail::read(input, version); + VELOX_CHECK_GE(version, 0); + VELOX_CHECK_LE(version, kSerializationVersion); + int8_t type; + tdigest::detail::read(input, type); + VELOX_CHECK_EQ(type, 0); + double min, max, sum, compression, totalWeight; + tdigest::detail::read(input, min); + tdigest::detail::read(input, max); + if (version >= 1) { + tdigest::detail::read(input, sum); + } + tdigest::detail::read(input, compression); + VELOX_CHECK_EQ(compression, compression_); + tdigest::detail::read(input, totalWeight); + int32_t numNew; + tdigest::detail::read(input, numNew); + if (numNew > 0) { + auto numOld = weights_.size(); + weights_.resize(numOld + numNew); + auto* weights = weights_.data() + numOld; + tdigest::detail::read(input, weights, numNew); + for (int i = 0; i < numNew; ++i) { + VELOX_CHECK_GT(weights[i], 0); + } + means_.resize(numOld + numNew); + auto* means = means_.data() + numOld; + tdigest::detail::read(input, means, numNew); + for (int i = 0; i < numNew; ++i) { + VELOX_CHECK(!std::isnan(means[i])); + } + if (version >= 1) { + double actualSum = 0; + for (int i = 0; i < numNew; ++i) { + actualSum += weights[i] * means[i]; + } + VELOX_CHECK_LT(std::abs(actualSum - sum), kEpsilon); + } + double actualTotalWeight = std::accumulate(weights, weights + numNew, 0.0); + VELOX_CHECK_LT(std::abs(actualTotalWeight - totalWeight), kEpsilon); + } else { + VELOX_CHECK_LT(std::abs(sum), kEpsilon); + VELOX_CHECK_LT(std::abs(totalWeight), kEpsilon); + } + if (weights_.size() >= maxBufferSize_) { + mergeNewValues(positions, 2 * compression_); + } +} + +template +double TDigest::sum() const { + VELOX_CHECK_EQ(numMerged_, weights_.size()); + double result = 0; + for (int i = 0; i < numMerged_; ++i) { + result += weights_[i] * means_[i]; + } + return result; +} + +} // namespace facebook::velox::functions diff --git a/velox/functions/lib/tests/CMakeLists.txt b/velox/functions/lib/tests/CMakeLists.txt index c3486a6687295..5715c0d4e890e 100644 --- a/velox/functions/lib/tests/CMakeLists.txt +++ b/velox/functions/lib/tests/CMakeLists.txt @@ -23,6 +23,7 @@ add_executable( MapConcatTest.cpp Re2FunctionsTest.cpp RepeatTest.cpp + TDigestTest.cpp Utf8Test.cpp ZetaDistributionTest.cpp) diff --git a/velox/functions/lib/tests/TDigestTest.cpp b/velox/functions/lib/tests/TDigestTest.cpp new file mode 100644 index 0000000000000..56b2a5f4fdb8f --- /dev/null +++ b/velox/functions/lib/tests/TDigestTest.cpp @@ -0,0 +1,384 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/functions/lib/TDigest.h" +#include "velox/common/testutil/RandomSeed.h" + +#include +#include + +#include + +namespace facebook::velox::functions { +namespace { + +constexpr double kSumError = 1e-4; +constexpr double kRankError = 0.01; + +constexpr double kQuantiles[] = { + 0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, + 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, 0.9000, + 0.9500, 0.9600, 0.9700, 0.9800, 0.9999, +}; + +void checkQuantiles( + folly::Range values, + const TDigest<>& digest) { + VELOX_CHECK(std::is_sorted(values.begin(), values.end())); + auto sum = std::accumulate(values.begin(), values.end(), 0.0); + ASSERT_NEAR(digest.sum(), sum, kSumError); + for (auto q : kQuantiles) { + auto v = digest.estimateQuantile(q); + ASSERT_LE(values.front(), v); + ASSERT_LE(v, values.back()); + auto hi = std::lower_bound(values.begin(), values.end(), v); + auto lo = hi; + while (lo != values.begin() && v > *lo) { + --lo; + } + while (std::next(hi) != values.end() && *hi == *std::next(hi)) { + ++hi; + } + auto l = (lo - values.begin()) / (values.size() - 1.0); + auto r = (hi - values.begin()) / (values.size() - 1.0); + if (q < l) { + ASSERT_NEAR(l, q, kRankError); + } else if (q > r) { + ASSERT_NEAR(r, q, kRankError); + } + } +} + +#define CHECK_QUANTILES(_values, _digest) \ + do { \ + SCOPED_TRACE("CHECK_QUANTILES"); \ + checkQuantiles((_values), (_digest)); \ + } while (false) + +std::string decodeBase64(std::string_view input) { + std::string decoded(folly::base64DecodedSize(input), '\0'); + folly::base64Decode(input, decoded.data()); + return decoded; +} + +TEST(TDigestTest, addElementsInOrder) { + constexpr int N = 1e6; + TDigest digest; + ASSERT_EQ(digest.compression(), tdigest::kDefaultCompression); + std::vector positions; + for (int i = 0; i < N; ++i) { + digest.add(positions, i); + } + digest.compress(positions); + ASSERT_NEAR(digest.sum(), 1.0 * N * (N - 1) / 2, kSumError); + for (auto q : kQuantiles) { + auto v = digest.estimateQuantile(q); + ASSERT_NEAR(v / (N - 1), q, kRankError); + } +} + +TEST(TDigestTest, addElementsRandomized) { + constexpr int N = 1e5; + double values[N]; + TDigest digest; + std::vector positions; + std::default_random_engine gen(common::testutil::getRandomSeed(42)); + std::uniform_real_distribution<> dist; + for (int i = 0; i < N; ++i) { + auto v = dist(gen); + digest.add(positions, v); + values[i] = v; + } + digest.compress(positions); + std::sort(std::begin(values), std::end(values)); + CHECK_QUANTILES(folly::Range(values, N), digest); +} + +TEST(TDigestTest, fewElements) { + TDigest digest; + std::vector positions; + digest.compress(positions); + ASSERT_EQ(digest.sum(), 0); + for (auto q : kQuantiles) { + ASSERT_TRUE(std::isnan(digest.estimateQuantile(q))); + } + digest.add(positions, 1.0); + digest.compress(positions); + ASSERT_EQ(digest.sum(), 1); + for (auto q : kQuantiles) { + ASSERT_EQ(digest.estimateQuantile(q), 1.0); + } +} + +// IMPORTANT: All these errors cannot be caught by TRY in Presto, so we should +// not make them user errors. If in another engine these are catchable errors, +// throw user errors in the corresponding UDFs before they reach the TDigest +// implementation. +TEST(TDigestTest, invalid) { + TDigest digest; + ASSERT_THROW(digest.setCompression(NAN), VeloxRuntimeError); + ASSERT_THROW(digest.setCompression(0), VeloxRuntimeError); + ASSERT_THROW(digest.setCompression(1000.1), VeloxRuntimeError); + std::vector positions; + ASSERT_THROW(digest.add(positions, NAN), VeloxRuntimeError); + ASSERT_THROW(digest.add(positions, 1, 0), VeloxRuntimeError); + ASSERT_THROW(digest.estimateQuantile(1.1), VeloxRuntimeError); +} + +TEST(TDigestTest, unalignedSerialization) { + constexpr int N = 1e4; + TDigest digest; + std::vector positions; + for (int i = 0; i < N; ++i) { + digest.add(positions, i); + } + digest.compress(positions); + ASSERT_NEAR(digest.sum(), 1.0 * N * (N - 1) / 2, kSumError); + std::string buf(1 + digest.serializedByteSize(), '\0'); + for (int offset = 0; offset < 2; ++offset) { + SCOPED_TRACE(fmt::format("offset={}", offset)); + digest.serialize(buf.data() + offset); + TDigest digest2; + digest2.mergeDeserialized(positions, buf.data() + offset); + digest2.compress(positions); + for (auto q : kQuantiles) { + auto v = digest2.estimateQuantile(q); + ASSERT_NEAR(v / (N - 1), q, kRankError); + } + } +} + +TEST(TDigestTest, mergeEmpty) { + std::vector positions; + TDigest<> digests[2]; + std::string buf(digests[1].serializedByteSize(), '\0'); + digests[1].serialize(buf.data()); + digests[0].mergeDeserialized(positions, buf.data()); + digests[0].compress(positions); + ASSERT_EQ(digests[0].sum(), 0); + ASSERT_TRUE(std::isnan(digests[0].estimateQuantile(0.5))); + digests[0].add(positions, 1.0); + digests[0].compress(positions); + ASSERT_EQ(digests[0].sum(), 1); + ASSERT_EQ(digests[0].estimateQuantile(0.5), 1); + digests[0].mergeDeserialized(positions, buf.data()); + digests[0].compress(positions); + ASSERT_EQ(digests[0].sum(), 1); + ASSERT_EQ(digests[0].estimateQuantile(0.5), 1); +} + +TEST(TDigestTest, deserializeJava) { + std::vector positions; + { + SCOPED_TRACE( + "select to_base64(cast(tdigest_agg(x) as varbinary)) from (values (2.0)) as t(x)"); + auto data = decodeBase64( + "AQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAAEAAAAAAAABZQAAAAAAAAPA/AQAAAAAAAAAAAPA/AAAAAAAAAEA="); + TDigest digest; + digest.mergeDeserialized(positions, data.data()); + digest.compress(positions); + ASSERT_EQ(digest.compression(), tdigest::kDefaultCompression); + ASSERT_EQ(digest.sum(), 2.0); + for (auto q : kQuantiles) { + ASSERT_EQ(digest.estimateQuantile(q), 2.0); + } + } + { + SCOPED_TRACE( + "select to_base64(cast(tdigest_agg(x, w, c) as varbinary)) from (values (2.0, 2, 200.0)) as t(x, w, c)"); + auto data = decodeBase64( + "AQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAEEAAAAAAAABpQAAAAAAAAABAAQAAAAAAAAAAAABAAAAAAAAAAEA="); + TDigest digest; + digest.setCompression(200); + digest.mergeDeserialized(positions, data.data()); + digest.compress(positions); + ASSERT_EQ(digest.compression(), 200); + ASSERT_EQ(digest.sum(), 4.0); + for (auto q : kQuantiles) { + ASSERT_EQ(digest.estimateQuantile(q), 2.0); + } + } + { + SCOPED_TRACE( + "select to_base64(cast(tdigest_agg(x) as varbinary)) from unnest(sequence(0, 1000)) as t(x)"); + auto data = decodeBase64( + "AQAAAAAAAAAAAAAAAAAAQI9AAAAAAFCMHkEAAAAAAABZQAAAAAAASI9AMgAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAADwPwAAAAAAAPA/AAAAAAAA8D8AAAAAAADwPwAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEAAAAAAAAAUQAAAAAAAABxAAAAAAAAAIkAAAAAAAAAoQAAAAAAAADBAAAAAAAAANEAAAAAAAAA6QAAAAAAAgEBAAAAAAACAREAAAAAAAABJQAAAAAAAAE5AAAAAAABAUUAAAAAAAEBTQAAAAAAAgFRAAAAAAADAU0AAAAAAAABSQAAAAAAAAFBAAAAAAAAAS0AAAAAAAIBGQAAAAAAAAEJAAAAAAAAAPUAAAAAAAAA2QAAAAAAAADFAAAAAAAAAKkAAAAAAAAAkQAAAAAAAACBAAAAAAAAAGEAAAAAAAAAUQAAAAAAAAAhAAAAAAAAACEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAA8D8AAAAAAADwPwAAAAAAAPA/AAAAAAAA8D8AAAAAAADwPwAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAAAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAIQAAAAAAAABBAAAAAAAAAFEAAAAAAAAAYQAAAAAAAAB5AAAAAAAAAI0AAAAAAAAAoQAAAAAAAAC9AAAAAAAAANEAAAAAAAAA6QAAAAAAAAEFAAAAAAABARkAAAAAAAEBNQAAAAAAAIFNAAAAAAADgWEAAAAAAACBgQAAAAAAAwGRAAAAAAABwakAAAAAAAKhwQAAAAAAAsHRAAAAAAABAeUAAAAAAADh+QAAAAAAAoIFAAAAAAAD8g0AAAAAAAByGQAAAAAAA9IdAAAAAAACAiUAAAAAAAMSKQAAAAAAAyItAAAAAAACUjEAAAAAAADCNQAAAAAAAqI1AAAAAAAAEjkAAAAAAAEyOQAAAAAAAhI5AAAAAAACwjkAAAAAAANCOQAAAAAAA6I5AAAAAAAD8jkAAAAAAAAiPQAAAAAAAEI9AAAAAAAAYj0AAAAAAACCPQAAAAAAAKI9AAAAAAAAwj0AAAAAAADiPQAAAAAAAQI9A"); + TDigest digest; + digest.mergeDeserialized(positions, data.data()); + digest.compress(positions); + ASSERT_NEAR(digest.sum(), 500500, kSumError); + for (auto q : kQuantiles) { + auto v = digest.estimateQuantile(q); + ASSERT_NEAR(v / 1000, q, kRankError); + } + } + { + SCOPED_TRACE( + "select to_base64(cast(tdigest_agg(x, w) as varbinary)) from (values (0.0, 1), (1.0, 100)) as t(x, w)"); + auto data = decodeBase64( + "AQAAAAAAAAAAAAAAAAAAAPA/AAAAAAAAWUAAAAAAAABZQAAAAAAAQFlAAgAAAAAAAAAAAPA/AAAAAAAAWUAAAAAAAAAAAAAAAAAAAPA/"); + TDigest digest; + digest.mergeDeserialized(positions, data.data()); + digest.compress(positions); + double values[101]; + values[0] = 0; + std::fill(values + 1, values + 101, 1); + CHECK_QUANTILES(folly::Range(values, 101), digest); + } + { + SCOPED_TRACE( + "select to_base64(cast(tdigest_agg(cast(x as double), 1001 - x) as varbinary)) from unnest(sequence(1, 1000)) as t(x)"); + auto data = decodeBase64( + "AQAAAAAAAADwPwAAAAAAQI9AAAAAMIjto0EAAAAAAABZQAAAAABQjB5BLAAAAAAAAAAAQI9AAAAAAAA4j0AAAAAAADCPQAAAAAAAKI9AAAAAAAAcn0AAAAAAAEanQAAAAAAAUbNAAAAAAADhukAAAAAAAO3EQAAAAAAA4M9AAAAAAEDU10AAAAAAwEDhQAAAAABA4edAAAAAAEB57kAAAAAAwELxQAAAAABgiu1AAAAAAKAE50AAAAAA4LzgQAAAAADAANdAAAAAAABYz0AAAAAAAHzEQAAAAAAAqbpAAAAAAAD0sEAAAAAAAOClQAAAAAAA+JtAAAAAAADgkUAAAAAAAJCFQAAAAAAAEH1AAAAAAADAckAAAAAAAOBmQAAAAAAAQF9AAAAAAACAVEAAAAAAAIBJQAAAAAAAAEVAAAAAAAAAN0AAAAAAAAAzQAAAAAAAACBAAAAAAAAAHEAAAAAAAAAYQAAAAAAAABRAAAAAAAAAEEAAAAAAAAAIQAAAAAAAAABAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAACEAAAAAAAAAQQCT88Sq+/xVAQP1fAVD/H0C9Xrrw9v4nQHwxjlL1/jFAoIdSJV/9OkDMzMzMzHxEQM6g0QNUOE9AN9zhXw23V0AB/5K759VhQNPiJszvSmpATJIkSZK8ckDxSWWlSwl5QMvdEa6CIH9A8tiKoOFUgkCutAVRS7qEQHt4eHh4v4ZAwq5fKvlsiECiNVqjNcqJQLdt27Zt44pAJEmSJEnEi0CGUZ0mB3mMQN/NI1SfCY1ACis2j1d6jUBTSimllNKNQHsUrkfhGo5Aul0rJzxTjkCQwvUoXH+OQPQxOB+Do45AsK+vr6+/jkB6nud5nteOQOpNb3rT645AvYbyGsr7jkAAAAAAAAiPQAAAAAAAEI9AAAAAAAAYj0AAAAAAACCPQAAAAAAAKI9AAAAAAAAwj0AAAAAAADiPQAAAAAAAQI9A"); + TDigest digest; + digest.mergeDeserialized(positions, data.data()); + digest.compress(positions); + std::vector values; + values.reserve(500500); + for (int i = 1; i <= 1000; ++i) { + values.insert(values.end(), 1001 - i, i); + } + CHECK_QUANTILES(values, digest); + } +} + +TEST(TDigestTest, mergeNoOverlap) { + constexpr int N = 1e5; + TDigest<> digests[2]; + std::vector positions; + for (int i = 0; i < N; ++i) { + digests[0].add(positions, i); + digests[1].add(positions, i + N); + } + digests[1].compress(positions); + std::string buf(digests[1].serializedByteSize(), '\0'); + digests[1].serialize(buf.data()); + digests[0].mergeDeserialized(positions, buf.data()); + digests[0].compress(positions); + ASSERT_NEAR(digests[0].sum(), N * (2.0 * N - 1), kSumError); + for (auto q : kQuantiles) { + auto v = digests[0].estimateQuantile(q); + ASSERT_NEAR(v / (2 * N - 1), q, kRankError); + } +} + +TEST(TDigestTest, mergeOverlap) { + constexpr int N = 1e5; + TDigest digest; + std::vector positions; + std::vector values; + values.reserve(2 * N); + for (int i = 0; i < N; ++i) { + digest.add(positions, i); + values.insert(values.end(), 2, i); + } + digest.compress(positions); + std::string buf(digest.serializedByteSize(), '\0'); + digest.serialize(buf.data()); + digest.mergeDeserialized(positions, buf.data()); + digest.compress(positions); + CHECK_QUANTILES(values, digest); +} + +TEST(TDigestTest, normalDistribution) { + constexpr int N = 1e5; + std::vector positions; + double values[N]; + std::default_random_engine gen(common::testutil::getRandomSeed(42)); + for (double mean : {0, 1000}) { + SCOPED_TRACE(fmt::format("mean={}", mean)); + std::normal_distribution<> dist(mean, 1); + TDigest digest; + for (int i = 0; i < N; ++i) { + auto v = dist(gen); + digest.add(positions, v); + values[i] = v; + } + digest.compress(positions); + std::sort(values, values + N); + CHECK_QUANTILES(folly::Range(values, N), digest); + } +} + +TEST(TDigestTest, addWeighed) { + std::vector positions; + TDigest digest; + std::vector values; + values.reserve(5050); + for (int i = 1; i <= 100; ++i) { + digest.add(positions, i, i); + values.insert(values.end(), i, i); + } + digest.compress(positions); + CHECK_QUANTILES(values, digest); +} + +TEST(TDigestTest, merge) { + std::vector positions; + std::default_random_engine gen(common::testutil::getRandomSeed(42)); + std::vector values; + std::string buf; + auto test = [&](int numDigests, int size, double mean, double stddev) { + SCOPED_TRACE(fmt::format( + "numDigests={} size={} mean={} stddev={}", + numDigests, + size, + mean, + stddev)); + values.clear(); + values.reserve(numDigests * size); + std::normal_distribution<> dist(mean, stddev); + TDigest digest; + for (int i = 0; i < numDigests; ++i) { + TDigest current; + for (int j = 0; j < size; ++j) { + auto v = dist(gen); + current.add(positions, v); + values.push_back(v); + } + current.compress(positions); + buf.resize(current.serializedByteSize()); + current.serialize(buf.data()); + digest.mergeDeserialized(positions, buf.data()); + } + digest.compress(positions); + std::sort(std::begin(values), std::end(values)); + CHECK_QUANTILES(values, digest); + }; + test(2, 5e4, 0, 50); + test(100, 1000, 500, 20); + test(1e4, 10, 500, 20); +} + +TEST(TDigestTest, infinity) { + std::vector positions; + TDigest digest; + digest.add(positions, 0.0); + digest.add(positions, INFINITY); + digest.add(positions, -INFINITY); + digest.compress(positions); + ASSERT_TRUE(std::isnan(digest.sum())); + ASSERT_EQ(digest.estimateQuantile(0), -INFINITY); + ASSERT_EQ(digest.estimateQuantile(0.3), -INFINITY); + ASSERT_EQ(digest.estimateQuantile(0.4), 0.0); + ASSERT_EQ(digest.estimateQuantile(0.5), 0.0); + ASSERT_EQ(digest.estimateQuantile(0.6), 0.0); + ASSERT_EQ(digest.estimateQuantile(0.7), INFINITY); + ASSERT_EQ(digest.estimateQuantile(1), INFINITY); +} + +} // namespace +} // namespace facebook::velox::functions diff --git a/velox/type/tests/TimestampTest.cpp b/velox/type/tests/TimestampTest.cpp index 9e344cf999349..bdfa146dd81ac 100644 --- a/velox/type/tests/TimestampTest.cpp +++ b/velox/type/tests/TimestampTest.cpp @@ -18,6 +18,7 @@ #include #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/testutil/RandomSeed.h" #include "velox/type/Timestamp.h" #include "velox/type/tz/TimeZoneMap.h" @@ -259,16 +260,6 @@ TEST(TimestampTest, toStringPrestoCastBehavior) { namespace { -uint64_t randomSeed() { - if (const char* env = getenv("VELOX_TEST_USE_RANDOM_SEED")) { - auto seed = std::random_device{}(); - LOG(INFO) << "Random seed: " << seed; - return seed; - } else { - return 42; - } -} - std::string toStringAlt( const Timestamp& t, TimestampToStringOptions::Precision precision) { @@ -308,7 +299,7 @@ bool checkUtcToEpoch(int year, int mon, int mday, int hour, int min, int sec) { } // namespace TEST(TimestampTest, compareWithToStringAlt) { - std::default_random_engine gen(randomSeed()); + std::default_random_engine gen(common::testutil::getRandomSeed(42)); std::uniform_int_distribution distSec( Timestamp::kMinSeconds, Timestamp::kMaxSeconds); std::uniform_int_distribution distNano(0, Timestamp::kMaxNanos); @@ -349,7 +340,7 @@ TEST(TimestampTest, utcToEpoch) { } TEST(TimestampTest, utcToEpochRandomInputs) { - std::default_random_engine gen(randomSeed()); + std::default_random_engine gen(common::testutil::getRandomSeed(42)); std::uniform_int_distribution dist(INT32_MIN, INT32_MAX); for (int i = 0; i < 10'000; ++i) { checkUtcToEpoch(