From ed18d70e39b8d84ab3516a9b3654086a2cfc6873 Mon Sep 17 00:00:00 2001
From: yuanoOo
Date: Wed, 22 Jan 2025 17:33:53 +0800
Subject: [PATCH] BugFix: Fix direct-load write failure when there are empty spark partitions. (#14)

---
 .../spark/writer/DirectLoadWriter.scala       |  3 +
 .../spark/OceanBaseMySQLConnectorITCase.scala | 48 +++++++++++++++++++
 2 files changed, 51 insertions(+)

diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala
index 059724e..30b9305 100644
--- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/writer/DirectLoadWriter.scala
@@ -61,6 +61,9 @@ class DirectLoadWriter(oceanBaseConfig: OceanBaseConfig) extends Serializable {
   }
 
   private def flush(buffer: ArrayBuffer[Row], directLoader: DirectLoader): Unit = {
+    // Nothing buffered (e.g. an empty Spark partition): skip building a bucket.
+    if (buffer.isEmpty) return
+
     val bucket = new ObDirectLoadBucket()
     buffer.foreach(
       row => {
diff --git a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala
index d32b41e..8c22eaf 100644
--- a/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala
+++ b/spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala
@@ -137,6 +137,54 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
     dropTables("products")
   }
 
+  /**
+   * Regression test: direct-load must succeed when some Spark partitions are empty.
+   * A single row is spread over three partitions, so some of them should hold no rows.
+   */
+  @Test
+  def testDirectLoadWithEmptySparkPartition(): Unit = {
+    initialize("sql/mysql/products.sql")
+
+    val session = SparkSession.builder().master("local[*]").getOrCreate()
+
+    session.sql(s"""
+                   |CREATE TEMPORARY VIEW test_sink
+                   |USING oceanbase
+                   |OPTIONS(
+                   | "url"= "$getJdbcUrlWithoutDB",
+                   | "rpc-port" = "$getRpcPort",
+                   | "schema-name"="$getSchemaName",
+                   | "table-name"="products",
+                   | "username"="$getUsername",
+                   | "password"="$getPassword",
+                   | "direct-load.enabled"=true,
+                   | "direct-load.host"="$getHost",
+                   | "direct-load.rpc-port"=$getRpcPort
+                   |);
+                   |""".stripMargin)
+
+    // The REPARTITION(3) hint in the SELECT is what spreads the single row over three
+    // partitions. The INSERT runs as part of this sql() call, so no further
+    // transformation is chained onto the returned DataFrame.
+    session.sql("""
+                  |INSERT INTO test_sink
+                  |SELECT /*+ REPARTITION(3) */ 101, 'scooter', 'Small 2-wheel scooter', 3.14;
+                  |""".stripMargin)
+
+    val expected: util.List[String] = util.Arrays.asList(
+      "101,scooter,Small 2-wheel scooter,3.1400000000"
+    )
+    session.stop()
+
+    waitingAndAssertTableCount("products", expected.size)
+
+    val actual: util.List[String] = queryTable("products")
+
+    assertEqualsInAnyOrder(expected, actual)
+
+    dropTables("products")
+  }
+
   @Test
   def testDataFrameDirectLoadWrite(): Unit = {
     initialize("sql/mysql/products.sql")