// tpcds-throughput-test.scala
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import com.databricks.spark.sql.perf.tpcds.TPCDS
import com.databricks.spark.sql.perf.Benchmark.ExperimentStatus
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, max, substring, sum}
val conf = spark.sparkContext.getConf
// Number of concurrent query streams to start.
val streamNumber = conf.getInt("spark.driver.streamNumber", 2)
// Data scale factor, in GB.
val scaleFactor = conf.getInt("spark.driver.scaleFactor", 1)
// Supported file formats: parquet or orc.
val format = conf.get("spark.driver.format", "parquet")
// Whether to create partitioned tables.
val partitionTables = conf.getBoolean("spark.driver.partitioned", true)
// How many times to run the whole set of queries.
val iterations = conf.getInt("spark.driver.iterations", 1)
// Supported file systems: s3a://s3_bucket, gs://gs_bucket,
// wasbs://container@storage_account.blob.core.windows.net,
// abfs://container@storage_account.dfs.core.windows.net
val fsdir = conf.get("spark.driver.fsdir", "")
// If the tables were not fully created in a previous run, set this to force-drop and
// recreate the database and tables.
val recreateDatabase = conf.getBoolean("spark.driver.recreateDatabase", false)
val query_filter = Seq() // Seq() == all queries
// val query_filter = Seq("q1-v2.4", "q2-v2.4") // run subset of queries
val randomizeQueries = true // run queries in a random order. Recommended for parallel runs.
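// Example invocation (illustrative; the bucket name and values below are placeholders):
//   spark-shell \
//     --conf spark.driver.fsdir=s3a://my_bucket \
//     --conf spark.driver.streamNumber=4 \
//     --conf spark.driver.scaleFactor=100 \
//     --conf spark.driver.format=parquet \
//     -i tpcds-throughput-test.scala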
if (fsdir == "") {
  println("File system dir must be specified with --conf spark.driver.fsdir")
  sys.exit(1)
}
val current_time = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date)
// Detailed results will be written as JSON to this location.
var resultLocation = s"${fsdir}/shared/data/results/tpcds_${format}/${scaleFactor}/${current_time}"
val data_path = s"${fsdir}/shared/data/tpcds/tpcds_${format}/${scaleFactor}"
var databaseName = s"tpcds_${format}_scale_${scaleFactor}_db"
// Set to true to run TPC-DS with gazelle_plugin (Arrow-backed tables).
val use_arrow = conf.getBoolean("spark.driver.useArrow", false)
if (use_arrow) {
  resultLocation = s"${fsdir}/shared/data/results/tpcds_arrow/${scaleFactor}/${current_time}"
  databaseName = s"tpcds_arrow_scale_${scaleFactor}_db"
  val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales",
    "customer", "customer_address", "customer_demographics", "date_dim",
    "household_demographics", "income_band", "inventory", "item", "promotion", "reason",
    "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse",
    "web_page", "web_returns", "web_sales", "web_site")
  if (spark.catalog.databaseExists(s"$databaseName")) {
    if (!recreateDatabase) {
      println(s"Using existing $databaseName")
    } else {
      println(s"$databaseName exists, now drop and recreate it...")
      sql(s"drop database if exists $databaseName cascade")
      sql(s"create database if not exists $databaseName").show
    }
  } else {
    println(s"$databaseName doesn't exist. Creating...")
    sql(s"create database if not exists $databaseName").show
  }
  sql(s"use $databaseName").show
  for (table <- tables) {
    if (spark.catalog.tableExists(s"$table")) {
      println(s"$table exists.")
    } else {
      spark.catalog.createTable(s"$table", s"$data_path/$table", "arrow")
    }
  }
  if (partitionTables) {
    for (table <- tables) {
      try {
        sql(s"ALTER TABLE $table RECOVER PARTITIONS").show
      } catch {
        case e: Exception => println(e)
      }
    }
  }
} else {
  // Check whether the database has already been created; create external tables if not.
  val databaseExists = spark.catalog.databaseExists(s"$databaseName")
  if (databaseExists && !recreateDatabase) {
    println(s"Using existing $databaseName")
  } else {
    if (databaseExists) {
      println(s"$databaseName exists, now drop and recreate it...")
      sql(s"drop database if exists $databaseName cascade")
    } else {
      println(s"$databaseName doesn't exist. Creating...")
    }
    import com.databricks.spark.sql.perf.tpcds.TPCDSTables
    val tables = new TPCDSTables(spark.sqlContext, "", s"${scaleFactor}", false)
    tables.createExternalTables(data_path, format, databaseName, overwrite = true, discoverPartitions = partitionTables)
  }
}
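// Optional sanity check (illustrative, not part of the original flow): confirm that the
// TPC-DS tables are registered in the target database before starting the run.
// spark.catalog.listTables(databaseName).show(30, false)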
val timeout = 60 // timeout in hours
// COMMAND ----------
// Spark configuration
spark.conf.set("spark.sql.broadcastTimeout", "10000") // good idea for Q14, Q88.
// ... + any other configuration tuning
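// Illustrative examples of further tuning knobs (values below are placeholders, not
// recommendations; adjust or drop them for your cluster):
// spark.conf.set("spark.sql.shuffle.partitions", "200")
// spark.conf.set("spark.sql.adaptive.enabled", "true")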
// COMMAND ----------
sql(s"use $databaseName")
val tpcds = new TPCDS(sqlContext = spark.sqlContext)
def queries(stream: Int) = {
  val filtered_queries = query_filter match {
    case Seq() => tpcds.tpcds2_4Queries
    case _ => tpcds.tpcds2_4Queries.filter(q => query_filter.contains(q.name))
  }
  // Shuffle the query order for every stream except stream 0, which keeps the canonical order.
  if (randomizeQueries && stream != 0) scala.util.Random.shuffle(filtered_queries) else filtered_queries
}
class ThreadStream(experiment: ExperimentStatus, i: Int) extends Thread {
  override def run(): Unit = {
    println("stream_" + i + " has started...")
    println(experiment.toString)
    experiment.waitForFinish(timeout * 60 * 60)
    println("stream_" + i + " has finished.")
  }
}
val threadPool: ExecutorService = Executors.newFixedThreadPool(streamNumber)
val experiments: Array[ExperimentStatus] = new Array[ExperimentStatus](streamNumber)
try {
  for (i <- 0 until streamNumber) {
    experiments(i) = tpcds.runExperiment(
      queries(i),
      iterations = iterations,
      resultLocation = resultLocation,
      tags = Map("runtype" -> "benchmark", "database" -> databaseName, "scale_factor" -> s"${scaleFactor}")
    )
    threadPool.execute(new ThreadStream(experiments(i), i))
  }
} finally {
  threadPool.shutdown()
  threadPool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
}
val summary_dfs = new Array[DataFrame](streamNumber)
for (i <- 0 until streamNumber) {
  summary_dfs(i) = experiments(i).getCurrentResults
    .withColumn("Name", substring(col("name"), 2, 100))
    .withColumn("Runtime",
      (col("parsingTime") + col("analysisTime") + col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
    .select('Name, 'Runtime)
    .agg(sum("Runtime"))
    .withColumn("stream", lit("stream_" + i))
    .select("stream", "sum(Runtime)")
}
var summary_df = summary_dfs(0)
for (i <- 1 until streamNumber) {
  summary_df = summary_df.union(summary_dfs(i))
}
// Append the slowest stream's total runtime as an extra "max_stream" row.
summary_df = summary_df.union(
  summary_df.agg(max("sum(Runtime)"))
    .withColumnRenamed("max(sum(Runtime))", "sum(Runtime)")
    .withColumn("stream", lit("max_stream"))
    .select("stream", "sum(Runtime)"))
summary_df.show()
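// Expected shape of the summary (runtime values below are hypothetical):
// +----------+------------+
// |    stream|sum(Runtime)|
// +----------+------------+
// |  stream_0|       812.4|
// |  stream_1|       845.7|
// |max_stream|       845.7|
// +----------+------------+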
// Save the performance summary dataframe to a CSV file with a specified file name
val finalResultPath = s"${resultLocation}/summary"
summary_df.repartition(1).write.option("header", "true").mode("overwrite").csv(finalResultPath)
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
val fs = FileSystem.get(URI.create(finalResultPath), sc.hadoopConfiguration)
// Spark writes the summary as a single part file; rename it to a stable file name.
val file = fs.globStatus(new Path(s"$finalResultPath/*.csv"))(0).getPath().getName()
val srcPath = new Path(s"$finalResultPath/$file")
val destPath = new Path(s"$finalResultPath/summary.csv")
fs.rename(srcPath, destPath)
for (i <- 0 until streamNumber) {
  println(s"stream_${i} result is saved to ${experiments(i).resultPath}")
}
println(s"Performance summary is saved to ${destPath}")
sys.exit(0)