Skip to content

Commit d776eb5

Browse files
committedOct 29, 2024·
added singleton SparkSessionProvider
1 parent 8603ff1 commit d776eb5

File tree

3 files changed

+85
-42
lines changed

3 files changed

+85
-42
lines changed
 
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package zingg.session;
2+
3+
import org.apache.commons.logging.Log;
4+
import org.apache.commons.logging.LogFactory;
5+
import org.apache.spark.api.java.JavaSparkContext;
6+
import org.apache.spark.sql.SparkSession;
7+
import zingg.common.client.Arguments;
8+
import zingg.common.client.IArguments;
9+
import zingg.common.client.IZingg;
10+
import zingg.spark.core.context.ZinggSparkContext;
11+
12+
public class SparkSessionProvider {
13+
14+
private static SparkSessionProvider sparkSessionProvider;
15+
16+
private SparkSession sparkSession;
17+
private JavaSparkContext javaSparkContext;
18+
private ZinggSparkContext zinggSparkContext;
19+
private IArguments args;
20+
public static final Log LOG = LogFactory.getLog(SparkSessionProvider.class);
21+
22+
private void initializeSession() {
23+
if (sparkSession == null) {
24+
try {
25+
sparkSession = SparkSession
26+
.builder()
27+
.master("local[*]")
28+
.appName("ZinggJunit")
29+
.config("spark.debug.maxToStringFields", 100)
30+
.getOrCreate();
31+
javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
32+
JavaSparkContext.jarOfClass(IZingg.class);
33+
javaSparkContext.setCheckpointDir("/tmp/checkpoint");
34+
args = new Arguments();
35+
zinggSparkContext = new ZinggSparkContext();
36+
zinggSparkContext.init(sparkSession);
37+
} catch (Throwable e) {
38+
if (LOG.isDebugEnabled())
39+
e.printStackTrace();
40+
LOG.info("Problem in spark env setup");
41+
}
42+
} else {
43+
LOG.info("Spark session already active, ignoring create spark session!");
44+
}
45+
}
46+
47+
public static SparkSessionProvider getInstance() {
48+
if (sparkSessionProvider == null) {
49+
sparkSessionProvider = new SparkSessionProvider();
50+
sparkSessionProvider.initializeSession();
51+
}
52+
return sparkSessionProvider;
53+
}
54+
55+
56+
57+
//set getters
58+
public SparkSession getSparkSession() {
59+
return this.sparkSession;
60+
}
61+
62+
public JavaSparkContext getJavaSparkContext() {
63+
return this.javaSparkContext;
64+
}
65+
66+
public ZinggSparkContext getZinggSparkContext() {
67+
return this.zinggSparkContext;
68+
}
69+
70+
public IArguments getArgs() {
71+
return this.args;
72+
}
73+
}

‎spark/core/src/test/java/zingg/spark/core/executor/TestSparkExecutors.java

+6-15
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import zingg.common.core.executor.TrainMatcher;
2222
import zingg.common.core.executor.Trainer;
2323
import zingg.common.core.executor.TrainerValidator;
24+
import zingg.session.SparkSessionProvider;
2425
import zingg.spark.core.context.ZinggSparkContext;
2526

2627
public class TestSparkExecutors extends TestExecutorsGeneric<SparkSession,Dataset<Row>,Row,Column,DataType> {
@@ -35,21 +36,11 @@ public class TestSparkExecutors extends TestExecutorsGeneric<SparkSession,Datase
3536

3637
protected ZinggSparkContext ctx;
3738

38-
public TestSparkExecutors() throws IOException, ZinggClientException {
39-
SparkSession spark = SparkSession
40-
.builder()
41-
.master("local[*]")
42-
.appName("Zingg" + "Junit")
43-
.getOrCreate();
44-
45-
JavaSparkContext ctx1 = new JavaSparkContext(spark.sparkContext());
46-
JavaSparkContext.jarOfClass(IZingg.class);
47-
ctx1.setCheckpointDir("/tmp/checkpoint");
48-
49-
this.ctx = new ZinggSparkContext();
50-
this.ctx.setSession(spark);
51-
this.ctx.setUtils();
52-
init(spark);
39+
public TestSparkExecutors() throws IOException, ZinggClientException {
40+
SparkSessionProvider sparkSessionProvider = SparkSessionProvider.getInstance();
41+
ctx = sparkSessionProvider.getZinggSparkContext();
42+
ctx.setSession(sparkSessionProvider.getSparkSession());
43+
init(sparkSessionProvider.getSparkSession());
5344
}
5445

5546
@Override

‎spark/core/src/test/java/zingg/spark/core/executor/ZinggSparkTester.java

+6-27
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import zingg.common.client.IArguments;
2222
import zingg.common.client.IZingg;
2323

24+
import zingg.session.SparkSessionProvider;
2425
import zingg.spark.core.context.ZinggSparkContext;
2526

2627
public class ZinggSparkTester {
@@ -37,33 +38,11 @@ public class ZinggSparkTester {
3738

3839
@BeforeAll
3940
public static void setup() {
40-
try {
41-
spark = SparkSession
42-
.builder()
43-
.master("local[*]")
44-
.appName("ZinggJunit")
45-
.config("spark.debug.maxToStringFields", 100)
46-
.getOrCreate();
47-
ctx = new JavaSparkContext(spark.sparkContext());
48-
JavaSparkContext.jarOfClass(IZingg.class);
49-
ctx.setCheckpointDir("/tmp/checkpoint");
50-
args = new Arguments();
51-
zsCTX = new ZinggSparkContext();
52-
zsCTX.init(spark);
53-
} catch (Throwable e) {
54-
if (LOG.isDebugEnabled())
55-
e.printStackTrace();
56-
LOG.info("Problem in spark env setup");
57-
}
58-
}
59-
60-
@AfterAll
61-
public static void teardown() {
62-
if (ctx != null)
63-
ctx.stop();
64-
65-
if (spark != null)
66-
spark.stop();
41+
SparkSessionProvider sparkSessionProvider = SparkSessionProvider.getInstance();
42+
spark = sparkSessionProvider.getSparkSession();
43+
ctx = sparkSessionProvider.getJavaSparkContext();
44+
args = sparkSessionProvider.getArgs();
45+
zsCTX = sparkSessionProvider.getZinggSparkContext();
6746
}
6847

6948
public Dataset<Row> createDFWithDoubles(int numRows, int numCols) {

0 commit comments

Comments
 (0)
Please sign in to comment.