Skip to content

Commit

Permalink
BugFix: Fix direct-load write failure when there are empty spark part…
Browse files Browse the repository at this point in the history
…itions. (#14)
  • Loading branch information
yuanoOo authored Jan 22, 2025
1 parent 64900b0 commit ed18d70
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class DirectLoadWriter(oceanBaseConfig: OceanBaseConfig) extends Serializable {
}

private def flush(buffer: ArrayBuffer[Row], directLoader: DirectLoader): Unit = {
if (buffer.isEmpty) return

val bucket = new ObDirectLoadBucket()
buffer.foreach(
row => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,49 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
dropTables("products")
}

@Test
def testDirectLoadWithEmptySparkPartition(): Unit = {
initialize("sql/mysql/products.sql")

val session = SparkSession.builder().master("local[*]").getOrCreate()

session.sql(s"""
|CREATE TEMPORARY VIEW test_sink
|USING oceanbase
|OPTIONS(
| "url"= "$getJdbcUrlWithoutDB",
| "rpc-port" = "$getRpcPort",
| "schema-name"="$getSchemaName",
| "table-name"="products",
| "username"="$getUsername",
| "password"="$getPassword",
| "direct-load.enabled"=true,
| "direct-load.host"="$getHost",
| "direct-load.rpc-port"=$getRpcPort
|);
|""".stripMargin)

session
.sql("""
|INSERT INTO test_sink
|SELECT /*+ REPARTITION(3) */ 101, 'scooter', 'Small 2-wheel scooter', 3.14;
|""".stripMargin)
.repartition(3)

val expected: util.List[String] = util.Arrays.asList(
"101,scooter,Small 2-wheel scooter,3.1400000000"
)
session.stop()

waitingAndAssertTableCount("products", expected.size)

val actual: util.List[String] = queryTable("products")

assertEqualsInAnyOrder(expected, actual)

dropTables("products")
}

@Test
def testDataFrameDirectLoadWrite(): Unit = {
initialize("sql/mysql/products.sql")
Expand Down

0 comments on commit ed18d70

Please sign in to comment.