From 1f912900e60f2173bf393888eda8ebda6b81cc6d Mon Sep 17 00:00:00 2001
From: linjc13 <linjc13@chinatelecom.cn>
Date: Wed, 8 Jan 2025 10:27:13 +0800
Subject: [PATCH] [FLINK-36793][cdc-source-connectors] Fix the chunk splitter
 logic of Oracle CDC incremental snapshot, which caused the snapshot splits
 to be too large

---
 .../splitter/JdbcSourceChunkSplitter.java     | 15 ++--
 .../splitter/OracleChunkSplitter.java         | 50 +++++++++--
 .../splitter/OracleChunkSplitterTest.java     | 85 +++++++++++++++++++
 .../dialect/SqlServerChunkSplitter.java       |  6 +-
 4 files changed, 141 insertions(+), 15 deletions(-)
 create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 280ed8f4684..29cd900e5ff 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -261,12 +261,16 @@ protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
     }
 
     /** ChunkEnd less than or equal to max. */
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndLeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+            throws SQLException {
         return ObjectUtils.compare(chunkEnd, max) <= 0;
     }
 
     /** ChunkEnd greater than or equal to max. */
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndGeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+            throws SQLException {
         return ObjectUtils.compare(chunkEnd, max) >= 0;
     }
 
@@ -389,7 +393,8 @@ private SnapshotSplit splitOneUnevenlySizedChunk(TableId tableId) throws SQLExce
                         chunkSize);
         // may sleep a while to avoid DDOS on MySQL server
         maySleep(nextChunkId, tableId);
