Skip to content

Commit

Permalink
Support HyperLogLog++
Browse files Browse the repository at this point in the history
Signed-off-by: Chong Gao <[email protected]>
  • Loading branch information
Chong Gao committed Oct 17, 2024
1 parent ed4c878 commit 35518d8
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 0 deletions.
7 changes: 7 additions & 0 deletions integration_tests/src/main/python/arithmetic_ops_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ def _get_overflow_df(spark, data, data_type, expr):
StructType([StructField('a', data_type)])
).selectExpr(expr)

def test_hll():
assert_gpu_and_cpu_are_equal_sql(
lambda spark : spark.read.parquet("/home/chongg/a"),
"tab",
"select c1, APPROX_COUNT_DISTINCT(c1) from tab group by c1"
)

@pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
@disable_ansi_mode
def test_addition(data_gen):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3918,6 +3918,16 @@ object GpuOverrides extends Logging {
GpuDynamicPruningExpression(child)
}
}),
expr[HyperLogLogPlusPlus](
"Aggregation approximate count distinct",
ExprChecks.reductionAndGroupByAgg(TypeSig.LONG, TypeSig.LONG,
Seq(ParamCheck("input", TypeSig.all, TypeSig.all))),
(a, conf, p, r) => new UnaryExprMeta[HyperLogLogPlusPlus](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression = {
GpuHLL(child, 0)
}
}
),
SparkShimImpl.ansiCastRule
).collect { case r if r != null => (r.getClassFor.asSubclass(classOf[Expression]), r)}.toMap

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.aggregate

import ai.rapids.cudf
import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResourceIfAllowed
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
import com.nvidia.spark.rapids.jni.HLL
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

case class CudfHLL(override val dataType: DataType) extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) => input.reduce(ReductionAggregation.HLL(), DType.LIST)
override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.HLL(32 * 1024)
override val name: String = "CudfHLL"
}

case class CudfMergeHLL(override val dataType: DataType)
extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) =>
input.reduce(ReductionAggregation.mergeHLL(), DType.LIST)

override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.mergeHLL()
override val name: String = "CudfMergeHLL"
}

/**
* Perform the final evaluation step to compute approximate count distinct from sketches.
*/
case class GpuHLLEvaluation(childExpr: Expression, precision: Int)
extends GpuExpression with ShimExpression {
override def dataType: DataType = LongType

override def prettyName: String = "HLL_evaluation"

override def nullable: Boolean = false

override def children: Seq[Expression] = Seq(childExpr)

override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
withResourceIfAllowed(childExpr.columnarEval(batch)) { sketches =>
val distinctValues = HLL.estimateDistinctValueFromSketches(sketches.getBase, precision)
GpuColumnVector.from(distinctValues, LongType)
}
}
}

case class GpuHLL(childExpr: Expression, precision: Int)
extends GpuAggregateFunction with Serializable {

// specify the HLL sketch type: list<byte>
private lazy val hllBufferType: DataType = ArrayType(ByteType, containsNull = false)

private lazy val hllBufferAttribute: AttributeReference =
AttributeReference("hllAttr", hllBufferType)()

override lazy val initialValues: Seq[Expression] =
Seq(GpuLiteral.create(new GenericArrayData(Array.ofDim[Byte](32 * 1024)), hllBufferType))

override lazy val inputProjection: Seq[Expression] = Seq(childExpr)

override lazy val updateAggregates: Seq[CudfAggregate] = Seq(CudfHLL(hllBufferType))

override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(CudfMergeHLL(hllBufferType))

override lazy val evaluateExpression: Expression = GpuHLLEvaluation(hllBufferAttribute, precision)

override def aggBufferAttributes: Seq[AttributeReference] = hllBufferAttribute :: Nil

override def dataType: DataType = hllBufferType

override def prettyName: String = "approx_count_distinct"

override def nullable: Boolean = true

override def children: Seq[Expression] = Seq(childExpr)
}

0 comments on commit 35518d8

Please sign in to comment.