From 17f73fef0614868151f0a8e212f44b68320db955 Mon Sep 17 00:00:00 2001
From: loneylee
Date: Thu, 21 Mar 2024 17:36:39 +0800
Subject: [PATCH] add spark session for remote fs

---
 ...reeWriteOnObjectStorageAbstractSuite.scala | 44 +++++++++++++++++++
 1 file changed, 44 insertions(+)

diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
index e6a5226155d8f..cff3664946737 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
@@ -16,8 +16,15 @@
  */
 package io.glutenproject.execution
 
+import io.glutenproject.GlutenConfig
+
+import org.apache.spark.sql.SparkSession
+
 import _root_.org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
 import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.commons.io.FileUtils
+
+import java.io.File
 
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
@@ -25,6 +32,9 @@ import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
+  private var _spark: SparkSession = _
+
+  override protected def spark: SparkSession = _spark
 
   override protected val needCopyParquetToTablePath = true
 
@@ -52,8 +62,19 @@ class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
   val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
   val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
 
+  override protected def initializeSession(): Unit = {
+    if (_spark == null) {
+      _spark = SparkSession
+        .builder()
+        .appName("Gluten-UT-RemoteHS")
+        .config(sparkConf)
+        .getOrCreate()
+    }
+  }
+
   override protected def sparkConf: SparkConf = {
     super.sparkConf
+      .setMaster("local[2]")
       .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.io.compression.codec", "LZ4")
       .set("spark.sql.shuffle.partitions", "5")
@@ -140,5 +161,28 @@ class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
     createNotNullTPCHTablesInParquet(tablesPath)
   }
 
+  override protected def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (_spark != null) {
+          try {
+            _spark.sessionState.catalog.reset()
+          } finally {
+            _spark.stop()
+            _spark = null
+          }
+        }
+      } finally {
+        SparkSession.clearActiveSession()
+        SparkSession.clearDefaultSession()
+      }
+    }
+
+    FileUtils.forceDelete(new File(basePath))
+    // init GlutenConfig in the next beforeAll
+    GlutenConfig.ins = null
+  }
 }
 // scalastyle:off line.size.limit