Skip to content

Commit

Permalink
add benchmark for Spark TRowSet generation
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Dec 9, 2023
1 parent 79b24a7 commit d2a360f
Show file tree
Hide file tree
Showing 6 changed files with 513 additions and 65 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ embedded_zookeeper/
/externals/kyuubi-spark-sql-engine/operation_logs/
/externals/kyuubi-spark-sql-engine/engine_operation_logs/
/externals/kyuubi-spark-sql-engine/spark-warehouse/
/externals/kyuubi-spark-sql-engine/benchmarks/*-results.txt
/work/
/docs/_build/
/kyuubi-common/metrics/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.schema

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

trait RowSetHelper {
protected def genRow(value: Int): Row = {
val boolVal = value % 3 match {
case 0 => true
case 1 => false
case _ => null
}
val byteVal = value.toByte
val shortVal = value.toShort
val longVal = value.toLong
val floatVal = java.lang.Float.valueOf(s"$value.$value")
val doubleVal = java.lang.Double.valueOf(s"$value.$value")
val stringVal = value.toString * value
val decimalVal = new java.math.BigDecimal(s"$value.$value")
val day = java.lang.String.format("%02d", java.lang.Integer.valueOf(value % 30 + 1))
val dateVal = Date.valueOf(s"2018-11-$day")
val timestampVal = Timestamp.valueOf(s"2018-11-17 13:33:33.$value")
val binaryVal = Array.fill[Byte](value)(value.toByte)
val arrVal = Array.fill(value)(doubleVal).toSeq
val mapVal = Map(value -> doubleVal)
val interval = new CalendarInterval(value, value, value)
val localDate = LocalDate.of(2018, 11, 17)
val instant = Instant.now()

Row(
boolVal,
byteVal,
shortVal,
value,
longVal,
floatVal,
doubleVal,
stringVal,
decimalVal,
dateVal,
timestampVal,
binaryVal,
arrVal,
mapVal,
interval,
localDate,
instant)
}

protected val schemaStructFields: Seq[StructField] = Seq(
("a", "boolean", "boolVal"),
("b", "tinyint", "byteVal"),
("c", "smallint", "shortVal"),
("d", "int", "value"),
("e", "bigint", "longVal"),
("f", "float", "floatVal"),
("g", "double", "doubleVal"),
("h", "string", "stringVal"),
("i", "decimal", "decimalVal"),
("j", "date", "dateVal"),
("k", "timestamp", "timestampVal"),
("l", "binary", "binaryVal"),
("m", "array<double>", "arrVal"),
("n", "map<int, double>", "mapVal"),
("o", "interval", "interval"),
("p", "date", "localDate"),
("q", "timestamp", "instant"))
.map { case (colName, typeName, comment) =>
StructField(colName, CatalystSqlParser.parseDataType(typeName)).withComment(comment)
}

protected val schema: StructType = StructType(schemaStructFields)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.kyuubi.engine.spark.schema
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import scala.collection.JavaConverters._

Expand All @@ -32,70 +31,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion

class RowSetSuite extends KyuubiFunSuite {

def genRow(value: Int): Row = {
val boolVal = value % 3 match {
case 0 => true
case 1 => false
case _ => null
}
val byteVal = value.toByte
val shortVal = value.toShort
val longVal = value.toLong
val floatVal = java.lang.Float.valueOf(s"$value.$value")
val doubleVal = java.lang.Double.valueOf(s"$value.$value")
val stringVal = value.toString * value
val decimalVal = new java.math.BigDecimal(s"$value.$value")
val day = java.lang.String.format("%02d", java.lang.Integer.valueOf(value + 1))
val dateVal = Date.valueOf(s"2018-11-$day")
val timestampVal = Timestamp.valueOf(s"2018-11-17 13:33:33.$value")
val binaryVal = Array.fill[Byte](value)(value.toByte)
val arrVal = Array.fill(value)(doubleVal).toSeq
val mapVal = Map(value -> doubleVal)
val interval = new CalendarInterval(value, value, value)
val localDate = LocalDate.of(2018, 11, 17)
val instant = Instant.now()

Row(
boolVal,
byteVal,
shortVal,
value,
longVal,
floatVal,
doubleVal,
stringVal,
decimalVal,
dateVal,
timestampVal,
binaryVal,
arrVal,
mapVal,
interval,
localDate,
instant)
}

val schema: StructType = new StructType()
.add("a", "boolean")
.add("b", "tinyint")
.add("c", "smallint")
.add("d", "int")
.add("e", "bigint")
.add("f", "float")
.add("g", "double")
.add("h", "string")
.add("i", "decimal")
.add("j", "date")
.add("k", "timestamp")
.add("l", "binary")
.add("m", "array<double>")
.add("n", "map<int, double>")
.add("o", "interval")
.add("p", "date")
.add("q", "timestamp")

class RowSetSuite extends KyuubiFunSuite with RowSetHelper {
private val rows: Seq[Row] = (0 to 10).map(genRow) ++ Seq(Row.fromSeq(Seq.fill(17)(null)))

test("column based set") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kyuubi

import scala.concurrent.duration._

import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.hive.service.rpc.thrift.TProtocolVersion._
import org.apache.spark.kyuubi.benchmark.{Benchmark, KyuubiBenchmarkBase}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.engine.spark.schema.{RowSet, RowSetHelper}

/**
* Benchmark to measure the performance of generate TRowSet.
*
* {{{
* RUN_BENCHMARK=1 ./build/mvn clean test \
* -pl externals/kyuubi-spark-sql-engine -am \
* -Dtest=none -DwildcardSuites=org.apache.spark.kyuubi.TRowSetBenchmark
* }}}
*/
class TRowSetBenchmark extends KyuubiFunSuite with RowSetHelper with KyuubiBenchmarkBase {
private val runBenchmark = sys.env.contains("RUN_BENCHMARK")

private val rowCount = 1000
private val rotations = 5
private lazy val allRows = (0 until rowCount).map(genRow)

test("row-based toTRowSet benchmark") {
assume(runBenchmark)
val rowSetType = "column-based"
withHeader(rowSetType) {
tRowSetGenerationBenchmark(HIVE_CLI_SERVICE_PROTOCOL_V5, rowSetType)
}
}

test("column-based toTRowSet benchmark") {
assume(runBenchmark)
val rowSetType = "column-based"
withHeader(rowSetType) {
tRowSetGenerationBenchmark(HIVE_CLI_SERVICE_PROTOCOL_V6, rowSetType)
}
}

private def tRowSetGenerationBenchmark(
protocolVersion: TProtocolVersion,
rowSetType: String): Unit = {
val benchmark =
new Benchmark(
s"$rowSetType TRowSet benchmark",
rowCount,
warmupTime = 3.seconds,
output = output)

schemaStructFields.zipWithIndex.foreach {
case (field, idx) =>
val rowsOfSingleType = allRows.map(row => Row(row.get(idx)))
val schemaOfSingleType = StructType(Seq(field))

val commentOrName = field.getComment().getOrElse(field.dataType.typeName)
benchmark.addCase(s"$commentOrName", rotations) { _ =>
benchmarkToTRowSet(
rowsOfSingleType,
schemaOfSingleType,
protocolVersion)
}
}

benchmark.addCase(s"with all types", rotations) { _ =>
benchmarkToTRowSet(allRows, schema, protocolVersion)
}

benchmark.run()
}

private def benchmarkToTRowSet(
rows: Seq[Row],
schema: StructType,
protocolVersion: TProtocolVersion): Unit = {
RowSet.toTRowSet(rows, schema, protocolVersion)
}
}
Loading

0 comments on commit d2a360f

Please sign in to comment.