[SPARK] Add benchmark for Spark TRowSet generation in row-based and column-based modes #5809
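For context, a brief sketch (illustrative only, not part of this PR's diff) of what "row-based" and "column-based" mean for this benchmark: RowSet.toTRowSet builds a row-based TRowSet (a list of TRows, one per input row) for Hive protocol V5 and earlier, and a column-based TRowSet (one TColumn per schema field) for V6 and later. The getRows/getColumns accessors below are assumed from the standard Thrift-generated bean interface.

// Illustrative sketch only; assumes the kyuubi-spark-sql-engine test classpath.
import org.apache.kyuubi.engine.spark.schema.RowSet
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val schema = new StructType().add("id", "int").add("name", "string")
val rows = Seq(Row(1, "a"), Row(2, "b"))

// Protocol V5 and earlier: row-based payload, one TRow per input row.
val rowBased = RowSet.toTRowSet(rows, schema, HIVE_CLI_SERVICE_PROTOCOL_V5)
assert(rowBased.getRows.size == 2)

// Protocol V6 and later: column-based payload, one TColumn per schema field.
val columnBased = RowSet.toTRowSet(rows, schema, HIVE_CLI_SERVICE_PROTOCOL_V6)
assert(columnBased.getColumns.size == 2)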

1 change: 1 addition & 0 deletions .gitignore
@@ -65,6 +65,7 @@ embedded_zookeeper/
/externals/kyuubi-spark-sql-engine/operation_logs/
/externals/kyuubi-spark-sql-engine/engine_operation_logs/
/externals/kyuubi-spark-sql-engine/spark-warehouse/
/externals/kyuubi-spark-sql-engine/benchmarks/*-results.txt
/work/
/docs/_build/
/kyuubi-common/metrics/
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.schema

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

trait RowSetHelper {
protected def genRow(value: Int): Row = {
val boolVal = value % 3 match {
case 0 => true
case 1 => false
case _ => null
}
val byteVal = value.toByte
val shortVal = value.toShort
val longVal = value.toLong
val floatVal = java.lang.Float.valueOf(s"$value.$value")
val doubleVal = java.lang.Double.valueOf(s"$value.$value")
val stringVal = value.toString * value
val decimalVal = new java.math.BigDecimal(s"$value.$value")
val day = java.lang.String.format("%02d", java.lang.Integer.valueOf(value % 30 + 1))
val dateVal = Date.valueOf(s"2018-11-$day")
val timestampVal = Timestamp.valueOf(s"2018-11-17 13:33:33.$value")
val binaryVal = Array.fill[Byte](value)(value.toByte)
val arrVal = Array.fill(10)(doubleVal).toSeq
val mapVal = Map(value -> doubleVal)
val interval = new CalendarInterval(value, value, value)
val localDate = LocalDate.of(2018, 11, 17)
val instant = Instant.now()

Row(
boolVal,
byteVal,
shortVal,
value,
longVal,
floatVal,
doubleVal,
stringVal,
decimalVal,
dateVal,
timestampVal,
binaryVal,
arrVal,
mapVal,
interval,
localDate,
instant)
}

protected val schemaStructFields: Seq[StructField] = Seq(
("a", "boolean", "boolVal"),
("b", "tinyint", "byteVal"),
("c", "smallint", "shortVal"),
("d", "int", "value"),
("e", "bigint", "longVal"),
("f", "float", "floatVal"),
("g", "double", "doubleVal"),
("h", "string", "stringVal"),
("i", "decimal", "decimalVal"),
("j", "date", "dateVal"),
("k", "timestamp", "timestampVal"),
("l", "binary", "binaryVal"),
("m", "array<double>", "arrVal"),
("n", "map<int, double>", "mapVal"),
("o", "interval", "interval"),
("p", "date", "localDate"),
("q", "timestamp", "instant"))
.map { case (colName, typeName, comment) =>
StructField(colName, CatalystSqlParser.parseDataType(typeName)).withComment(comment)
}

protected val schema: StructType = StructType(schemaStructFields)
}
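A short usage sketch (the suite name here is hypothetical) of how a test mixes in this trait, following the same pattern as RowSetSuite and TRowSetBenchmark below: every field of the generated Row is derived from the single Int argument.

// Hypothetical example suite, not part of this PR.
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.engine.spark.schema.RowSetHelper
import org.apache.spark.sql.Row

class RowSetHelperExampleSuite extends KyuubiFunSuite with RowSetHelper {
  test("genRow derives every field from the Int seed") {
    val row: Row = genRow(3)
    assert(row.getBoolean(0))            // 3 % 3 == 0 => true
    assert(row.getString(7) == "333")    // value.toString * value
    assert(row.length == schema.length)  // 17 columns in both
  }
}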
@@ -20,7 +20,6 @@ package org.apache.kyuubi.engine.spark.schema
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import scala.collection.JavaConverters._

@@ -32,70 +31,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion

class RowSetSuite extends KyuubiFunSuite {

def genRow(value: Int): Row = {
val boolVal = value % 3 match {
case 0 => true
case 1 => false
case _ => null
}
val byteVal = value.toByte
val shortVal = value.toShort
val longVal = value.toLong
val floatVal = java.lang.Float.valueOf(s"$value.$value")
val doubleVal = java.lang.Double.valueOf(s"$value.$value")
val stringVal = value.toString * value
val decimalVal = new java.math.BigDecimal(s"$value.$value")
val day = java.lang.String.format("%02d", java.lang.Integer.valueOf(value + 1))
val dateVal = Date.valueOf(s"2018-11-$day")
val timestampVal = Timestamp.valueOf(s"2018-11-17 13:33:33.$value")
val binaryVal = Array.fill[Byte](value)(value.toByte)
val arrVal = Array.fill(value)(doubleVal).toSeq
val mapVal = Map(value -> doubleVal)
val interval = new CalendarInterval(value, value, value)
val localDate = LocalDate.of(2018, 11, 17)
val instant = Instant.now()

Row(
boolVal,
byteVal,
shortVal,
value,
longVal,
floatVal,
doubleVal,
stringVal,
decimalVal,
dateVal,
timestampVal,
binaryVal,
arrVal,
mapVal,
interval,
localDate,
instant)
}

val schema: StructType = new StructType()
.add("a", "boolean")
.add("b", "tinyint")
.add("c", "smallint")
.add("d", "int")
.add("e", "bigint")
.add("f", "float")
.add("g", "double")
.add("h", "string")
.add("i", "decimal")
.add("j", "date")
.add("k", "timestamp")
.add("l", "binary")
.add("m", "array<double>")
.add("n", "map<int, double>")
.add("o", "interval")
.add("p", "date")
.add("q", "timestamp")

class RowSetSuite extends KyuubiFunSuite with RowSetHelper {
private val rows: Seq[Row] = (0 to 10).map(genRow) ++ Seq(Row.fromSeq(Seq.fill(17)(null)))

test("column based set") {
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kyuubi

import scala.concurrent.duration._

import org.apache.spark.kyuubi.benchmark.{Benchmark, KyuubiBenchmarkBase}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.engine.spark.schema.{RowSet, RowSetHelper}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion._

/**
* Benchmark to measure the performance of generating TRowSet.
*
* {{{
* RUN_BENCHMARK=1 ./build/mvn clean test \
* -pl externals/kyuubi-spark-sql-engine -am \
* -Dtest=none -DwildcardSuites=org.apache.spark.kyuubi.TRowSetBenchmark
* }}}
*/
class TRowSetBenchmark extends KyuubiFunSuite with RowSetHelper with KyuubiBenchmarkBase {
private val runBenchmark = sys.env.contains("RUN_BENCHMARK")

private val rowCount = 1000
private val rotations = 5
private lazy val allRows = (0 until rowCount).map(genRow)

test("row-based toTRowSet benchmark") {
assume(runBenchmark)
val rowSetType = "column-based"
withHeader(rowSetType) {
tRowSetGenerationBenchmark(HIVE_CLI_SERVICE_PROTOCOL_V5, rowSetType)
}
}

test("column-based toTRowSet benchmark") {
assume(runBenchmark)
val rowSetType = "column-based"
withHeader(rowSetType) {
tRowSetGenerationBenchmark(HIVE_CLI_SERVICE_PROTOCOL_V6, rowSetType)
}
}

private def tRowSetGenerationBenchmark(
protocolVersion: TProtocolVersion,
rowSetType: String): Unit = {
val benchmark =
new Benchmark(
s"$rowSetType TRowSet benchmark",
rowCount,
warmupTime = 3.seconds,
output = output)

schemaStructFields.zipWithIndex.foreach {
case (field, idx) =>
val rowsOfSingleType = allRows.map(row => Row(row.get(idx)))
val schemaOfSingleType = StructType(Seq(field))

val commentOrName = field.getComment().getOrElse(field.dataType.typeName)
benchmark.addCase(s"$commentOrName", rotations) { _ =>
benchmarkToTRowSet(
rowsOfSingleType,
schemaOfSingleType,
protocolVersion)
}
}

benchmark.addCase(s"with all types", rotations) { _ =>
benchmarkToTRowSet(allRows, schema, protocolVersion)
}

benchmark.run()
}

private def benchmarkToTRowSet(
rows: Seq[Row],
schema: StructType,
protocolVersion: TProtocolVersion): Unit = {
RowSet.toTRowSet(rows, schema, protocolVersion)
}
}
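A possible follow-up, sketched only from members already defined in this class (not part of the PR): a single Benchmark that times both protocol versions over the full schema, so the row-based and column-based numbers land in one results table.

// Hypothetical extension of TRowSetBenchmark, reusing only members defined above.
private def fullSchemaComparisonBenchmark(): Unit = {
  val benchmark = new Benchmark(
    "row-based vs column-based TRowSet (all types)",
    rowCount,
    warmupTime = 3.seconds,
    output = output)

  benchmark.addCase("row-based (protocol V5)", rotations) { _ =>
    benchmarkToTRowSet(allRows, schema, HIVE_CLI_SERVICE_PROTOCOL_V5)
  }
  benchmark.addCase("column-based (protocol V6)", rotations) { _ =>
    benchmarkToTRowSet(allRows, schema, HIVE_CLI_SERVICE_PROTOCOL_V6)
  }
  benchmark.run()
}

Given the .gitignore entry added above (externals/kyuubi-spark-sql-engine/benchmarks/*-results.txt), the benchmark output is presumably written under the module's benchmarks/ directory when the suite is run with RUN_BENCHMARK=1.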