From 870ef16bffe56d002b789272b3f6d6fac1c2a039 Mon Sep 17 00:00:00 2001
From: Jimmy Lu <jimmylu@meta.com>
Date: Wed, 27 Nov 2024 08:55:37 -0800
Subject: [PATCH] feat: Add T-Digest data structure (#11665)

Summary:

Add the T-Digest data structure implementation to be used in T-Digest
related functions.  Also extract the `getRandomSeed` test utility that is used
in multiple unit tests.

Differential Revision: D66435741
---
 velox/common/testutil/CMakeLists.txt          |   9 +-
 velox/common/testutil/RandomSeed.cpp          |  37 ++
 velox/common/testutil/RandomSeed.h            |  27 +
 .../common/tests/utils/E2EFilterTestBase.h    |  13 +-
 velox/functions/lib/TDigest.h                 | 483 ++++++++++++++++++
 velox/functions/lib/tests/CMakeLists.txt      |   1 +
 velox/functions/lib/tests/TDigestTest.cpp     | 384 ++++++++++++++
 velox/type/tests/TimestampTest.cpp            |  15 +-
 8 files changed, 944 insertions(+), 25 deletions(-)
 create mode 100644 velox/common/testutil/RandomSeed.cpp
 create mode 100644 velox/common/testutil/RandomSeed.h
 create mode 100644 velox/functions/lib/TDigest.h
 create mode 100644 velox/functions/lib/tests/TDigestTest.cpp

diff --git a/velox/common/testutil/CMakeLists.txt b/velox/common/testutil/CMakeLists.txt
index f33bb98be06f2..6e8e05b849fff 100644
--- a/velox/common/testutil/CMakeLists.txt
+++ b/velox/common/testutil/CMakeLists.txt
@@ -12,8 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-velox_add_library(velox_test_util ScopedTestTime.cpp TestValue.cpp)
-velox_link_libraries(velox_test_util PUBLIC velox_exception)
+velox_add_library(velox_test_util ScopedTestTime.cpp TestValue.cpp
+                  RandomSeed.cpp)
+
+velox_link_libraries(
+  velox_test_util
+  PUBLIC velox_exception
+  PRIVATE glog::glog Folly::folly)
 
 if(${VELOX_BUILD_TESTING})
   add_subdirectory(tests)
diff --git a/velox/common/testutil/RandomSeed.cpp b/velox/common/testutil/RandomSeed.cpp
new file mode 100644
index 0000000000000..8c27f684572cb
--- /dev/null
+++ b/velox/common/testutil/RandomSeed.cpp
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/common/testutil/RandomSeed.h"
+
+#include <folly/Conv.h>
+#include <folly/Random.h>
+#include <glog/logging.h>
+
+#include <cstdlib>
+
+namespace facebook::velox::common::testutil {
+
+unsigned getRandomSeed(unsigned fixedValue) {
+  const char* env = getenv("VELOX_TEST_USE_RANDOM_SEED");
+  if (!(env && folly::to<bool>(env))) {
+    return fixedValue;
+  }
+  auto seed = folly::Random::secureRand32();
+  LOG(INFO) << "Random seed: " << seed;
+  return seed;
+}
+
+} // namespace facebook::velox::common::testutil
diff --git a/velox/common/testutil/RandomSeed.h b/velox/common/testutil/RandomSeed.h
new file mode 100644
index 0000000000000..c6fa02ad1e11f
--- /dev/null
+++ b/velox/common/testutil/RandomSeed.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace facebook::velox::common::testutil {
+
+/// Get a truely random seed and log it for future reproducing if
+/// VELOX_TEST_USE_RANDOM_SEED is set.  Otherwise return a fixed value so test
+/// runs are deterministic.  We use environment variable because `buck test`
+/// does not allow pass in command line arguments.
+unsigned getRandomSeed(unsigned fixedValue);
+
+} // namespace facebook::velox::common::testutil
diff --git a/velox/dwio/common/tests/utils/E2EFilterTestBase.h b/velox/dwio/common/tests/utils/E2EFilterTestBase.h
index a39fc8d1b64d1..2ad299b67984c 100644
--- a/velox/dwio/common/tests/utils/E2EFilterTestBase.h
+++ b/velox/dwio/common/tests/utils/E2EFilterTestBase.h
@@ -16,6 +16,7 @@
 
 #pragma once
 
+#include "velox/common/testutil/RandomSeed.h"
 #include "velox/common/time/Timer.h"
 #include "velox/dwio/common/BufferedInput.h"
 #include "velox/dwio/common/FileSink.h"
@@ -102,20 +103,10 @@ class E2EFilterTestBase : public testing::Test {
     memory::MemoryManager::testingSetInstance({});
   }
 
-  static bool useRandomSeed() {
-    // Check environment variable because `buck test` does not allow pass in
-    // command line arguments.
-    const char* env = getenv("VELOX_TEST_USE_RANDOM_SEED");
-    return !env ? false : folly::to<bool>(env);
-  }
-
   void SetUp() override {
     rootPool_ = memory::memoryManager()->addRootPool("E2EFilterTestBase");
     leafPool_ = rootPool_->addLeafChild("E2EFilterTestBase");
-    if (useRandomSeed()) {
-      seed_ = folly::Random::secureRand32();
-      LOG(INFO) << "Random seed: " << seed_;
-    }
+    seed_ = common::testutil::getRandomSeed(seed_);
   }
 
   static bool typeKindSupportsValueHook(TypeKind kind) {
diff --git a/velox/functions/lib/TDigest.h b/velox/functions/lib/TDigest.h
new file mode 100644
index 0000000000000..b5295968d8406
--- /dev/null
+++ b/velox/functions/lib/TDigest.h
@@ -0,0 +1,483 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/common/base/Exceptions.h"
+#include "velox/common/base/Portability.h"
+
+#include <folly/Bits.h>
+
+#include <numeric>
+
+namespace facebook::velox::functions {
+
+namespace tdigest {
+constexpr double kDefaultCompression = 100;
+}
+
+/// Implementation of T-Digest that matches Presto Java behavior.  It has the
+/// same error bound as Java version and the serialization format is same as
+/// Java.
+///
+/// There are some improvements on runtime performance compared to Java version:
+///
+/// 1. The memory footprint is largely reduced compared to Java.  When we merge
+/// new values, we keep the already merged values and unmerged values in the
+/// same buffer and do the reordering and merging in-place, instead of keeping
+/// the merged values in separate buffers like Java.  We also do not keep the
+/// positions buffer inside this class, because these are temporary scratch
+/// memory that can be reused across different objects and should not be stored
+/// in row container.
+///
+/// 2. When we merging the deserialized digests, if the centroids are already
+/// sorted (highly likely so), we no longer need to re-sort them and can
+/// directly start merging the sorted centroids.
+///
+/// Java implementation can be found at
+/// https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/tdigest/TDigest.java
+template <typename Allocator = std::allocator<double>>
+class TDigest {
+ public:
+  explicit TDigest(const Allocator& allocator = Allocator());
+
+  /// Set the compression parameter of the T-Digest.  The value should be
+  /// between 10 and 1000.  The larger the value, the more accurate this digest
+  /// will be.  Default to tdigest::kDefaultCompression if this method is not
+  /// called.
+  void setCompression(double compression);
+
+  /// Add a new value or multiple same values to the digest.
+  ///
+  /// @param positions Scratch memory used to keep the ordered positions of
+  ///  centroids.  This buffer can and should be reused across different groups
+  ///  of accumulators in an aggregate function.
+  /// @param value The new value to be added.  Cannot be NaN.
+  /// @param weight A positive number indicating how many copies of `value' to
+  ///  be added.
+  void add(std::vector<int16_t>& positions, double value, int64_t weight = 1);
+
+  /// Compress the buffered values according to the compression parameter
+  /// provided.  Must be called before doing any estimation or serialization.
+  ///
+  /// @param positions Scratch memory used to keep the ordered positions of
+  ///  centroids.  This buffer can and should be reused across different groups
+  ///  of accumulators in an aggregate function.
+  void compress(std::vector<int16_t>& positions);
+
+  /// Estimate the value of the given quantile.
+  /// @param quantile Quantile in [0, 1] to be estimated.
+  double estimateQuantile(double quantile) const;
+
+  /// Calculate the size needed for serialization.
+  int64_t serializedByteSize() const;
+
+  /// Serialize the digest into bytes.  The serialzation is versioned, and newer
+  /// version of code should be able to read all previous versions.  Presto Java
+  /// can read output of this function.
+  ///
+  /// @param out Pre-allocated memory at least serializedByteSize() in size.
+  void serialize(char* out) const;
+
+  /// Merge this digest with values from another deserialized digest.
+  /// Serialization produced by Presto Java can be used as input.
+  ///
+  /// @param positions Scratch memory used to keep the ordered positions of
+  ///  centroids.  This buffer can and should be reused across different groups
+  ///  of accumulators in an aggregate function.
+  /// @param input The input serialization.
+  void mergeDeserialized(std::vector<int16_t>& positions, const char* input);
+
+  /// Returns the total sum of all values added to this digest.
+  double sum() const;
+
+  /// Returns the compression parameter.
+  double compression() const {
+    return compression_;
+  }
+
+ private:
+  static constexpr int8_t kSerializationVersion = 1;
+  static constexpr double kEpsilon = 1e-3;
+
+  void mergeNewValues(std::vector<int16_t>& positions, double compression);
+
+  void merge(
+      double compression,
+      const double* weights,
+      const double* means,
+      int count);
+
+  template <bool kReverse>
+  void mergeImpl(
+      double compression,
+      const double* weights,
+      const double* means,
+      int count);
+
+  static double
+  weightedAverageSorted(double x1, double w1, double x2, double w2) {
+    VELOX_DCHECK_LE(x1, x2);
+    double x = (x1 * w1 + x2 * w2) / (w1 + w2);
+    return std::max(x1, std::min(x, x2));
+  }
+
+  std::vector<double, Allocator> weights_;
+  std::vector<double, Allocator> means_;
+  double compression_;
+  int maxBufferSize_;
+  int32_t numMerged_ = 0;
+  double min_ = INFINITY;
+  double max_ = -INFINITY;
+  bool reverseCompress_ = false;
+};
+
+template <typename A>
+TDigest<A>::TDigest(const A& allocator)
+    : weights_(allocator), means_(allocator) {
+  setCompression(tdigest::kDefaultCompression);
+}
+
+template <typename A>
+void TDigest<A>::setCompression(double compression) {
+  VELOX_CHECK_GE(compression, 10);
+  VELOX_CHECK_LE(compression, 1000);
+  VELOX_CHECK(weights_.empty());
+  compression_ = compression;
+  maxBufferSize_ = 5 * std::ceil(2 * compression_ + 30);
+}
+
+template <typename A>
+void TDigest<A>::add(
+    std::vector<int16_t>& positions,
+    double value,
+    int64_t weight) {
+  VELOX_CHECK(!std::isnan(value));
+  VELOX_CHECK_GT(weight, 0);
+  min_ = std::min(min_, value);
+  max_ = std::max(max_, value);
+  weights_.push_back(weight);
+  means_.push_back(value);
+  if (weights_.size() >= maxBufferSize_) {
+    mergeNewValues(positions, 2 * compression_);
+  }
+}
+
+template <typename A>
+void TDigest<A>::compress(std::vector<int16_t>& positions) {
+  if (!weights_.empty()) {
+    mergeNewValues(positions, compression_);
+  }
+}
+
+template <typename A>
+void TDigest<A>::mergeNewValues(
+    std::vector<int16_t>& positions,
+    double compression) {
+  if (numMerged_ < weights_.size()) {
+    VELOX_CHECK_LE(weights_.size(), std::numeric_limits<int16_t>::max());
+    positions.resize(weights_.size());
+    std::iota(positions.begin(), positions.end(), 0);
+    auto newBegin = positions.begin() + numMerged_;
+    auto compare = [this](auto i, auto j) { return means_[i] < means_[j]; };
+    if (!std::is_sorted(means_.begin() + numMerged_, means_.end())) {
+      std::sort(newBegin, positions.end(), compare);
+    }
+    std::inplace_merge(positions.begin(), newBegin, positions.end(), compare);
+    for (int i = 0; i < positions.size(); ++i) {
+      if (i == positions[i]) {
+        continue;
+      }
+      auto wi = weights_[i];
+      auto mi = means_[i];
+      auto j = i;
+      for (;;) {
+        auto k = positions[j];
+        if (k == i) {
+          break;
+        }
+        weights_[j] = weights_[k];
+        means_[j] = means_[k];
+        positions[j] = j;
+        j = k;
+      }
+      weights_[j] = wi;
+      means_[j] = mi;
+      positions[j] = j;
+    }
+    VELOX_DCHECK(std::is_sorted(means_.begin(), means_.end()));
+  }
+  merge(compression, weights_.data(), means_.data(), weights_.size());
+}
+
+template <typename A>
+void TDigest<A>::merge(
+    double compression,
+    const double* weights,
+    const double* means,
+    int count) {
+  VELOX_CHECK_GT(count, 0);
+  VELOX_CHECK_GE(weights_.size(), count);
+  if (reverseCompress_) {
+    // Run the merge in reverse every other merge to avoid left-to-right
+    // bias.
+    mergeImpl<true>(compression, weights, means, count);
+  } else {
+    mergeImpl<false>(compression, weights, means, count);
+  }
+  reverseCompress_ = !reverseCompress_;
+}
+
+template <typename A>
+template <bool kReverse>
+void TDigest<A>::mergeImpl(
+    double compression,
+    const double* weights,
+    const double* means,
+    int count) {
+  const auto totalWeight = std::accumulate(weights, weights + count, 0.0);
+  const auto invTotalWeight = 1 / totalWeight;
+  const auto normalizer =
+      (4 * std::log(totalWeight / compression) + 24) / compression;
+  auto maxSize = [normalizer](double q) { return q * (1 - q) * normalizer; };
+  double weightSoFar = 0;
+  numMerged_ = 0;
+  const int begin = kReverse ? count - 1 : 0;
+  auto notEnd = [&](auto i) INLINE_LAMBDA {
+    if constexpr (kReverse) {
+      return i >= 0;
+    } else {
+      return i < count;
+    }
+  };
+  constexpr int kStep = kReverse ? -1 : 1;
+  int j = kReverse ? count - 1 : 0;
+  weights_[j] = weights[begin];
+  means_[j] = means[begin];
+  for (int i = begin + kStep; notEnd(i); i += kStep) {
+    auto proposedWeight = weights_[j] + weights[i];
+    auto q0 = weightSoFar * invTotalWeight;
+    auto q2 = (weightSoFar + proposedWeight) * invTotalWeight;
+    if (proposedWeight <= totalWeight * std::min(maxSize(q0), maxSize(q2))) {
+      weights_[j] += weights[i];
+      means_[j] += (means[i] - means_[j]) * weights[i] / weights_[j];
+    } else {
+      weightSoFar += weights_[j];
+      ++numMerged_;
+      j += kStep;
+      weights_[j] = weights[i];
+      means_[j] = means[i];
+    }
+  }
+  weightSoFar += weights_[j];
+  ++numMerged_;
+  VELOX_CHECK_LT(std::abs(weightSoFar - totalWeight), kEpsilon);
+  if constexpr (kReverse) {
+    std::copy(weights_.begin() + j, weights_.end(), weights_.begin());
+    std::copy(means_.begin() + j, means_.end(), means_.begin());
+  }
+  weights_.resize(numMerged_);
+  means_.resize(numMerged_);
+  min_ = std::min(min_, means_.front());
+  max_ = std::max(max_, means_.back());
+}
+
+template <typename A>
+double TDigest<A>::estimateQuantile(double quantile) const {
+  VELOX_CHECK(0 <= quantile && quantile <= 1);
+  VELOX_CHECK_EQ(numMerged_, weights_.size());
+  if (numMerged_ == 0) {
+    return NAN;
+  }
+  if (numMerged_ == 1) {
+    return means_[0];
+  }
+  auto totalWeight = std::accumulate(weights_.begin(), weights_.end(), 0.0);
+  const auto index = quantile * totalWeight;
+  if (index < 1) {
+    return min_;
+  }
+  if (weights_.front() > 1 && index < weights_.front() / 2) {
+    return min_ +
+        (index - 1) / (weights_.front() / 2 - 1) * (means_.front() - min_);
+  }
+  if (index > totalWeight - 1) {
+    return max_;
+  }
+  if (weights_.back() > 1 && totalWeight - index <= weights_.back() / 2) {
+    return max_ -
+        (totalWeight - index - 1) / (weights_.back() / 2 - 1) *
+        (max_ - means_.back());
+  }
+  auto weightSoFar = weights_[0] / 2;
+  for (int i = 1; i < numMerged_; ++i) {
+    auto dw = (weights_[i - 1] + weights_[i]) / 2;
+    if (weightSoFar + dw <= index) {
+      weightSoFar += dw;
+      continue;
+    }
+    double leftUnit = 0;
+    if (weights_[i - 1] == 1) {
+      if (index - weightSoFar < 0.5) {
+        return means_[i - 1];
+      }
+      leftUnit = 0.5;
+    }
+    double rightUnit = 0;
+    if (weights_[i] == 1) {
+      if (weightSoFar + dw - index <= 0.5) {
+        return means_[i];
+      }
+      rightUnit = 0.5;
+    }
+    auto z1 = index - weightSoFar - leftUnit;
+    auto z2 = weightSoFar + dw - index - rightUnit;
+    return weightedAverageSorted(means_[i - 1], z2, means_[i], z1);
+  }
+  VELOX_CHECK_GT(weights_.back(), 1);
+  VELOX_CHECK_LE(index, totalWeight);
+  VELOX_CHECK_GE(index, totalWeight - weights_.back() / 2);
+  auto z1 = index - totalWeight - weights_.back() / 2;
+  auto z2 = weights_.back() / 2 - z1;
+  return weightedAverageSorted(means_.back(), z1, max_, z2);
+}
+
+namespace tdigest::detail {
+
+static_assert(folly::kIsLittleEndian);
+
+template <typename T>
+void write(T value, char*& out) {
+  folly::storeUnaligned(out, value);
+  out += sizeof(T);
+}
+
+template <typename T>
+void write(const T* values, int count, char*& out) {
+  auto size = sizeof(T) * count;
+  memcpy(out, values, size);
+  out += size;
+}
+
+template <typename T>
+void read(const char*& input, T& value) {
+  value = folly::loadUnaligned<T>(input);
+  input += sizeof(T);
+}
+
+template <typename T>
+void read(const char*& input, T* values, int count) {
+  auto size = sizeof(T) * count;
+  memcpy(values, input, size);
+  input += size;
+}
+
+} // namespace tdigest::detail
+
+template <typename A>
+int64_t TDigest<A>::serializedByteSize() const {
+  VELOX_CHECK_EQ(numMerged_, weights_.size());
+  return sizeof(kSerializationVersion) + 1 /*data type*/ + sizeof(min_) +
+      sizeof(max_) + sizeof(double) /*sum*/ + sizeof(compression_) +
+      sizeof(double) /*total weight*/ + sizeof(numMerged_) +
+      2 * numMerged_ * sizeof(double);
+}
+
+template <typename A>
+void TDigest<A>::serialize(char* out) const {
+  VELOX_CHECK_EQ(numMerged_, weights_.size());
+  auto totalWeight = std::accumulate(weights_.begin(), weights_.end(), 0.0);
+  const char* oldOut = out;
+  tdigest::detail::write(kSerializationVersion, out);
+  tdigest::detail::write<int8_t>(0, out);
+  tdigest::detail::write(min_, out);
+  tdigest::detail::write(max_, out);
+  tdigest::detail::write(sum(), out);
+  tdigest::detail::write(compression_, out);
+  tdigest::detail::write(totalWeight, out);
+  tdigest::detail::write(numMerged_, out);
+  if (numMerged_ > 0) {
+    tdigest::detail::write(weights_.data(), numMerged_, out);
+    tdigest::detail::write(means_.data(), numMerged_, out);
+  }
+  VELOX_CHECK_EQ(out - oldOut, serializedByteSize());
+}
+
+template <typename A>
+void TDigest<A>::mergeDeserialized(
+    std::vector<int16_t>& positions,
+    const char* input) {
+  int8_t version;
+  tdigest::detail::read(input, version);
+  VELOX_CHECK_GE(version, 0);
+  VELOX_CHECK_LE(version, kSerializationVersion);
+  int8_t type;
+  tdigest::detail::read(input, type);
+  VELOX_CHECK_EQ(type, 0);
+  double min, max, sum, compression, totalWeight;
+  tdigest::detail::read(input, min);
+  tdigest::detail::read(input, max);
+  if (version >= 1) {
+    tdigest::detail::read(input, sum);
+  }
+  tdigest::detail::read(input, compression);
+  VELOX_CHECK_EQ(compression, compression_);
+  tdigest::detail::read(input, totalWeight);
+  int32_t numNew;
+  tdigest::detail::read(input, numNew);
+  if (numNew > 0) {
+    auto numOld = weights_.size();
+    weights_.resize(numOld + numNew);
+    auto* weights = weights_.data() + numOld;
+    tdigest::detail::read(input, weights, numNew);
+    for (int i = 0; i < numNew; ++i) {
+      VELOX_CHECK_GT(weights[i], 0);
+    }
+    means_.resize(numOld + numNew);
+    auto* means = means_.data() + numOld;
+    tdigest::detail::read(input, means, numNew);
+    for (int i = 0; i < numNew; ++i) {
+      VELOX_CHECK(!std::isnan(means[i]));
+    }
+    if (version >= 1) {
+      double actualSum = 0;
+      for (int i = 0; i < numNew; ++i) {
+        actualSum += weights[i] * means[i];
+      }
+      VELOX_CHECK_LT(std::abs(actualSum - sum), kEpsilon);
+    }
+    double actualTotalWeight = std::accumulate(weights, weights + numNew, 0.0);
+    VELOX_CHECK_LT(std::abs(actualTotalWeight - totalWeight), kEpsilon);
+  } else {
+    VELOX_CHECK_LT(std::abs(sum), kEpsilon);
+    VELOX_CHECK_LT(std::abs(totalWeight), kEpsilon);
+  }
+  if (weights_.size() >= maxBufferSize_) {
+    mergeNewValues(positions, 2 * compression_);
+  }
+}
+
+template <typename A>
+double TDigest<A>::sum() const {
+  VELOX_CHECK_EQ(numMerged_, weights_.size());
+  double result = 0;
+  for (int i = 0; i < numMerged_; ++i) {
+    result += weights_[i] * means_[i];
+  }
+  return result;
+}
+
+} // namespace facebook::velox::functions
diff --git a/velox/functions/lib/tests/CMakeLists.txt b/velox/functions/lib/tests/CMakeLists.txt
index c3486a6687295..5715c0d4e890e 100644
--- a/velox/functions/lib/tests/CMakeLists.txt
+++ b/velox/functions/lib/tests/CMakeLists.txt
@@ -23,6 +23,7 @@ add_executable(
   MapConcatTest.cpp
   Re2FunctionsTest.cpp
   RepeatTest.cpp
+  TDigestTest.cpp
   Utf8Test.cpp
   ZetaDistributionTest.cpp)
 
diff --git a/velox/functions/lib/tests/TDigestTest.cpp b/velox/functions/lib/tests/TDigestTest.cpp
new file mode 100644
index 0000000000000..56b2a5f4fdb8f
--- /dev/null
+++ b/velox/functions/lib/tests/TDigestTest.cpp
@@ -0,0 +1,384 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/functions/lib/TDigest.h"
+#include "velox/common/testutil/RandomSeed.h"
+
+#include <folly/base64.h>
+#include <gtest/gtest.h>
+
+#include <random>
+
+namespace facebook::velox::functions {
+namespace {
+
+constexpr double kSumError = 1e-4;
+constexpr double kRankError = 0.01;
+
+constexpr double kQuantiles[] = {
+    0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000,
+    0.3000, 0.4000, 0.5000, 0.6000,  0.7000, 0.8000, 0.9000,
+    0.9500, 0.9600, 0.9700, 0.9800,  0.9999,
+};
+
+void checkQuantiles(
+    folly::Range<const double*> values,
+    const TDigest<>& digest) {
+  VELOX_CHECK(std::is_sorted(values.begin(), values.end()));
+  auto sum = std::accumulate(values.begin(), values.end(), 0.0);
+  ASSERT_NEAR(digest.sum(), sum, kSumError);
+  for (auto q : kQuantiles) {
+    auto v = digest.estimateQuantile(q);
+    ASSERT_LE(values.front(), v);
+    ASSERT_LE(v, values.back());
+    auto hi = std::lower_bound(values.begin(), values.end(), v);
+    auto lo = hi;
+    while (lo != values.begin() && v > *lo) {
+      --lo;
+    }
+    while (std::next(hi) != values.end() && *hi == *std::next(hi)) {
+      ++hi;
+    }
+    auto l = (lo - values.begin()) / (values.size() - 1.0);
+    auto r = (hi - values.begin()) / (values.size() - 1.0);
+    if (q < l) {
+      ASSERT_NEAR(l, q, kRankError);
+    } else if (q > r) {
+      ASSERT_NEAR(r, q, kRankError);
+    }
+  }
+}
+
+#define CHECK_QUANTILES(_values, _digest) \
+  do {                                    \
+    SCOPED_TRACE("CHECK_QUANTILES");      \
+    checkQuantiles((_values), (_digest)); \
+  } while (false)
+
+std::string decodeBase64(std::string_view input) {
+  std::string decoded(folly::base64DecodedSize(input), '\0');
+  folly::base64Decode(input, decoded.data());
+  return decoded;
+}
+
+TEST(TDigestTest, addElementsInOrder) {
+  constexpr int N = 1e6;
+  TDigest digest;
+  ASSERT_EQ(digest.compression(), tdigest::kDefaultCompression);
+  std::vector<int16_t> positions;
+  for (int i = 0; i < N; ++i) {
+    digest.add(positions, i);
+  }
+  digest.compress(positions);
+  ASSERT_NEAR(digest.sum(), 1.0 * N * (N - 1) / 2, kSumError);
+  for (auto q : kQuantiles) {
+    auto v = digest.estimateQuantile(q);
+    ASSERT_NEAR(v / (N - 1), q, kRankError);
+  }
+}
+
+TEST(TDigestTest, addElementsRandomized) {
+  constexpr int N = 1e5;
+  double values[N];
+  TDigest digest;
+  std::vector<int16_t> positions;
+  std::default_random_engine gen(common::testutil::getRandomSeed(42));
+  std::uniform_real_distribution<> dist;
+  for (int i = 0; i < N; ++i) {
+    auto v = dist(gen);
+    digest.add(positions, v);
+    values[i] = v;
+  }
+  digest.compress(positions);
+  std::sort(std::begin(values), std::end(values));
+  CHECK_QUANTILES(folly::Range(values, N), digest);
+}
+
+TEST(TDigestTest, fewElements) {
+  TDigest digest;
+  std::vector<int16_t> positions;
+  digest.compress(positions);
+  ASSERT_EQ(digest.sum(), 0);
+  for (auto q : kQuantiles) {
+    ASSERT_TRUE(std::isnan(digest.estimateQuantile(q)));
+  }
+  digest.add(positions, 1.0);
+  digest.compress(positions);
+  ASSERT_EQ(digest.sum(), 1);
+  for (auto q : kQuantiles) {
+    ASSERT_EQ(digest.estimateQuantile(q), 1.0);
+  }
+}
+
+// IMPORTANT: All these errors cannot be caught by TRY in Presto, so we should
+// not make them user errors.  If in another engine these are catchable errors,
+// throw user errors in the corresponding UDFs before they reach the TDigest
+// implementation.
+TEST(TDigestTest, invalid) {
+  TDigest digest;
+  ASSERT_THROW(digest.setCompression(NAN), VeloxRuntimeError);
+  ASSERT_THROW(digest.setCompression(0), VeloxRuntimeError);
+  ASSERT_THROW(digest.setCompression(1000.1), VeloxRuntimeError);
+  std::vector<int16_t> positions;
+  ASSERT_THROW(digest.add(positions, NAN), VeloxRuntimeError);
+  ASSERT_THROW(digest.add(positions, 1, 0), VeloxRuntimeError);
+  ASSERT_THROW(digest.estimateQuantile(1.1), VeloxRuntimeError);
+}
+
+TEST(TDigestTest, unalignedSerialization) {
+  constexpr int N = 1e4;
+  TDigest digest;
+  std::vector<int16_t> positions;
+  for (int i = 0; i < N; ++i) {
+    digest.add(positions, i);
+  }
+  digest.compress(positions);
+  ASSERT_NEAR(digest.sum(), 1.0 * N * (N - 1) / 2, kSumError);
+  std::string buf(1 + digest.serializedByteSize(), '\0');
+  for (int offset = 0; offset < 2; ++offset) {
+    SCOPED_TRACE(fmt::format("offset={}", offset));
+    digest.serialize(buf.data() + offset);
+    TDigest digest2;
+    digest2.mergeDeserialized(positions, buf.data() + offset);
+    digest2.compress(positions);
+    for (auto q : kQuantiles) {
+      auto v = digest2.estimateQuantile(q);
+      ASSERT_NEAR(v / (N - 1), q, kRankError);
+    }
+  }
+}
+
+TEST(TDigestTest, mergeEmpty) {
+  std::vector<int16_t> positions;
+  TDigest<> digests[2];
+  std::string buf(digests[1].serializedByteSize(), '\0');
+  digests[1].serialize(buf.data());
+  digests[0].mergeDeserialized(positions, buf.data());
+  digests[0].compress(positions);
+  ASSERT_EQ(digests[0].sum(), 0);
+  ASSERT_TRUE(std::isnan(digests[0].estimateQuantile(0.5)));
+  digests[0].add(positions, 1.0);
+  digests[0].compress(positions);
+  ASSERT_EQ(digests[0].sum(), 1);
+  ASSERT_EQ(digests[0].estimateQuantile(0.5), 1);
+  digests[0].mergeDeserialized(positions, buf.data());
+  digests[0].compress(positions);
+  ASSERT_EQ(digests[0].sum(), 1);
+  ASSERT_EQ(digests[0].estimateQuantile(0.5), 1);
+}
+
+TEST(TDigestTest, deserializeJava) {
+  std::vector<int16_t> positions;
+  {
+    SCOPED_TRACE(
+        "select to_base64(cast(tdigest_agg(x) as varbinary)) from (values (2.0)) as t(x)");
+    auto data = decodeBase64(
+        "AQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAAEAAAAAAAABZQAAAAAAAAPA/AQAAAAAAAAAAAPA/AAAAAAAAAEA=");
+    TDigest digest;
+    digest.mergeDeserialized(positions, data.data());
+    digest.compress(positions);
+    ASSERT_EQ(digest.compression(), tdigest::kDefaultCompression);
+    ASSERT_EQ(digest.sum(), 2.0);
+    for (auto q : kQuantiles) {
+      ASSERT_EQ(digest.estimateQuantile(q), 2.0);
+    }
+  }
+  {
+    SCOPED_TRACE(
+        "select to_base64(cast(tdigest_agg(x, w, c) as varbinary)) from (values (2.0, 2, 200.0)) as t(x, w, c)");
+    auto data = decodeBase64(
+        "AQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAEEAAAAAAAABpQAAAAAAAAABAAQAAAAAAAAAAAABAAAAAAAAAAEA=");
+    TDigest digest;
+    digest.setCompression(200);
+    digest.mergeDeserialized(positions, data.data());
+    digest.compress(positions);
+    ASSERT_EQ(digest.compression(), 200);
+    ASSERT_EQ(digest.sum(), 4.0);
+    for (auto q : kQuantiles) {
+      ASSERT_EQ(digest.estimateQuantile(q), 2.0);
+    }
+  }
+  {
+    SCOPED_TRACE(
+        "select to_base64(cast(tdigest_agg(x) as varbinary)) from unnest(sequence(0, 1000)) as t(x)");
+    auto data = decodeBase64(
+        "AQAAAAAAAAAAAAAAAAAAQI9AAAAAAFCMHkEAAAAAAABZQAAAAAAASI9AMgAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAADwPwAAAAAAAPA/AAAAAAAA8D8AAAAAAADwPwAAAAAAAPA/AAAAAAAAAEAAAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEAAAAAAAAAUQAAAAAAAABxAAAAAAAAAIkAAAAAAAAAoQAAAAAAAADBAAAAAAAAANEAAAAAAAAA6QAAAAAAAgEBAAAAAAACAREAAAAAAAABJQAAAAAAAAE5AAAAAAABAUUAAAAAAAEBTQAAAAAAAgFRAAAAAAADAU0AAAAAAAABSQAAAAAAAAFBAAAAAAAAAS0AAAAAAAIBGQAAAAAAAAEJAAAAAAAAAPUAAAAAAAAA2QAAAAAAAADFAAAAAAAAAKkAAAAAAAAAkQAAAAAAAACBAAAAAAAAAGEAAAAAAAAAUQAAAAAAAAAhAAAAAAAAACEAAAAAAAAAAQAAAAAAAAPA/AAAAAAAA8D8AAAAAAADwPwAAAAAAAPA/AAAAAAAA8D8AAAAAAADwPwAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAAAAAAAAAAPA/AAAAAAAAAEAAAAAAAAAIQAAAAAAAABBAAAAAAAAAFEAAAAAAAAAYQAAAAAAAAB5AAAAAAAAAI0AAAAAAAAAoQAAAAAAAAC9AAAAAAAAANEAAAAAAAAA6QAAAAAAAAEFAAAAAAABARkAAAAAAAEBNQAAAAAAAIFNAAAAAAADgWEAAAAAAACBgQAAAAAAAwGRAAAAAAABwakAAAAAAAKhwQAAAAAAAsHRAAAAAAABAeUAAAAAAADh+QAAAAAAAoIFAAAAAAAD8g0AAAAAAAByGQAAAAAAA9IdAAAAAAACAiUAAAAAAAMSKQAAAAAAAyItAAAAAAACUjEAAAAAAADCNQAAAAAAAqI1AAAAAAAAEjkAAAAAAAEyOQAAAAAAAhI5AAAAAAACwjkAAAAAAANCOQAAAAAAA6I5AAAAAAAD8jkAAAAAAAAiPQAAAAAAAEI9AAAAAAAAYj0AAAAAAACCPQAAAAAAAKI9AAAAAAAAwj0AAAAAAADiPQAAAAAAAQI9A");
+    TDigest digest;
+    digest.mergeDeserialized(positions, data.data());
+    digest.compress(positions);
+    ASSERT_NEAR(digest.sum(), 500500, kSumError);
+    for (auto q : kQuantiles) {
+      auto v = digest.estimateQuantile(q);
+      ASSERT_NEAR(v / 1000, q, kRankError);
+    }
+  }
+  {
+    SCOPED_TRACE(
+        "select to_base64(cast(tdigest_agg(x, w) as varbinary)) from (values (0.0, 1), (1.0, 100)) as t(x, w)");
+    auto data = decodeBase64(
+        "AQAAAAAAAAAAAAAAAAAAAPA/AAAAAAAAWUAAAAAAAABZQAAAAAAAQFlAAgAAAAAAAAAAAPA/AAAAAAAAWUAAAAAAAAAAAAAAAAAAAPA/");
+    TDigest digest;
+    digest.mergeDeserialized(positions, data.data());
+    digest.compress(positions);
+    double values[101];
+    values[0] = 0;
+    std::fill(values + 1, values + 101, 1);
+    CHECK_QUANTILES(folly::Range(values, 101), digest);
+  }
+  {
+    SCOPED_TRACE(
+        "select to_base64(cast(tdigest_agg(cast(x as double), 1001 - x) as varbinary)) from unnest(sequence(1, 1000)) as t(x)");
+    auto data = decodeBase64(
+        "AQAAAAAAAADwPwAAAAAAQI9AAAAAMIjto0EAAAAAAABZQAAAAABQjB5BLAAAAAAAAAAAQI9AAAAAAAA4j0AAAAAAADCPQAAAAAAAKI9AAAAAAAAcn0AAAAAAAEanQAAAAAAAUbNAAAAAAADhukAAAAAAAO3EQAAAAAAA4M9AAAAAAEDU10AAAAAAwEDhQAAAAABA4edAAAAAAEB57kAAAAAAwELxQAAAAABgiu1AAAAAAKAE50AAAAAA4LzgQAAAAADAANdAAAAAAABYz0AAAAAAAHzEQAAAAAAAqbpAAAAAAAD0sEAAAAAAAOClQAAAAAAA+JtAAAAAAADgkUAAAAAAAJCFQAAAAAAAEH1AAAAAAADAckAAAAAAAOBmQAAAAAAAQF9AAAAAAACAVEAAAAAAAIBJQAAAAAAAAEVAAAAAAAAAN0AAAAAAAAAzQAAAAAAAACBAAAAAAAAAHEAAAAAAAAAYQAAAAAAAABRAAAAAAAAAEEAAAAAAAAAIQAAAAAAAAABAAAAAAAAA8D8AAAAAAADwPwAAAAAAAABAAAAAAAAACEAAAAAAAAAQQCT88Sq+/xVAQP1fAVD/H0C9Xrrw9v4nQHwxjlL1/jFAoIdSJV/9OkDMzMzMzHxEQM6g0QNUOE9AN9zhXw23V0AB/5K759VhQNPiJszvSmpATJIkSZK8ckDxSWWlSwl5QMvdEa6CIH9A8tiKoOFUgkCutAVRS7qEQHt4eHh4v4ZAwq5fKvlsiECiNVqjNcqJQLdt27Zt44pAJEmSJEnEi0CGUZ0mB3mMQN/NI1SfCY1ACis2j1d6jUBTSimllNKNQHsUrkfhGo5Aul0rJzxTjkCQwvUoXH+OQPQxOB+Do45AsK+vr6+/jkB6nud5nteOQOpNb3rT645AvYbyGsr7jkAAAAAAAAiPQAAAAAAAEI9AAAAAAAAYj0AAAAAAACCPQAAAAAAAKI9AAAAAAAAwj0AAAAAAADiPQAAAAAAAQI9A");
+    TDigest digest;
+    digest.mergeDeserialized(positions, data.data());
+    digest.compress(positions);
+    std::vector<double> values;
+    values.reserve(500500);
+    for (int i = 1; i <= 1000; ++i) {
+      values.insert(values.end(), 1001 - i, i);
+    }
+    CHECK_QUANTILES(values, digest);
+  }
+}
+
+TEST(TDigestTest, mergeNoOverlap) {
+  constexpr int N = 1e5;
+  TDigest<> digests[2];
+  std::vector<int16_t> positions;
+  for (int i = 0; i < N; ++i) {
+    digests[0].add(positions, i);
+    digests[1].add(positions, i + N);
+  }
+  digests[1].compress(positions);
+  std::string buf(digests[1].serializedByteSize(), '\0');
+  digests[1].serialize(buf.data());
+  digests[0].mergeDeserialized(positions, buf.data());
+  digests[0].compress(positions);
+  ASSERT_NEAR(digests[0].sum(), N * (2.0 * N - 1), kSumError);
+  for (auto q : kQuantiles) {
+    auto v = digests[0].estimateQuantile(q);
+    ASSERT_NEAR(v / (2 * N - 1), q, kRankError);
+  }
+}
+
+TEST(TDigestTest, mergeOverlap) {
+  constexpr int N = 1e5;
+  TDigest digest;
+  std::vector<int16_t> positions;
+  std::vector<double> values;
+  values.reserve(2 * N);
+  for (int i = 0; i < N; ++i) {
+    digest.add(positions, i);
+    values.insert(values.end(), 2, i);
+  }
+  digest.compress(positions);
+  std::string buf(digest.serializedByteSize(), '\0');
+  digest.serialize(buf.data());
+  digest.mergeDeserialized(positions, buf.data());
+  digest.compress(positions);
+  CHECK_QUANTILES(values, digest);
+}
+
+TEST(TDigestTest, normalDistribution) {
+  constexpr int N = 1e5;
+  std::vector<int16_t> positions;
+  double values[N];
+  std::default_random_engine gen(common::testutil::getRandomSeed(42));
+  for (double mean : {0, 1000}) {
+    SCOPED_TRACE(fmt::format("mean={}", mean));
+    std::normal_distribution<> dist(mean, 1);
+    TDigest digest;
+    for (int i = 0; i < N; ++i) {
+      auto v = dist(gen);
+      digest.add(positions, v);
+      values[i] = v;
+    }
+    digest.compress(positions);
+    std::sort(values, values + N);
+    CHECK_QUANTILES(folly::Range(values, N), digest);
+  }
+}
+
+TEST(TDigestTest, addWeighed) {
+  std::vector<int16_t> positions;
+  TDigest digest;
+  std::vector<double> values;
+  values.reserve(5050);
+  for (int i = 1; i <= 100; ++i) {
+    digest.add(positions, i, i);
+    values.insert(values.end(), i, i);
+  }
+  digest.compress(positions);
+  CHECK_QUANTILES(values, digest);
+}
+
+TEST(TDigestTest, merge) {
+  std::vector<int16_t> positions;
+  std::default_random_engine gen(common::testutil::getRandomSeed(42));
+  std::vector<double> values;
+  std::string buf;
+  auto test = [&](int numDigests, int size, double mean, double stddev) {
+    SCOPED_TRACE(fmt::format(
+        "numDigests={} size={} mean={} stddev={}",
+        numDigests,
+        size,
+        mean,
+        stddev));
+    values.clear();
+    values.reserve(numDigests * size);
+    std::normal_distribution<> dist(mean, stddev);
+    TDigest digest;
+    for (int i = 0; i < numDigests; ++i) {
+      TDigest current;
+      for (int j = 0; j < size; ++j) {
+        auto v = dist(gen);
+        current.add(positions, v);
+        values.push_back(v);
+      }
+      current.compress(positions);
+      buf.resize(current.serializedByteSize());
+      current.serialize(buf.data());
+      digest.mergeDeserialized(positions, buf.data());
+    }
+    digest.compress(positions);
+    std::sort(std::begin(values), std::end(values));
+    CHECK_QUANTILES(values, digest);
+  };
+  test(2, 5e4, 0, 50);
+  test(100, 1000, 500, 20);
+  test(1e4, 10, 500, 20);
+}
+
+TEST(TDigestTest, infinity) {
+  std::vector<int16_t> positions;
+  TDigest digest;
+  digest.add(positions, 0.0);
+  digest.add(positions, INFINITY);
+  digest.add(positions, -INFINITY);
+  digest.compress(positions);
+  ASSERT_TRUE(std::isnan(digest.sum()));
+  ASSERT_EQ(digest.estimateQuantile(0), -INFINITY);
+  ASSERT_EQ(digest.estimateQuantile(0.3), -INFINITY);
+  ASSERT_EQ(digest.estimateQuantile(0.4), 0.0);
+  ASSERT_EQ(digest.estimateQuantile(0.5), 0.0);
+  ASSERT_EQ(digest.estimateQuantile(0.6), 0.0);
+  ASSERT_EQ(digest.estimateQuantile(0.7), INFINITY);
+  ASSERT_EQ(digest.estimateQuantile(1), INFINITY);
+}
+
+} // namespace
+} // namespace facebook::velox::functions
diff --git a/velox/type/tests/TimestampTest.cpp b/velox/type/tests/TimestampTest.cpp
index 9e344cf999349..bdfa146dd81ac 100644
--- a/velox/type/tests/TimestampTest.cpp
+++ b/velox/type/tests/TimestampTest.cpp
@@ -18,6 +18,7 @@
 #include <random>
 
 #include "velox/common/base/tests/GTestUtils.h"
+#include "velox/common/testutil/RandomSeed.h"
 #include "velox/type/Timestamp.h"
 #include "velox/type/tz/TimeZoneMap.h"
 
@@ -259,16 +260,6 @@ TEST(TimestampTest, toStringPrestoCastBehavior) {
 
 namespace {
 
-uint64_t randomSeed() {
-  if (const char* env = getenv("VELOX_TEST_USE_RANDOM_SEED")) {
-    auto seed = std::random_device{}();
-    LOG(INFO) << "Random seed: " << seed;
-    return seed;
-  } else {
-    return 42;
-  }
-}
-
 std::string toStringAlt(
     const Timestamp& t,
     TimestampToStringOptions::Precision precision) {
@@ -308,7 +299,7 @@ bool checkUtcToEpoch(int year, int mon, int mday, int hour, int min, int sec) {
 } // namespace
 
 TEST(TimestampTest, compareWithToStringAlt) {
-  std::default_random_engine gen(randomSeed());
+  std::default_random_engine gen(common::testutil::getRandomSeed(42));
   std::uniform_int_distribution<int64_t> distSec(
       Timestamp::kMinSeconds, Timestamp::kMaxSeconds);
   std::uniform_int_distribution<uint64_t> distNano(0, Timestamp::kMaxNanos);
@@ -349,7 +340,7 @@ TEST(TimestampTest, utcToEpoch) {
 }
 
 TEST(TimestampTest, utcToEpochRandomInputs) {
-  std::default_random_engine gen(randomSeed());
+  std::default_random_engine gen(common::testutil::getRandomSeed(42));
   std::uniform_int_distribution<int32_t> dist(INT32_MIN, INT32_MAX);
   for (int i = 0; i < 10'000; ++i) {
     checkUtcToEpoch(