Skip to content

Commit 7d879d8

Browse files
committed
added check before setting checkpoint directory
1 parent ecfc4e7 commit 7d879d8

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

spark/core/src/main/java/zingg/spark/core/executor/ZinggSparkContext.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.apache.commons.logging.Log;
44
import org.apache.commons.logging.LogFactory;
5+
import org.apache.spark.SparkContext;
56
import org.apache.spark.api.java.JavaSparkContext;
67
import org.apache.spark.sql.Column;
78
import org.apache.spark.sql.Dataset;
@@ -71,12 +72,16 @@ public void init(IZinggLicense license)
7172
zSession = new ZSparkSession(spark, license);
7273
}
7374
if (ctx==null) {
74-
ctx = JavaSparkContext.fromSparkContext(zSession.getSession().sparkContext());
75+
SparkContext sparkContext = zSession.getSession().sparkContext();
76+
if (sparkContext.getCheckpointDir().isEmpty()) {
77+
sparkContext.setCheckpointDir("/tmp/checkpoint");
78+
}
79+
ctx = JavaSparkContext.fromSparkContext(sparkContext);
7580
JavaSparkContext.jarOfClass(IZingg.class);
7681
LOG.debug("Context " + ctx.toString());
7782
//initHashFns();
7883
if (!ctx.getCheckpointDir().isPresent()) {
79-
ctx.setCheckpointDir("/tmp/checkpoint");
84+
ctx.setCheckpointDir(sparkContext.getCheckpointDir().get());
8085
}
8186
setUtils();
8287
}

spark/core/src/test/java/zingg/spark/core/executor/ZinggSparkTester.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import org.apache.commons.logging.Log;
88
import org.apache.commons.logging.LogFactory;
9+
import org.apache.spark.SparkContext;
910
import org.apache.spark.api.java.JavaSparkContext;
1011
import org.apache.spark.sql.Dataset;
1112
import org.apache.spark.sql.Row;
@@ -49,15 +50,20 @@ public static void setup() {
4950
.master("local[*]")
5051
.appName("Zingg" + "Junit")
5152
.getOrCreate();
52-
ctx = new JavaSparkContext(spark.sparkContext());
53+
SparkContext sparkContext = spark.sparkContext();
54+
if (sparkContext.getCheckpointDir().isEmpty()) {
55+
sparkContext.setCheckpointDir("/tmp/checkpoint");
56+
}
57+
ctx = new JavaSparkContext(sparkContext);
5358
JavaSparkContext.jarOfClass(IZingg.class);
5459
args = new Arguments();
5560
zsCTX = new ZinggSparkContext();
5661
zsCTX.ctx = ctx;
5762
zSession = new ZSparkSession(spark, null);
5863
zsCTX.zSession = zSession;
59-
60-
ctx.setCheckpointDir("/tmp/checkpoint");
64+
if (!ctx.getCheckpointDir().isPresent()) {
65+
ctx.setCheckpointDir(sparkContext.getCheckpointDir().get());
66+
}
6167
zsCTX.setPipeUtil(new SparkPipeUtil(zSession));
6268
zsCTX.setDSUtil(new SparkDSUtil(zSession));
6369
zsCTX.setHashUtil(new SparkHashUtil(zSession));

0 commit comments

Comments
 (0)