From 2377f35d610aa8218a211a5c49e08e4326dedba9 Mon Sep 17 00:00:00 2001
From: Jimmy Lu <jimmylu@meta.com>
Date: Wed, 18 Sep 2024 13:01:41 -0700
Subject: [PATCH] Expose ArrayVectorBase::hasOverlappingRanges as a static
 method to be used in different vector implementations

Differential Revision: D62980472
---
 velox/vector/ComplexVector.cpp | 57 ++++++++++++++++++++++------------
 velox/vector/ComplexVector.h   | 20 ++++++------
 2 files changed, 49 insertions(+), 28 deletions(-)

diff --git a/velox/vector/ComplexVector.cpp b/velox/vector/ComplexVector.cpp
index 32cd5c9d6030..2b6f6beef16d 100644
--- a/velox/vector/ComplexVector.cpp
+++ b/velox/vector/ComplexVector.cpp
@@ -809,55 +809,74 @@ VectorPtr RowVector::pushDictionaryToRowVectorLeaves(const VectorPtr& input) {
       wrappers, input->size(), input, input->pool());
 }
 
+namespace {
+
+// Returns the first non-null non-empty row index >= `i`, or `size` if none.
 template <bool kHasNulls>
-vector_size_t ArrayVectorBase::nextNonEmpty(vector_size_t i) const {
-  while (i < size() &&
-         ((kHasNulls && bits::isBitNull(rawNulls(), i)) || rawSizes_[i] <= 0)) {
+vector_size_t nextNonEmpty(
+    vector_size_t i,
+    vector_size_t size,
+    const uint64_t* nulls,
+    const vector_size_t* sizes) {
+  while (i < size &&
+         ((kHasNulls && bits::isBitNull(nulls, i)) || sizes[i] <= 0)) {
     ++i;
   }
   return i;
 }
 
 template <bool kHasNulls>
-bool ArrayVectorBase::maybeHaveOverlappingRanges() const {
+bool maybeHaveOverlappingRanges(
+    vector_size_t size,
+    const uint64_t* nulls,
+    const vector_size_t* offsets,
+    const vector_size_t* sizes) {
   vector_size_t curr = 0;
-  curr = nextNonEmpty<kHasNulls>(curr);
-  if (curr >= size()) {
+  curr = nextNonEmpty<kHasNulls>(curr, size, nulls, sizes);
+  if (curr >= size) {
     return false;
   }
   for (;;) {
-    auto next = nextNonEmpty<kHasNulls>(curr + 1);
-    if (next >= size()) {
+    auto next = nextNonEmpty<kHasNulls>(curr + 1, size, nulls, sizes);
+    if (next >= size) {
       return false;
     }
-    // This also implicitly ensures rawOffsets_[curr] <= rawOffsets_[next].
-    if (rawOffsets_[curr] + rawSizes_[curr] > rawOffsets_[next]) {
+    // This also implicitly ensures offsets[curr] <= offsets[next].
+    if (offsets[curr] + sizes[curr] > offsets[next]) {
       return true;
     }
     curr = next;
   }
 }
 
-bool ArrayVectorBase::hasOverlappingRanges() const {
-  if (!(rawNulls() ? maybeHaveOverlappingRanges<true>()
-                   : maybeHaveOverlappingRanges<false>())) {
+} // namespace
+
+// static
+bool ArrayVectorBase::hasOverlappingRanges(
+    vector_size_t size,
+    const uint64_t* nulls,
+    const vector_size_t* offsets,
+    const vector_size_t* sizes) {
+  if (!(nulls
+            ? maybeHaveOverlappingRanges<true>(size, nulls, offsets, sizes)
+            : maybeHaveOverlappingRanges<false>(size, nulls, offsets, sizes))) {
     return false;
   }
   std::vector<vector_size_t> indices;
-  indices.reserve(size());
-  for (vector_size_t i = 0; i < size(); ++i) {
-    const bool isNull = rawNulls() && bits::isBitNull(rawNulls(), i);
-    if (!isNull && rawSizes_[i] > 0) {
+  indices.reserve(size);
+  for (vector_size_t i = 0; i < size; ++i) {
+    const bool isNull = nulls && bits::isBitNull(nulls, i);
+    if (!isNull && sizes[i] > 0) {
       indices.push_back(i);
     }
   }
   std::sort(indices.begin(), indices.end(), [&](auto i, auto j) {
-    return rawOffsets_[i] < rawOffsets_[j];
+    return offsets[i] < offsets[j];
   });
   for (vector_size_t i = 1; i < indices.size(); ++i) {
     auto j = indices[i - 1];
     auto k = indices[i];
-    if (rawOffsets_[j] + rawSizes_[j] > rawOffsets_[k]) {
+    if (offsets[j] + sizes[j] > offsets[k]) {
       return true;
     }
   }
diff --git a/velox/vector/ComplexVector.h b/velox/vector/ComplexVector.h
index 04ab99679fef..eb45cc08bfcc 100644
--- a/velox/vector/ComplexVector.h
+++ b/velox/vector/ComplexVector.h
@@ -365,7 +365,17 @@ struct ArrayVectorBase : BaseVector {
   }
 
   /// Check if there is any overlapping [offset, size] ranges.
-  bool hasOverlappingRanges() const;
+  bool hasOverlappingRanges() const {
+    return hasOverlappingRanges(size(), rawNulls(), rawOffsets_, rawSizes_);
+  }
+
+  /// Checks whether the [offset, offset + size) ranges of any two non-null,
+  /// non-empty rows overlap.
+  static bool hasOverlappingRanges(
+      vector_size_t size,
+      const uint64_t* nulls,
+      const vector_size_t* offsets,
+      const vector_size_t* sizes);
 
  protected:
   ArrayVectorBase(
@@ -410,14 +420,6 @@ struct ArrayVectorBase : BaseVector {
   const vector_size_t* rawOffsets_;
   BufferPtr sizes_;
   const vector_size_t* rawSizes_;
-
- private:
-  template <bool kHasNulls>
-  bool maybeHaveOverlappingRanges() const;
-
-  // Returns the next non-null non-empty array/map on or after `index'.
-  template <bool kHasNulls>
-  vector_size_t nextNonEmpty(vector_size_t index) const;
 };
 
 class ArrayVector : public ArrayVectorBase {