Skip to content

Commit

Permalink
Add benchmarks with multiple join columns
Browse files Browse the repository at this point in the history
  • Loading branch information
zeotuan committed Dec 10, 2024
1 parent 90ec1e1 commit 37eca14
Showing 1 changed file with 42 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package com.github.mrpowers.spark.fast.tests

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

import java.util.concurrent.TimeUnit
import scala.util.Try

private class DatasetComparerBenchmark extends DatasetComparer {
// Single SparkSession shared by every benchmark in this class; built lazily
// on first access so JMH forks only pay the startup cost once.
lazy val spark: SparkSession = {
  val benchmarkSession = SparkSession.builder().master("local").appName("spark session").getOrCreate()
  // Suppress Spark's chatty INFO/WARN output so it does not pollute JMH results.
  benchmarkSession.sparkContext.setLogLevel("ERROR")
  benchmarkSession
}

@Benchmark
@BenchmarkMode(Array(Mode.SingleShotTime))
@Fork(value = 2)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
def assertLargeDatasetEqualityV2(blackHole: Blackhole): Boolean = {
val spark = SparkSession
.builder()
.master("local")
.appName("spark session")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

val ds1 = spark.range(0, 1000000, 1, 8)
val ds3 = ds1

Expand All @@ -37,14 +41,7 @@ private class DatasetComparerBenchmark extends DatasetComparer {
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
def assertLargeDatasetEqualityV2WithPrimaryKey(blackHole: Blackhole): Boolean = {
val spark = SparkSession
.builder()
.master("local")
.appName("spark session")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

def assertLargeDatasetEqualityV2WithSinglePrimaryKey(blackHole: Blackhole): Boolean = {
val ds1 = spark.range(0, 1000000, 1, 8)
val ds3 = ds1

Expand All @@ -61,13 +58,6 @@ private class DatasetComparerBenchmark extends DatasetComparer {
@Measurement(iterations = 10)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
def assertLargeDatasetEquality(blackHole: Blackhole): Boolean = {
val spark = SparkSession
.builder()
.master("local")
.appName("spark session")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

val ds1 = spark.range(0, 1000000, 1, 8)
val ds3 = ds1

Expand All @@ -76,4 +66,34 @@ private class DatasetComparerBenchmark extends DatasetComparer {
blackHole.consume(result)
result.isSuccess
}

@Benchmark
@BenchmarkMode(Array(Mode.SingleShotTime))
@Fork(value = 2)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
def assertLargeDatasetEqualityV2WithCompositePrimaryKey2(blackHole: Blackhole): Boolean = {
  // 1M-row dataset with a derived second key column: (id, id2) where id2 = id + 1.
  val left = spark.range(0, 1000000, 1, 8).withColumn("id2", col("id") + 1)
  // Compare the dataset against itself so the comparison always succeeds;
  // the benchmark measures join cost on a two-column composite primary key.
  val right   = left
  val outcome = Try(assertLargeDatasetEqualityV2(left, right, primaryKeys = Seq("id", "id2")))
  blackHole.consume(outcome)
  outcome.isSuccess
}

@Benchmark
@BenchmarkMode(Array(Mode.SingleShotTime))
@Fork(value = 2)
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
def assertLargeDatasetEqualityV2WithCompositePrimaryKey3(blackHole: Blackhole): Boolean = {
  // 1M-row dataset with two derived key columns: id2 = id + 1, id3 = id2 + 1,
  // giving a three-column composite primary key (id, id2, id3).
  val left = spark
    .range(0, 1000000, 1, 8)
    .withColumn("id2", col("id") + 1)
    .withColumn("id3", col("id2") + 1)
  // Self-comparison: the assertion always succeeds; only join cost is measured.
  val right   = left
  val outcome = Try(assertLargeDatasetEqualityV2(left, right, primaryKeys = Seq("id", "id2", "id3")))
  blackHole.consume(outcome)
  outcome.isSuccess
}
}

0 comments on commit 37eca14

Please sign in to comment.