Skip to content

Commit

Permalink
Compatible with com.mysql:mysql-connector-j:8.0 (#28849)
Browse files Browse the repository at this point in the history
* Use mysql:mysql-connector-java:5.1.49 for pipeline E2E

* Replace maven dependency from mysql:mysql-connector-java to com.mysql:mysql-connector-j

* Update mysql-connector-java.version to 8.0.31 for SQL E2E

* Add TODO

* Replace maven dependency from mysql:mysql-connector-java to com.mysql:mysql-connector-j for example pom.ftl

* Pipeline E2E compatible with com.mysql:mysql-connector-j:8.0

* Disable CDCE2EIT for now
  • Loading branch information
sandynz authored Oct 23, 2023
1 parent ffe0a04 commit 067670e
Show file tree
Hide file tree
Showing 21 changed files with 59 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e-sql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
- adapter: proxy
database: MySQL
scenario: passthrough
additional-options: '-Dmysql-connector-java.version=8.0.30'
additional-options: '-Dmysql-connector-java.version=8.0.31'
exclude:
- adapter: jdbc
scenario: passthrough
Expand Down
4 changes: 2 additions & 2 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql-connector-java.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.31</version>
</dependency>

<dependency>
Expand Down
4 changes: 2 additions & 2 deletions jdbc/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
Expand Down
4 changes: 2 additions & 2 deletions kernel/data-pipeline/scenario/cdc/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
</dependencies>
</project>
4 changes: 2 additions & 2 deletions kernel/transaction/type/xa/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
Expand Down
4 changes: 2 additions & 2 deletions mode/type/standalone/repository/provider/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql-connector-java.version}</version>
<scope>test</scope>
</dependency>
Expand Down
4 changes: 2 additions & 2 deletions proxy/bootstrap/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/agent/jdbc-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static String getDriverClassName(final DatabaseType databaseType) {
case "H2":
return "org.h2.Driver";
case "MySQL":
return "com.mysql.jdbc.Driver";
return "com.mysql.cj.jdbc.Driver";
case "PostgreSQL":
return "org.postgresql.Driver";
case "openGauss":
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/fixture/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/operation/pipeline/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtils;
import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
Expand All @@ -79,6 +80,7 @@
/**
* CDC E2E IT.
*/
@Disabled("TODO Enable MySQL after compatible with com.mysql:mysql-connector-j:8.0")
@PipelineE2ESettings(database = {
@PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/general/mysql.xml"),
@PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/scenario/general/postgresql.xml"),
Expand Down Expand Up @@ -111,11 +113,11 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
initSchemaAndTable(containerComposer, connection, 3);
}
DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
DataSource dataSource = containerComposer.getProxyDataSource();
Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
log.info("init data begin: {}", LocalDateTime.now());
DataSourceExecuteUtils.execute(jdbcDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
DataSourceExecuteUtils.execute(dataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
log.info("init data end: {}", LocalDateTime.now());
try (
Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
Expand All @@ -128,10 +130,10 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
String tableName = dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", "test", SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
containerComposer.startIncrementTask(new E2EIncrementalTask(jdbcDataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
containerComposer.startIncrementTask(new E2EIncrementalTask(dataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
containerComposer.getIncreaseTaskThread().join(10000L);
List<Map<String, Object>> actualProxyList;
try (Connection connection = jdbcDataSource.getConnection()) {
try (Connection connection = dataSource.getConnection()) {
ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", getOrderTableNameWithSchema(dialectDatabaseMetaData)));
actualProxyList = containerComposer.transformResultSetToList(resultSet);
}
Expand All @@ -140,7 +142,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
SchemaTableName orderSchemaTableName = dialectDatabaseMetaData.isSchemaAvailable()
? new SchemaTableName(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
: new SchemaTableName(null, SOURCE_TABLE_NAME);
PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(dataSource, containerComposer.getDatabaseType());
PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4),
containerComposer.getDatabaseType());
assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.List;
Expand Down Expand Up @@ -83,25 +82,21 @@ void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLExc
containerComposer.startIncrementTask(
new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item (item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')");
stopMigrationByJobId(containerComposer, orderJobId);
startMigrationByJobId(containerComposer, orderJobId);
DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", 10000);
containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order_item", 10000);
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", orderItemJobId));
Properties algorithmProps = new Properties();
algorithmProps.setProperty("chunk-size", "300");
assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH", algorithmProps);
String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
assertMigrationSuccessById(containerComposer, orderItemJobId, "DATA_MATCH", algorithmProps);
Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true);
assertMigrationSuccessById(containerComposer, orderItemJobId, "CRC32_MATCH", new Properties());
for (String each : listJobId(containerComposer)) {
commitMigrationByJobId(containerComposer, each);
}
assertTrue(listJobId(containerComposer).isEmpty());
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;

import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Hex;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
Expand All @@ -40,7 +39,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
Expand Down Expand Up @@ -92,13 +90,12 @@ void assertNoUniqueKeyMigrationSuccess(final PipelineTestParameter testParam) th
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
// TODO PostgreSQL update delete events not support if table without unique keys at increment task.
final Consumer<DataSource> incrementalTaskFn = dataSource -> {
final Consumer<Void> incrementalTaskFn = unused -> {
if (containerComposer.getDatabaseType() instanceof MySQLDatabaseType) {
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
}
Object orderId = keyGenerateAlgorithm.generateKey();
insertOneOrder(containerComposer, orderId);
containerComposer.assertOrderRecordExist(dataSource, "t_order", orderId);
};
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
}
Expand Down Expand Up @@ -172,10 +169,9 @@ void assertMultiPrimaryKeyMigrationSuccess(final PipelineTestParameter testParam
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
containerComposer.assertOrderRecordExist(dataSource, "t_order", uniqueKey);
});
}
}
Expand All @@ -195,10 +191,9 @@ void assertMultiUniqueKeyMigrationSuccess(final PipelineTestParameter testParam)
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
containerComposer.assertOrderRecordExist(dataSource, "t_order", uniqueKey);
});
}
}
Expand All @@ -220,16 +215,12 @@ void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess(final PipelineTestPa
KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
// TODO Insert binary string in VARBINARY column. But KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is not Comparable
byte[] uniqueKey = new byte[]{-1, 0, 1};
assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
insertOneOrder(containerComposer, uniqueKey);
// TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
containerComposer.assertOrderRecordExist(dataSource, String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
});
assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> insertOneOrder(containerComposer, uniqueKey));
}
}

private void assertMigrationSuccess(final PipelineContainerComposer containerComposer, final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
final String consistencyCheckAlgorithmType, final Consumer<DataSource> incrementalTaskFn) throws Exception {
final String consistencyCheckAlgorithmType, final Consumer<Void> incrementalTaskFn) throws Exception {
containerComposer.sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_NAME));
try (Connection connection = containerComposer.getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, SOURCE_TABLE_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
Expand All @@ -241,14 +232,12 @@ private void assertMigrationSuccess(final PipelineContainerComposer containerCom
startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
String jobId = listJobId(containerComposer).get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
incrementalTaskFn.accept(jdbcDataSource);
incrementalTaskFn.accept(null);
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
if (null != consistencyCheckAlgorithmType) {
assertCheckMigrationSuccess(containerComposer, jobId, consistencyCheckAlgorithmType);
}
commitMigrationByJobId(containerComposer, jobId);
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());
}
Expand Down
Loading

0 comments on commit 067670e

Please sign in to comment.