Improve drop streaming: disallow dropping an active job (#28992)
* Improve drop streaming: disallow dropping an active job

* Improve CDCE2EIT
azexcy authored Nov 9, 2023
1 parent 818b49b commit 7308e0c
Showing 4 changed files with 18 additions and 16 deletions.
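
In short: CDCJobAPI no longer stops a job as a side effect of dropping it. The renamed dropStreaming method rejects a drop while the job is still active, its callers (the DropStreamingUpdater DistSQL handler and the CDC backend handler) are renamed to match, and the E2E test now closes its CDC client and waits for the job to turn inactive before dropping. A hedged DistSQL sketch of the resulting operator flow, using only statements that appear in the test diff below (the job id is a placeholder):

    SHOW STREAMING LIST;     -- the target job's "active" flag must be false
    DROP STREAMING 'jobId';  -- placeholder id; dropping a still-active job now fails:
                             -- "Can't drop streaming job which is active"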
DropStreamingUpdater.java

@@ -32,7 +32,7 @@ public final class DropStreamingUpdater implements RALUpdater<DropStreamingStatement> {
 
     @Override
     public void executeUpdate(final String databaseName, final DropStreamingStatement sqlStatement) throws SQLException {
-        jobAPI.stopAndDrop(sqlStatement.getJobId());
+        jobAPI.dropStreaming(sqlStatement.getJobId());
     }
 
     @Override
CDCJobAPI.java

@@ -63,6 +63,7 @@
 import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;

@@ -328,17 +329,14 @@ public void commit(final String jobId) {
     }
 
     /**
-     * Stop and drop job.
+     * Drop streaming job.
      *
      * @param jobId job id
      */
-    public void stopAndDrop(final String jobId) {
-        CDCJobConfiguration jobConfig = getJobConfiguration(jobId);
-        if (CDCSinkType.SOCKET == jobConfig.getSinkConfig().getSinkType()) {
-            PipelineJobCenter.stop(jobId);
-        } else {
-            stop(jobId);
-        }
+    public void dropStreaming(final String jobId) {
+        JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
+        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
         dropJob(jobId);
         cleanup(jobConfig);
     }
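The new guard relies on ShardingSpherePreconditions.checkState to reject drops of still-enabled jobs. A minimal sketch of the semantics assumed here, not the actual utility source:

    import java.util.function.Supplier;

    public final class PreconditionsSketch {

        // Assumed semantics of ShardingSpherePreconditions.checkState:
        // throw the supplied exception when the condition is false.
        public static <T extends Exception> void checkState(final boolean state, final Supplier<T> exceptionSupplier) throws T {
            if (!state) {
                throw exceptionSupplier.get();
            }
        }
    }

So dropStreaming only proceeds to dropJob and cleanup when the ElasticJob configuration reports the job as disabled; an active job surfaces PipelineInternalException instead.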
CDCBackendHandler.java

@@ -164,7 +164,7 @@ public void stopStreaming(final String jobId, final ChannelId channelId) {
      * @param jobId job ID
      */
     public void dropStreaming(final String jobId) {
-        jobAPI.stopAndDrop(jobId);
+        jobAPI.dropStreaming(jobId);
     }
 
     /**
CDCE2EIT.java

@@ -118,7 +118,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQLException {
                 initSchemaAndTable(containerComposer, connection, 0);
             }
             DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-            startCDCClient(containerComposer, dialectDatabaseMetaData);
+            final CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
             String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
             containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));

@@ -140,6 +140,9 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQLException {
                     containerComposer.getDatabaseType());
             assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
             assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address"));
+            cdcClient.close();
+            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST")
+                    .stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
             containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
             assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
         }

@@ -166,16 +169,17 @@ private DataSource createStandardDataSource(final PipelineContainerComposer containerComposer,
                 containerComposer.getUsername(), containerComposer.getPassword()));
     }
 
-    private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
+    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
         DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
-        CDCClient cdcClient = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
-        cdcClient.connect(recordConsumer, new RetryStreamingExceptionHandler(cdcClient, 5, 5000), (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
-        cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
+        CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
+        result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
+        result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
         // TODO add full=false test case later
-        cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
+        result.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
                 SchemaTable.newBuilder().setTable("t_address").build())), true));
+        return result;
     }
 
     private List<Map<String, Object>> listOrderRecords(final PipelineContainerComposer containerComposer, final String tableNameWithSchema) throws SQLException {
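Returning the client from buildCDCClientAndStart hands lifecycle control to the test, which must now stop streaming itself before the job can be dropped. A hedged sketch of the resulting call pattern (assuming CDCClient is closeable, as the test's close() call indicates):

    CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
    try {
        // ... run assertions while streaming is active ...
    } finally {
        cdcClient.close();  // stop streaming so the job turns inactive and DROP STREAMING succeeds
    }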
