diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 280ed8f4684..29cd900e5ff 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -261,12 +261,16 @@ protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
     }
 
     /** ChunkEnd less than or equal to max. */
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndLeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+            throws SQLException {
         return ObjectUtils.compare(chunkEnd, max) <= 0;
     }
 
     /** ChunkEnd greater than or equal to max. */
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndGeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+            throws SQLException {
         return ObjectUtils.compare(chunkEnd, max) >= 0;
     }
 
@@ -389,7 +393,8 @@ private SnapshotSplit splitOneUnevenlySizedChunk(TableId tableId) throws SQLExce
                         chunkSize);
         // may sleep a while to avoid DDOS on MySQL server
         maySleep(nextChunkId, tableId);
-        if (chunkEnd != null && isChunkEndLeMax(chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
+        if (chunkEnd != null
+                && isChunkEndLeMax(jdbcConnection, chunkEnd, minMaxOfSplitColumn[1], splitColumn)) {
             nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
             return createSnapshotSplit(tableId, nextChunkId++, splitType, chunkStartVal, chunkEnd);
         } else {
@@ -489,7 +494,7 @@ private List splitUnevenlySizedChunks(
         Object chunkStart = null;
         Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
         int count = 0;
-        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) {
+        while (chunkEnd != null && isChunkEndLeMax(jdbc, chunkEnd, max, splitColumn)) {
             // we start from [null, min + chunk_size) and avoid [null, min)
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
             // may sleep a while to avoid DDOS on PostgreSQL server
@@ -518,7 +523,7 @@ private Object nextChunkEnd(
             // should query the next one larger than chunkEnd
             chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
         }
-        if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
+        if (isChunkEndGeMax(jdbc, chunkEnd, max, splitColumn)) {
             return null;
         } else {
             return chunkEnd;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index 0e69cdc5266..22f537f51ca 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -106,12 +106,31 @@ protected boolean isEvenlySplitColumn(Column splitColumn) {
 
     /** ChunkEnd less than or equal to max. */
     @Override
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndLeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+            throws SQLException {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
-            chunkEndMaxCompare =
-                    ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes())
-                            <= 0;
+            // Let Oracle order the two ROWIDs instead of comparing their bytes locally.
+            String query =
+                    "SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION "
+                            + "SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS ASC";
+            return jdbc.prepareQueryAndMap(
+                    query,
+                    ps -> {
+                        ps.setObject(1, chunkEnd.toString());
+                        ps.setObject(2, max.toString());
+                    },
+                    rs -> {
+                        if (rs.next()) {
+                            // ASC: the first row is the smaller of the two ROWIDs.
+                            Object obj = rs.getObject(1);
+                            return obj.toString().equals(chunkEnd.toString())
+                                    || chunkEnd.toString().equals(max.toString());
+                        } else {
+                            throw new RuntimeException("compare rowid error");
+                        }
+                    });
         } else {
             chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0;
         }
@@ -120,12 +139,31 @@ protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColum
 
     /** ChunkEnd greater than or equal to max. */
     @Override
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+    protected boolean isChunkEndGeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn)
+            throws SQLException {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
-            chunkEndMaxCompare =
-                    ROWID.compareBytes(((ROWID) chunkEnd).getBytes(), ((ROWID) max).getBytes())
-                            >= 0;
+            // Let Oracle order the two ROWIDs instead of comparing their bytes locally.
+            String query =
+                    "SELECT CHARTOROWID(?) ROWIDS FROM DUAL UNION "
+                            + "SELECT CHARTOROWID(?) ROWIDS FROM DUAL ORDER BY ROWIDS DESC";
+            return jdbc.prepareQueryAndMap(
+                    query,
+                    ps -> {
+                        ps.setObject(1, chunkEnd.toString());
+                        ps.setObject(2, max.toString());
+                    },
+                    rs -> {
+                        if (rs.next()) {
+                            // DESC: the first row is the larger of the two ROWIDs.
+                            Object obj = rs.getObject(1);
+                            return obj.toString().equals(chunkEnd.toString())
+                                    || chunkEnd.toString().equals(max.toString());
+                        } else {
+                            throw new RuntimeException("compare rowid error");
+                        }
+                    });
         } else {
             chunkEndMaxCompare = chunkEnd != null && ObjectUtils.compare(chunkEnd, max) >= 0;
         }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
new file mode 100644
index 00000000000..84f69ec40af
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitterTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oracle.source.assigner.splitter;
+
+import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
+import org.apache.flink.cdc.connectors.oracle.source.OracleSourceTestBase;
+import org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
+
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import oracle.sql.ROWID;
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** IT tests to cover tables chunk splitter process. */
+class OracleChunkSplitterTest extends OracleSourceTestBase {
+
+    @Test
+    void testIsChunkEndGeMax_Rowid_Case() throws SQLException {
+        String a = "AAAzIdACKAAABWCAAA";
+        String b = "AAAzIdAC/AACWIPAAB";
+        // a and b are distinct and a <= b is expected below, so a >= b must be false.
+        rowidGeMaxCheck(a, b, false);
+    }
+
+    @Test
+    void testIsChunkEndLeMax_Rowid_Case() throws SQLException {
+        String a = "AAAzIdACKAAABWCAAA";
+        String b = "AAAzIdAC/AACWIPAAB";
+        rowidLeMaxCheck(a, b, true);
+    }
+
+    private void rowidGeMaxCheck(String chunkEndStr, String maxStr, boolean expected)
+            throws SQLException {
+        ROWID chunkEnd = new ROWID(chunkEndStr);
+        ROWID max = new ROWID(maxStr);
+        // try-with-resources so the test does not leak the connection.
+        try (JdbcConnection jdbc = openConnection()) {
+            assertEquals(expected, newSplitter().isChunkEndGeMax(jdbc, chunkEnd, max, null));
+        }
+    }
+
+    private void rowidLeMaxCheck(String chunkEndStr, String maxStr, boolean expected)
+            throws SQLException {
+        ROWID chunkEnd = new ROWID(chunkEndStr);
+        ROWID max = new ROWID(maxStr);
+        // try-with-resources so the test does not leak the connection.
+        try (JdbcConnection jdbc = openConnection()) {
+            assertEquals(expected, newSplitter().isChunkEndLeMax(jdbc, chunkEnd, max, null));
+        }
+    }
+
+    /** Opens a JDBC connection to the Oracle test container; the caller must close it. */
+    private JdbcConnection openConnection() throws SQLException {
+        JdbcConfiguration jdbcConfig =
+                JdbcConfiguration.create()
+                        .with(JdbcConfiguration.HOSTNAME, ORACLE_CONTAINER.getHost())
+                        .with(JdbcConfiguration.PORT, ORACLE_CONTAINER.getOraclePort())
+                        .with(JdbcConfiguration.USER, ORACLE_CONTAINER.getUsername())
+                        .with(JdbcConfiguration.PASSWORD, ORACLE_CONTAINER.getPassword())
+                        .with(JdbcConfiguration.DATABASE, ORACLE_CONTAINER.getDatabaseName())
+                        .build();
+        return OracleConnectionUtils.createOracleConnection(jdbcConfig);
+    }
+
+    /** Creates a splitter that has no source config or dialect; only the comparison is used. */
+    private OracleChunkSplitter newSplitter() {
+        return new OracleChunkSplitter(null, null, new ChunkSplitterState(null, null, null));
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index 1576c052c76..7b74c754018 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -69,12 +69,16 @@ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
         return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
     }
 
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+    @Override
+    protected boolean isChunkEndLeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn) {
         return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
     }
 
     /** ChunkEnd greater than or equal to max. */
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+    @Override
+    protected boolean isChunkEndGeMax(
+            JdbcConnection jdbc, Object chunkEnd, Object max, Column splitColumn) {
         return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
     }
 }