diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
index 24e8d61da8caa..47db9466af5ab 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
@@ -32,7 +32,7 @@ public final class DropStreamingUpdater implements RALUpdater<DropStreamingStatement>
                 new PipelineInternalException("Can't drop streaming job which is active"));
         dropJob(jobId);
         cleanup(jobConfig);
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 2a3a39d82d55f..ad7f7b7741001 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -164,7 +164,7 @@ public void stopStreaming(final String jobId, final ChannelId channelId) {
      * @param jobId job ID
      */
     public void dropStreaming(final String jobId) {
-        jobAPI.stopAndDrop(jobId);
+        jobAPI.dropStreaming(jobId);
     }
     
     /**
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 70da4a2663aa8..5d0d7dea41711 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -118,7 +118,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
             initSchemaAndTable(containerComposer, connection, 0);
         }
         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-        startCDCClient(containerComposer, dialectDatabaseMetaData);
+        final CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
         Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
         String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
         containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
@@ -140,6 +140,9 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
                 containerComposer.getDatabaseType());
         assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
         assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address"));
+        cdcClient.close();
+        Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST")
+                .stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
         containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
         assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
     }
@@ -166,16 +169,17 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont
                 containerComposer.getUsername(), containerComposer.getPassword()));
     }
     
-    private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
+    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
         DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
-        CDCClient cdcClient = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
-        cdcClient.connect(recordConsumer, new RetryStreamingExceptionHandler(cdcClient, 5, 5000), (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
-        cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
+        CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
+        result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
+        result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
         // TODO add full=false test case later
-        cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
+        result.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
                 SchemaTable.newBuilder().setTable("t_address").build())), true));
+        return result;
     }
     
     private List<Map<String, Object>> listOrderRecords(final PipelineContainerComposer containerComposer, final String tableNameWithSchema) throws SQLException {
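
Note on the operational order the new E2E steps encode: the CDC client is closed first, the test then waits until SHOW STREAMING LIST no longer reports the job as active, and only then is DROP STREAMING issued (an active job is rejected with "Can't drop streaming job which is active"). Below is a minimal JDBC sketch of that same order against a proxy; only the DistSQL statements and the id/active columns come from the change above, while the JDBC URL, credentials, retry count, and class/method names are illustrative assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

public final class DropStreamingSketch {
    
    public static void main(final String[] args) throws SQLException, InterruptedException {
        // Hypothetical proxy endpoint and credentials; replace with the actual proxy settings.
        try (Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3307/sharding_db", "root", "root")) {
            dropStreamingWhenInactive(connection);
        }
    }
    
    private static void dropStreamingWhenInactive(final Connection connection) throws SQLException, InterruptedException {
        String jobId = null;
        // Poll SHOW STREAMING LIST until the job is reported inactive; `id` and `active` are the columns the E2E test reads.
        for (int i = 0; i < 20; i++) {
            try (Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("SHOW STREAMING LIST")) {
                if (!resultSet.next()) {
                    return;
                }
                jobId = resultSet.getString("id");
                if (!Boolean.parseBoolean(resultSet.getString("active"))) {
                    break;
                }
            }
            TimeUnit.MILLISECONDS.sleep(500L);
        }
        // Dropping an active job is rejected with "Can't drop streaming job which is active", so this runs once the job is idle (or retries are exhausted).
        try (Statement statement = connection.createStatement()) {
            statement.execute(String.format("DROP STREAMING '%s'", jobId));
        }
    }
}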