
Commit 7ad79c4: added check before setting checkpoint directory
1 parent 4a02faa

File tree: 3 files changed (+25, -21 lines)


spark/client/src/main/java/zingg/spark/client/SparkClient.java

+10 -3

@@ -1,5 +1,6 @@
 package zingg.spark.client;
 
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
@@ -79,12 +80,18 @@ public SparkSession getSession() {
         SparkSession s = SparkSession
             .builder()
             .appName("Zingg")
-            .getOrCreate();
-        JavaSparkContext ctx = JavaSparkContext.fromSparkContext(s.sparkContext());
+            .getOrCreate();
+        SparkContext sparkContext = s.sparkContext();
+        if (sparkContext.getCheckpointDir().isEmpty()) {
+            sparkContext.setCheckpointDir("/tmp/checkpoint");
+        }
+        JavaSparkContext ctx = JavaSparkContext.fromSparkContext(sparkContext);
         JavaSparkContext.jarOfClass(IZingg.class);
         LOG.debug("Context " + ctx.toString());
         //initHashFns();
-        ctx.setCheckpointDir("/tmp/checkpoint");
+        if (!ctx.getCheckpointDir().isPresent()) {
+            ctx.setCheckpointDir(String.valueOf(sparkContext.getCheckpointDir()));
+        }
         setSession(s);
         return s;
     }
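The guard is the substance of the change: a checkpoint directory is applied only when none has been configured yet, so a directory chosen earlier by the caller is left untouched. Below is a minimal, self-contained sketch of the same pattern against a local session; the local[*] master, the application name, and the class name are illustrative and not part of the commit. Note that SparkContext.getCheckpointDir() returns a scala.Option<String>, while JavaSparkContext.getCheckpointDir() returns an Optional<String>, which is why one check uses isEmpty() and the other isPresent().

    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    public class CheckpointDirGuard {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[*]")               // illustrative local master
                    .appName("CheckpointDirGuard")    // illustrative app name
                    .getOrCreate();

            // The Scala-side SparkContext reports the checkpoint directory as a
            // scala.Option<String>; isEmpty() means none has been set yet.
            SparkContext sc = spark.sparkContext();
            if (sc.getCheckpointDir().isEmpty()) {
                sc.setCheckpointDir("/tmp/checkpoint");
            }

            // The Java wrapper sees the same underlying setting as an Optional.
            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
            if (jsc.getCheckpointDir().isPresent()) {
                System.out.println("Checkpoint dir: " + jsc.getCheckpointDir().get());
            }

            spark.stop();
        }
    }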

spark/core/src/test/java/zingg/spark/core/executor/TestSparkExecutorsCompound.java

+6 -16

@@ -4,25 +4,26 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataType;
 
-import zingg.common.client.IZingg;
+import org.junit.jupiter.api.extension.ExtendWith;
 import zingg.common.client.ZinggClientException;
 import zingg.common.client.util.DFObjectUtil;
 import zingg.common.client.util.IWithSession;
 import zingg.common.client.util.WithSession;
 import zingg.common.core.executor.TestExecutorsCompound;
 import zingg.common.core.executor.TrainMatcher;
 import zingg.spark.client.util.SparkDFObjectUtil;
+import zingg.spark.core.TestSparkBase;
 import zingg.spark.core.context.ZinggSparkContext;
 import zingg.spark.core.executor.labeller.ProgrammaticSparkLabeller;
 import zingg.spark.core.executor.validate.SparkTrainMatchValidator;
 
+@ExtendWith(TestSparkBase.class)
 public class TestSparkExecutorsCompound extends TestExecutorsCompound<SparkSession,Dataset<Row>,Row,Column,DataType> {
     protected static final String CONFIG_FILE = "zingg/spark/core/executor/configSparkIntTest.json";
     protected static final String TEST_DATA_FILE = "zingg/spark/core/executor/test.csv";
@@ -31,22 +32,11 @@ public class TestSparkExecutorsCompound extends TestExecutorsCompound<SparkSessi
 
     protected ZinggSparkContext ctx;
 
-    public TestSparkExecutorsCompound() throws IOException, ZinggClientException {
-        SparkSession spark = SparkSession
-            .builder()
-            .master("local[*]")
-            .appName("Zingg" + "Junit")
-            .getOrCreate();
-
-        JavaSparkContext ctx1 = new JavaSparkContext(spark.sparkContext());
-        JavaSparkContext.jarOfClass(IZingg.class);
-        ctx1.setCheckpointDir("/tmp/checkpoint");
-
+    public TestSparkExecutorsCompound(SparkSession sparkSession) throws IOException, ZinggClientException {
         this.ctx = new ZinggSparkContext();
-        this.ctx.setSession(spark);
+        this.ctx.setSession(sparkSession);
         this.ctx.setUtils();
-        init(spark);
-        //setupArgs();
+        init(sparkSession);
     }
 
     @Override
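With @ExtendWith(TestSparkBase.class), the test class no longer builds its own SparkSession; JUnit 5 resolves the constructor's SparkSession parameter through the extension. The actual TestSparkBase is only referenced here by its import, so the following is a hypothetical sketch of how such an extension is commonly written as a ParameterResolver, not the project's real implementation.

    import org.apache.spark.sql.SparkSession;
    import org.junit.jupiter.api.extension.ExtensionContext;
    import org.junit.jupiter.api.extension.ParameterContext;
    import org.junit.jupiter.api.extension.ParameterResolver;

    // Hypothetical stand-in for an extension like TestSparkBase: it injects a
    // shared local SparkSession into any SparkSession-typed test parameter.
    public class SparkSessionResolver implements ParameterResolver {

        // One session shared by every test class that uses this extension.
        private static final SparkSession SPARK = SparkSession.builder()
                .master("local[*]")
                .appName("ZinggJunit")
                .getOrCreate();

        @Override
        public boolean supportsParameter(ParameterContext parameterContext,
                                         ExtensionContext extensionContext) {
            return parameterContext.getParameter().getType() == SparkSession.class;
        }

        @Override
        public Object resolveParameter(ParameterContext parameterContext,
                                       ExtensionContext extensionContext) {
            return SPARK;
        }
    }

A resolver of this shape lets several executor tests share one session instead of each constructor calling getOrCreate() and setting its own checkpoint directory, which is what the removed constructor body used to do.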

spark/core/src/test/java/zingg/spark/core/session/SparkSessionProvider.java

+9 -2

@@ -2,6 +2,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import zingg.common.client.Arguments;
@@ -28,9 +29,15 @@ private void initializeSession() {
             .appName("ZinggJunit")
             .config("spark.debug.maxToStringFields", 100)
             .getOrCreate();
-        javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
+        SparkContext sparkContext = sparkSession.sparkContext();
+        if (sparkContext.getCheckpointDir().isEmpty()) {
+            sparkContext.setCheckpointDir("/tmp/checkpoint");
+        }
+        javaSparkContext = new JavaSparkContext(sparkContext);
         JavaSparkContext.jarOfClass(IZingg.class);
-        javaSparkContext.setCheckpointDir("/tmp/checkpoint");
+        if (!javaSparkContext.getCheckpointDir().isPresent()) {
+            javaSparkContext.setCheckpointDir(String.valueOf(sparkContext.getCheckpointDir()));
+        }
         args = new Arguments();
         zinggSparkContext = new ZinggSparkContext();
         zinggSparkContext.init(sparkSession);
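The test session provider gets the same guard as SparkClient: it falls back to /tmp/checkpoint only when nothing has been configured. A short hypothetical usage sketch of what the guard now permits; the checkpoint path below is illustrative and not taken from the commit.

    // A caller that chooses its own checkpoint location before the guarded
    // initialization runs; the new checks leave this setting in place instead
    // of overwriting it with /tmp/checkpoint.
    SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .appName("ZinggJunit")
            .getOrCreate();
    spark.sparkContext().setCheckpointDir("hdfs:///tmp/zingg-checkpoints"); // illustrative path

    // Later, sparkContext.getCheckpointDir() is non-empty, so the fallback
    // to /tmp/checkpoint is skipped.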
