Pipeline E2E compatible with com.mysql:mysql-connector-j:8.0 (#28853)
* Pipeline E2E: use statement.execute and getResultSet to be compatible with com.mysql:mysql-connector-j:8.0 for extended SQL

* Revert the pipeline E2E compatibility code for com.mysql:mysql-connector-j:8.0

* Disable CDCE2EIT for now
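
For context on the first bullet, here is a minimal standalone sketch of the pattern the diff switches to (the URL, credentials, and DistSQL statement are placeholders, not taken from this commit): mysql-connector-j 8.0 reportedly rejects extended SQL issued through Statement.executeQuery(), while execute() followed by getResultSet() accepts any statement and simply yields null when no result set was produced.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public final class ExtendedSqlQueryExample {
    
    public static void main(final String[] args) throws SQLException {
        // Placeholder endpoint; any MySQL-protocol server (e.g. a ShardingSphere-Proxy) behaves the same way.
        try (
                Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3307/sharding_db", "root", "root");
                Statement statement = connection.createStatement()) {
            // execute() accepts statements whether or not they produce a result set.
            statement.execute("SHOW MIGRATION LIST");
            // getResultSet() returns the last result set, or null if none was produced.
            try (ResultSet resultSet = statement.getResultSet()) {
                while (null != resultSet && resultSet.next()) {
                    System.out.println(resultSet.getString(1));
                }
            }
        }
    }
}
```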
sandynz authored Oct 24, 2023
1 parent 55a2ee0 commit 607ba11
Showing 3 changed files with 31 additions and 12 deletions.
@@ -61,6 +61,7 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -420,9 +421,11 @@ public List<Map<String, Object>> queryForListWithLog(final String sql) {
*/
public List<Map<String, Object>> queryForListWithLog(final DataSource dataSource, final String sql) {
log.info("Query SQL: {}", sql);
try (Connection connection = dataSource.getConnection()) {
ResultSet resultSet = connection.createStatement().executeQuery(sql);
return transformResultSetToList(resultSet);
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
return transformResultSetToList(statement.getResultSet());
} catch (final SQLException ex) {
throw new RuntimeException(ex);
}
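Two details of the rewritten queryForListWithLog are worth noting. First, closing the Statement in try-with-resources also closes its ResultSet, so the result set must be fully consumed before the block exits, which transformResultSetToList does. Second, unlike executeQuery(), getResultSet() can return null when the statement produced no result set, so callers are assumed to pass only result-producing SQL. The helper transformResultSetToList itself is outside this diff; a plausible shape (a hypothetical reconstruction from the call site and the ResultSetMetaData import above, not the project's actual code) is:

```java
// Hypothetical reconstruction; also needs java.util.LinkedHashMap and java.util.Map imports.
private List<Map<String, Object>> transformResultSetToList(final ResultSet resultSet) throws SQLException {
    ResultSetMetaData metaData = resultSet.getMetaData();
    int columnCount = metaData.getColumnCount();
    List<Map<String, Object>> result = new ArrayList<>();
    while (resultSet.next()) {
        // Preserve column order so logged rows read like the original SELECT list.
        Map<String, Object> row = new LinkedHashMap<>(columnCount, 1F);
        for (int i = 1; i <= columnCount; i++) {
            row.put(metaData.getColumnLabel(i), resultSet.getObject(i));
        }
        result.add(row);
    }
    return result;
}
```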
@@ -38,6 +38,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.List;
@@ -82,21 +83,25 @@ void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLExc
containerComposer.startIncrementTask(
new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item (item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')");
stopMigrationByJobId(containerComposer, orderJobId);
startMigrationByJobId(containerComposer, orderJobId);
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", orderItemJobId));
DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", 10000);
containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order_item", 10000);
Properties algorithmProps = new Properties();
algorithmProps.setProperty("chunk-size", "300");
assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH", algorithmProps);
String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
assertMigrationSuccessById(containerComposer, orderItemJobId, "DATA_MATCH", algorithmProps);
Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true);
assertMigrationSuccessById(containerComposer, orderItemJobId, "CRC32_MATCH", new Properties());
for (String each : listJobId(containerComposer)) {
commitMigrationByJobId(containerComposer, each);
}
assertTrue(listJobId(containerComposer).isEmpty());
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
}
}

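The new assertions above rely on generateShardingSphereDataSourceFromProxy and assertOrderRecordExist, which live in the E2E harness rather than in this diff. As a rough sketch of what such an existence check can look like (hypothetical signature, SQL, and timeout policy; Awaitility is already used in the test above), it polls the proxy-backed DataSource until the migrated row becomes visible:

```java
import org.awaitility.Awaitility;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

public final class RecordExistenceAssertion {
    
    // Hypothetical sketch: keep polling until the row replicated through the proxy shows up.
    public static void assertOrderRecordExist(final DataSource dataSource, final String tableName, final Object orderId) {
        String sql = String.format("SELECT COUNT(1) FROM %s WHERE order_id = ?", tableName);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> {
            try (
                    Connection connection = dataSource.getConnection();
                    PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                preparedStatement.setObject(1, orderId);
                try (ResultSet resultSet = preparedStatement.executeQuery()) {
                    return resultSet.next() && resultSet.getInt(1) > 0;
                }
            }
        });
    }
}
```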
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;

import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Hex;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
@@ -39,6 +40,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -90,12 +92,13 @@ void assertNoUniqueKeyMigrationSuccess(final PipelineTestParameter testParam) th
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
// TODO PostgreSQL update delete events not support if table without unique keys at increment task.
final Consumer<Void> incrementalTaskFn = unused -> {
final Consumer<DataSource> incrementalTaskFn = dataSource -> {
if (containerComposer.getDatabaseType() instanceof MySQLDatabaseType) {
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
}
Object orderId = keyGenerateAlgorithm.generateKey();
insertOneOrder(containerComposer, orderId);
containerComposer.assertOrderRecordExist(dataSource, "t_order", orderId);
};
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
}
@@ -169,9 +172,10 @@ void assertMultiPrimaryKeyMigrationSuccess(final PipelineTestParameter testParam
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> {
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
containerComposer.assertOrderRecordExist(dataSource, "t_order", uniqueKey);
});
}
}
@@ -191,9 +195,10 @@ void assertMultiUniqueKeyMigrationSuccess(final PipelineTestParameter testParam)
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> {
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
containerComposer.assertOrderRecordExist(dataSource, "t_order", uniqueKey);
});
}
}
@@ -215,12 +220,16 @@ void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess(final PipelineTestPa
KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
// TODO Insert binary string in VARBINARY column. But KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is not Comparable
byte[] uniqueKey = new byte[]{-1, 0, 1};
assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> insertOneOrder(containerComposer, uniqueKey));
assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
insertOneOrder(containerComposer, uniqueKey);
// TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
containerComposer.assertOrderRecordExist(dataSource, String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
});
}
}

private void assertMigrationSuccess(final PipelineContainerComposer containerComposer, final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
final String consistencyCheckAlgorithmType, final Consumer<Void> incrementalTaskFn) throws Exception {
final String consistencyCheckAlgorithmType, final Consumer<DataSource> incrementalTaskFn) throws Exception {
containerComposer.sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_NAME));
try (Connection connection = containerComposer.getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, SOURCE_TABLE_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
@@ -232,12 +241,14 @@ private void assertMigrationSuccess(final PipelineContainerCom
startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
String jobId = listJobId(containerComposer).get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
incrementalTaskFn.accept(null);
DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
incrementalTaskFn.accept(jdbcDataSource);
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
if (null != consistencyCheckAlgorithmType) {
assertCheckMigrationSuccess(containerComposer, jobId, consistencyCheckAlgorithmType);
}
commitMigrationByJobId(containerComposer, jobId);
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());
}
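A note on the last two changes: the Consumer<Void> to Consumer<DataSource> switch threads the proxy-backed DataSource into each incremental-task callback so the new record assertions can run inside it, and the UNHEX workaround avoids binding a byte[] parameter through the proxy (per the TODO above). A minimal standalone sketch of the hex round trip, assuming MySQL's UNHEX function and the Commons Codec Hex class imported in the diff:

```java
import org.apache.commons.codec.binary.Hex;

public final class VarbinaryKeyLookupExample {
    
    public static void main(final String[] args) {
        byte[] uniqueKey = {-1, 0, 1};
        // encodeHexString renders the bytes as lower-case hex: {-1, 0, 1} -> "ff0001".
        // MySQL's UNHEX('ff0001') rebuilds the same bytes server-side, so the VARBINARY
        // order_id column can be matched byte-for-byte without a byte[] bind parameter.
        String sql = String.format("SELECT 1 FROM t_order WHERE order_id = UNHEX('%s')", Hex.encodeHexString(uniqueKey));
        System.out.println(sql);
    }
}
```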
