Commit f5a9f7c

[spark] Disable v2 write in v1 append location unit tests

1 parent 0446894 commit f5a9f7c

File tree

8 files changed: +464 -447 lines

.github/workflows/utitcase-spark-3.x.yml

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ jobs:
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
           test_modules=""
-          for suffix in ut 3.5 3.4 3.3 3.2; do
+          for suffix in ut 3.5 3.4; do
             test_modules+="org.apache.paimon:paimon-spark-${suffix}_${{ matrix.scala_version }},"
           done
           test_modules="${test_modules%,}"
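
A minimal sketch in plain Scala (the Scala version value is assumed) of the module list the trimmed loop now produces; mkString plays the role of the bash "${test_modules%,}" trailing-comma trim:

object TestModuleList extends App {
  val scalaVersion = "2.12" // assumed value of ${{ matrix.scala_version }}
  // Only the ut, 3.5 and 3.4 suffixes remain after this change.
  val modules = Seq("ut", "3.5", "3.4")
    .map(suffix => s"org.apache.paimon:paimon-spark-${suffix}_$scalaVersion")
    .mkString(",") // no trailing comma, so no extra trim step is needed
  println(modules)
}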

paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java

Lines changed: 4 additions & 1 deletion
@@ -21,6 +21,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -69,7 +71,8 @@ public class ParquetWriter<T> implements Closeable {
             int maxPaddingSize,
             ParquetProperties encodingProps)
             throws IOException {
-        WriteSupport.WriteContext writeContext = writeSupport.init(conf);
+        HadoopParquetConfiguration parquetConf = new HadoopParquetConfiguration(conf);
+        WriteSupport.WriteContext writeContext = writeSupport.init(parquetConf);
         MessageType schema = writeContext.getSchema();
 
         ParquetFileWriter fileWriter =
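
The change above adapts the raw Hadoop Configuration to Parquet's ParquetConfiguration abstraction before initializing the write support. A minimal sketch of the same wrapping in Scala (assuming a parquet-java version that ships org.apache.parquet.conf; the object name and config key are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.conf.{HadoopParquetConfiguration, ParquetConfiguration}

object ParquetConfWrapping extends App {
  // Raw Hadoop configuration, as the old writeSupport.init(conf) call consumed.
  val hadoopConf = new Configuration()
  hadoopConf.set("parquet.page.size", "1048576")

  // Wrap it so it satisfies the ParquetConfiguration-based init overload used above.
  val parquetConf: ParquetConfiguration = new HadoopParquetConfiguration(hadoopConf)
  println(parquetConf.get("parquet.page.size")) // 1048576
}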

paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala

Lines changed: 3 additions & 1 deletion
@@ -29,7 +29,9 @@ import java.sql.Date
 class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
 
   override protected def sparkConf: SparkConf = {
-    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+    super.sparkConf
+      .set("spark.sql.catalog.paimon.cache-enabled", "false")
+      .set("spark.paimon.write.use-v2-write", "false")
   }
 
   import testImplicits._
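
The same switch the test pins in its sparkConf override can also be set for an ordinary session at build time; a minimal sketch (local master, catalog class and warehouse path are assumed values):

import org.apache.spark.sql.SparkSession

object SessionWithV1Write extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
    .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon")
    // Same key as in the test above: keep the whole session on the v1 write path.
    .config("spark.paimon.write.use-v2-write", "false")
    .getOrCreate()
}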

paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala

Lines changed: 102 additions & 100 deletions
@@ -29,107 +29,109 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
   import testImplicits._
 
   test("Paimon Procedure: create and delete tag") {
-    failAfter(streamingTimeout) {
-      withTempDir {
-        checkpointDir =>
-          // define a pk table and test `forEachBatch` api
-          spark.sql(s"""
-                       |CREATE TABLE T (a INT, b STRING)
-                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
-                       |""".stripMargin)
-          val location = loadTable("T").location().toString
-
-          val inputData = MemoryStream[(Int, String)]
-          val stream = inputData
-            .toDS()
-            .toDF("a", "b")
-            .writeStream
-            .option("checkpointLocation", checkpointDir.getCanonicalPath)
-            .foreachBatch {
-              (batch: Dataset[Row], _: Long) =>
-                batch.write.format("paimon").mode("append").save(location)
+    withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) {
+      failAfter(streamingTimeout) {
+        withTempDir {
+          checkpointDir =>
+            // define a pk table and test `forEachBatch` api
+            spark.sql(s"""
+                         |CREATE TABLE T (a INT, b STRING)
+                         |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                         |""".stripMargin)
+            val location = loadTable("T").location().toString
+
+            val inputData = MemoryStream[(Int, String)]
+            val stream = inputData
+              .toDS()
+              .toDF("a", "b")
+              .writeStream
+              .option("checkpointLocation", checkpointDir.getCanonicalPath)
+              .foreachBatch {
+                (batch: Dataset[Row], _: Long) =>
+                  batch.write.format("paimon").mode("append").save(location)
+              }
+              .start()
+
+            val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+            try {
+              // snapshot-1
+              inputData.addData((1, "a"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Row(1, "a") :: Nil)
+
+              // snapshot-2
+              inputData.addData((2, "b"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+              // snapshot-3
+              inputData.addData((2, "b2"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+              checkAnswer(
+                spark.sql(
+                  "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"),
+                Row(true) :: Nil)
+              checkAnswer(
+                spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+                Row("test_tag") :: Nil)
+              // test rename_tag
+              checkAnswer(
+                spark.sql(
+                  "CALL paimon.sys.rename_tag(table => 'test.T', tag => 'test_tag', target_tag => 'test_tag_1')"),
+                Row(true) :: Nil)
+              checkAnswer(
+                spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+                Row("test_tag_1") :: Nil)
+              checkAnswer(
+                spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag_1')"),
+                Row(true) :: Nil)
+              checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
+              checkAnswer(
+                spark.sql(
+                  "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
+                Row(true) :: Nil)
+              checkAnswer(
+                spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+                Row("test_latestSnapshot_tag") :: Nil)
+              checkAnswer(
+                spark.sql(
+                  "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
+                Row(true) :: Nil)
+              checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
+
+              // snapshot-4
+              inputData.addData((2, "c1"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Nil)
+
+              checkAnswer(
+                spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's4')"),
+                Row(true) :: Nil)
+
+              // snapshot-5
+              inputData.addData((3, "c2"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Row(3, "c2") :: Nil)
+
+              checkAnswer(
+                spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's5')"),
+                Row(true) :: Nil)
+
+              checkAnswer(
+                spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+                Row("s4") :: Row("s5") :: Nil)
+
+              checkAnswer(
+                spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 's4,s5')"),
+                Row(true) :: Nil)
+
+              checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
+            } finally {
+              stream.stop()
             }
-            .start()
-
-          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
-
-          try {
-            // snapshot-1
-            inputData.addData((1, "a"))
-            stream.processAllAvailable()
-            checkAnswer(query(), Row(1, "a") :: Nil)
-
-            // snapshot-2
-            inputData.addData((2, "b"))
-            stream.processAllAvailable()
-            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
-
-            // snapshot-3
-            inputData.addData((2, "b2"))
-            stream.processAllAvailable()
-            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"),
-              Row(true) :: Nil)
-            checkAnswer(
-              spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
-              Row("test_tag") :: Nil)
-            // test rename_tag
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.rename_tag(table => 'test.T', tag => 'test_tag', target_tag => 'test_tag_1')"),
-              Row(true) :: Nil)
-            checkAnswer(
-              spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
-              Row("test_tag_1") :: Nil)
-            checkAnswer(
-              spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag_1')"),
-              Row(true) :: Nil)
-            checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
-              Row(true) :: Nil)
-            checkAnswer(
-              spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
-              Row("test_latestSnapshot_tag") :: Nil)
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
-              Row(true) :: Nil)
-            checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
-
-            // snapshot-4
-            inputData.addData((2, "c1"))
-            stream.processAllAvailable()
-            checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Nil)
-
-            checkAnswer(
-              spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's4')"),
-              Row(true) :: Nil)
-
-            // snapshot-5
-            inputData.addData((3, "c2"))
-            stream.processAllAvailable()
-            checkAnswer(query(), Row(1, "a") :: Row(2, "c1") :: Row(3, "c2") :: Nil)
-
-            checkAnswer(
-              spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 's5')"),
-              Row(true) :: Nil)
-
-            checkAnswer(
-              spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
-              Row("s4") :: Row("s5") :: Nil)
-
-            checkAnswer(
-              spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 's4,s5')"),
-              Row(true) :: Nil)
-
-            checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
-          } finally {
-            stream.stop()
-          }
+        }
       }
     }
   }
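
The withSparkSQLConf wrapper the test body now sits in comes from the shared test base and is not part of this diff; a minimal sketch of what a set-run-restore helper of that shape typically looks like (the name is reused from the diff, the SparkSession wiring is an assumption):

import org.apache.spark.sql.SparkSession

object SQLConfTestHelper {
  // Set the given SQL confs, run the body, then restore whatever was there before.
  def withSparkSQLConf(pairs: (String, String)*)(body: => Unit)(implicit spark: SparkSession): Unit = {
    val conf = spark.conf
    val previous = pairs.map { case (key, _) => key -> conf.getOption(key) }
    pairs.foreach { case (key, value) => conf.set(key, value) }
    try body
    finally previous.foreach {
      case (key, Some(value)) => conf.set(key, value)
      case (key, None) => conf.unset(key)
    }
  }
}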

paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala

Lines changed: 57 additions & 55 deletions
@@ -29,68 +29,70 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
   import testImplicits._
 
   test("Paimon Procedure: rollback to snapshot and tag") {
-    failAfter(streamingTimeout) {
-      withTempDir {
-        checkpointDir =>
-          // define a pk table and test `forEachBatch` api
-          spark.sql(s"""
-                       |CREATE TABLE T (a INT, b STRING)
-                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
-                       |""".stripMargin)
-          val location = loadTable("T").location().toString
+    withSparkSQLConf(("spark.paimon.write.use-v2-write", "false")) {
+      failAfter(streamingTimeout) {
+        withTempDir {
+          checkpointDir =>
+            // define a pk table and test `forEachBatch` api
+            spark.sql(s"""
+                         |CREATE TABLE T (a INT, b STRING)
+                         |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                         |""".stripMargin)
+            val location = loadTable("T").location().toString
 
-          val inputData = MemoryStream[(Int, String)]
-          val stream = inputData
-            .toDS()
-            .toDF("a", "b")
-            .writeStream
-            .option("checkpointLocation", checkpointDir.getCanonicalPath)
-            .foreachBatch {
-              (batch: Dataset[Row], _: Long) =>
-                batch.write.format("paimon").mode("append").save(location)
-            }
-            .start()
+            val inputData = MemoryStream[(Int, String)]
+            val stream = inputData
+              .toDS()
+              .toDF("a", "b")
+              .writeStream
+              .option("checkpointLocation", checkpointDir.getCanonicalPath)
+              .foreachBatch {
+                (batch: Dataset[Row], _: Long) =>
+                  batch.write.format("paimon").mode("append").save(location)
+              }
+              .start()
 
-          val table = loadTable("T")
-          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+            val table = loadTable("T")
+            val query = () => spark.sql("SELECT * FROM T ORDER BY a")
 
-          try {
-            // snapshot-1
-            inputData.addData((1, "a"))
-            stream.processAllAvailable()
-            checkAnswer(query(), Row(1, "a") :: Nil)
+            try {
+              // snapshot-1
+              inputData.addData((1, "a"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Row(1, "a") :: Nil)
 
-            checkAnswer(
-              spark.sql(
-                "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"),
-              Row(true) :: Nil)
+              checkAnswer(
+                spark.sql(
+                  "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 1)"),
+                Row(true) :: Nil)
 
-            // snapshot-2
-            inputData.addData((2, "b"))
-            stream.processAllAvailable()
-            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+              // snapshot-2
+              inputData.addData((2, "b"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
 
-            // snapshot-3
-            inputData.addData((2, "b2"))
-            stream.processAllAvailable()
-            checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
-            assertThrows[RuntimeException] {
-              spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')")
-            }
-            // rollback to snapshot
-            checkAnswer(
-              spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"),
-              Row(table.latestSnapshot().get().id, 2) :: Nil)
-            checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+              // snapshot-3
+              inputData.addData((2, "b2"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+              assertThrows[RuntimeException] {
+                spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')")
+              }
+              // rollback to snapshot
+              checkAnswer(
+                spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"),
+                Row(table.latestSnapshot().get().id, 2) :: Nil)
+              checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
 
-            // rollback to tag
-            checkAnswer(
-              spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"),
-              Row(table.latestSnapshot().get().id, 1) :: Nil)
-            checkAnswer(query(), Row(1, "a") :: Nil)
-          } finally {
-            stream.stop()
-          }
+              // rollback to tag
+              checkAnswer(
+                spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => 'test_tag')"),
+                Row(table.latestSnapshot().get().id, 1) :: Nil)
+              checkAnswer(query(), Row(1, "a") :: Nil)
+            } finally {
+              stream.stop()
+            }
+        }
       }
     }
   }
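
Both procedure tests drive the same write path: a MemoryStream fed through foreachBatch, with every micro-batch appended to the table by location — the path-based append the commit title refers to as a v1 append location test. A minimal standalone sketch of that pattern (session, location and data are assumed values):

import org.apache.spark.sql.{Dataset, Row, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object ForeachBatchAppend extends App {
  val spark = SparkSession.builder().master("local[2]").getOrCreate()
  import spark.implicits._
  implicit val sqlCtx: SQLContext = spark.sqlContext // MemoryStream needs an SQLContext in scope

  val location = "file:/tmp/paimon/test.db/T" // assumed table location
  val input = MemoryStream[(Int, String)]
  val stream = input
    .toDS()
    .toDF("a", "b")
    .writeStream
    .foreachBatch {
      (batch: Dataset[Row], _: Long) =>
        // Append each micro-batch by path, i.e. the v1 write the tests pin to.
        batch.write.format("paimon").mode("append").save(location)
    }
    .start()

  input.addData((1, "a"))
  stream.processAllAvailable()
  stream.stop()
}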

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ public class SparkConnectorOptions {
     public static final ConfigOption<Boolean> USE_V2_WRITE =
             key("write.use-v2-write")
                     .booleanType()
-                    .defaultValue(false)
+                    .defaultValue(true)
                     .withDescription(
                             "If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.");
 
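
With the default flipped to true, code that still needs the v1 path has to opt out explicitly, which is what the adjusted tests above do; a minimal sketch of the same opt-out on an existing session (local session assumed, the SET statement mirrors the spark.paimon.* conf key the tests set):

import org.apache.spark.sql.SparkSession

object V2WriteOptOut extends App {
  val spark = SparkSession.builder().master("local[2]").getOrCreate()

  // Runtime opt-out back to the v1 write path for this session.
  spark.sql("SET spark.paimon.write.use-v2-write=false")
  println(spark.conf.get("spark.paimon.write.use-v2-write")) // false
}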

0 commit comments