-        if (chunkEnd != null && isChunkEndLeMax(chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
+        if (chunkEnd != null
+                && isChunkEndLeMax(jdbcConnection, chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
             nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
             return createSnapshotSplit(tableId, nextChunkId++, splitType, chunkStartVal, chunkEnd);
         } else {
@@ -489,7 +494,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
         Object chunkStart = null;
         Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
         int count = 0;
-        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) {
+        while (chunkEnd != null && isChunkEndLeMax(jdbc, chunkEnd, max, splitColumn)) {
             // we start from [null, min + chunk_size) and avoid [null, min)
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
             // may sleep a while to avoid DDOS on PostgreSQL server
@@ -518,7 +523,7 @@ private Object nextChunkEnd(
             // should query the next one larger than chunkEnd
             chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
         }
-        if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
+        if (isChunkEndGeMax(jdbc, chunkEnd, max, splitColumn)) {
             return null;
         } else {
             return chunkEnd;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index 0e69cdc5266..22f537f51ca 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -106,12 +106,29 @@ protected boolean isEvenlySplitColumn(Column splitColumn) {
 
     /** ChunkEnd less than or equal to max. */
     @Override
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndLeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+            throws SQLException {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
-            chunkEndMaxCompare =
-                    ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes())
-                            <= 0;
+            // Delegate the ordering to Oracle: after UNION + ORDER BY ASC the first row is the
+            // smaller of the two ROWIDs (UNION also collapses equal values into one row).
+            String query =
+                    "SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS ASC";
+            return jdbc.prepareQueryAndMap(
+                    query,
+                    ps -> {
+                        ps.setObject(1, chunkEnd.toString());
+                        ps.setObject(2, max.toString());
+                    },
+                    rs -> {
+                        if (!rs.next()) {
+                            throw new IllegalStateException(
+                                    "Failed to compare ROWIDs " + chunkEnd + " and " + max);
+                        }
+                        return rs.getObject(1).toString().equals(chunkEnd.toString())
+                                || chunkEnd.toString().equals(max.toString());
+                    });
         } else {
             chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0;
         }
@@ -120,12 +137,29 @@ protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColum
 
     /** ChunkEnd greater than or equal to max. */
     @Override
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndGeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+            throws SQLException {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
-            chunkEndMaxCompare =
-                    ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes())
-                            >= 0;
+            // Delegate the ordering to Oracle: after UNION + ORDER BY DESC the first row is the
+            // larger of the two ROWIDs (UNION also collapses equal values into one row).
+            String query =
+                    "SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS DESC";
+            return jdbc.prepareQueryAndMap(
+                    query,
+                    ps -> {
+                        ps.setObject(1, chunkEnd.toString());
+                        ps.setObject(2, max.toString());
+                    },
+                    rs -> {
+                        if (!rs.next()) {
+                            throw new IllegalStateException(
+                                    "Failed to compare ROWIDs " + chunkEnd + " and " + max);
+                        }
+                        return rs.getObject(1).toString().equals(chunkEnd.toString())
+                                || chunkEnd.toString().equals(max.toString());
+                    });
         } else {
             chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) >= 0;
         }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
new file mode 100644
index 00000000000..84f69ec40af
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;
+
+import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
+import org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase;
+import org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
+
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import oracle.sql.ROWID;
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** IT tests for the ROWID comparisons performed by {@link OracleChunkSplitter}. */
+class OracleChunkSplitterTest extends OracleSourceTestBase {
+
+    @Test
+    void testIsChunkEndGeMax_Rowid_Case() throws SQLException {
+        // Oracle orders the first ROWID before the second one, so chunkEnd >= max must be
+        // false here even though a byte-wise comparison of the text says otherwise.
+        rowidGeMaxCheck("AAAzIdACKAAABWCAAA", "AAAzIdAC/AACWIPAAB", false);
+    }
+
+    @Test
+    void testIsChunkEndLeMax_Rowid_Case() throws SQLException {
+        // Same pair as in the Ge case: chunkEnd sorts first in Oracle, so chunkEnd <= max
+        // must be true.
+        rowidLeMaxCheck("AAAzIdACKAAABWCAAA", "AAAzIdAC/AACWIPAAB", true);
+    }
+
+    private void rowidGeMaxCheck(String chunkEndStr, String maxStr, boolean expected)
+            throws SQLException {
+        try (JdbcConnection jdbc = createConnection()) {
+            ROWID chunkEnd = new ROWID(chunkEndStr);
+            ROWID max = new ROWID(maxStr);
+            assertTrue(createSplitter().isChunkEndGeMax(jdbc, chunkEnd, max, null) == expected);
+        }
+    }
+
+    private void rowidLeMaxCheck(String chunkEndStr, String maxStr, boolean expected)
+            throws SQLException {
+        try (JdbcConnection jdbc = createConnection()) {
+            ROWID chunkEnd = new ROWID(chunkEndStr);
+            ROWID max = new ROWID(maxStr);
+            assertTrue(createSplitter().isChunkEndLeMax(jdbc, chunkEnd, max, null) == expected);
+        }
+    }
+
+    /** Opens a connection to the Oracle test container; the caller must close it. */
+    private JdbcConnection createConnection() throws SQLException {
+        JdbcConfiguration jdbcConfig =
+                JdbcConfiguration.create()
+                        .with(JdbcConfiguration.HOSTNAME, ORACLE_CONTAINER.getHost())
+                        .with(JdbcConfiguration.PORT, ORACLE_CONTAINER.getOraclePort())
+                        .with(JdbcConfiguration.USER, ORACLE_CONTAINER.getUsername())
+                        .with(JdbcConfiguration.PASSWORD, ORACLE_CONTAINER.getPassword())
+                        .with(JdbcConfiguration.DATABASE, ORACLE_CONTAINER.getDatabaseName())
+                        .build();
+        return OracleConnectionUtils.createOracleConnection(jdbcConfig);
+    }
+
+    /** Builds a splitter without configuration; the ROWID checks only need the connection. */
+    private OracleChunkSplitter createSplitter() {
+        return new OracleChunkSplitter(null, null, new ChunkSplitterState(null, null, null));
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index 1576c052c76..7b74c754018 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -69,12 +69,14 @@ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
         return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndLeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn) {
         return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
     }
 
     /** ChunkEnd greater than or equal to max. */
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndGeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn) {
         return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
     }
 }