diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java index 79724699ab672..dc12667cd5848 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java @@ -17,10 +17,10 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.dumper; -import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable; /** * Dumper interface. */ -public interface Dumper extends LifecycleExecutor { +public interface Dumper extends PipelineLifecycleRunnable { } diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java similarity index 85% rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java index 3ae17b7159d24..4b0233f2f97b2 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/LifecycleExecutor.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/runnable/PipelineLifecycleRunnable.java @@ -15,13 +15,12 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.executor; +package org.apache.shardingsphere.data.pipeline.api.runnable; /** - * Lifecycle executor. + * Pipeline lifecycle runnable. */ -// TODO task? -public interface LifecycleExecutor extends Runnable { +public interface PipelineLifecycleRunnable extends Runnable { /** * Start run execute. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java similarity index 92% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java index 9d91c2d92279c..51dcacb213a74 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnable.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.common.execute; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable; import java.sql.SQLException; import java.time.Instant; @@ -28,10 +28,10 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Abstract lifecycle executor. + * Abstract pipeline lifecycle runnable. */ @Slf4j -public abstract class AbstractLifecycleExecutor implements LifecycleExecutor { +public abstract class AbstractPipelineLifecycleRunnable implements PipelineLifecycleRunnable { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java index c1a74f7de7858..eb6a1b4ea62d6 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngine.java @@ -20,7 +20,7 @@ import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder; @@ -73,12 +73,12 @@ public static ExecuteEngine newFixedThreadInstance(final int threadNumber, final /** * Submit a {@code LifecycleExecutor} with callback {@code ExecuteCallback} to execute. * - * @param lifecycleExecutor lifecycle executor + * @param pipelineLifecycleRunnable lifecycle executor * @param executeCallback execute callback * @return execute future */ - public CompletableFuture submit(final LifecycleExecutor lifecycleExecutor, final ExecuteCallback executeCallback) { - return CompletableFuture.runAsync(lifecycleExecutor, executorService).whenCompleteAsync((unused, throwable) -> { + public CompletableFuture submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable, final ExecuteCallback executeCallback) { + return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService).whenCompleteAsync((unused, throwable) -> { if (null == throwable) { executeCallback.onSuccess(); } else { @@ -91,11 +91,11 @@ public CompletableFuture submit(final LifecycleExecutor lifecycleExecutor, fi /** * Submit a {@code LifecycleExecutor} to execute. * - * @param lifecycleExecutor lifecycle executor + * @param pipelineLifecycleRunnable lifecycle executor * @return execute future */ - public CompletableFuture submit(final LifecycleExecutor lifecycleExecutor) { - return CompletableFuture.runAsync(lifecycleExecutor, executorService); + public CompletableFuture submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable) { + return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java index ebd0f8183eee7..cb2575bc5b43d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java @@ -21,7 +21,7 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; @@ -67,7 +67,7 @@ * Inventory dumper. */ @Slf4j -public final class InventoryDumper extends AbstractLifecycleExecutor implements Dumper { +public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper { @Getter(AccessLevel.PROTECTED) private final InventoryDumperContext dumperContext; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java index a7707c7b091d1..2217f39a38ab1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/Importer.java @@ -17,10 +17,10 @@ package org.apache.shardingsphere.data.pipeline.core.importer; -import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable; /** * Importer. */ -public interface Importer extends LifecycleExecutor { +public interface Importer extends PipelineLifecycleRunnable { } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java index 41589b79391a4..e272ebd7b831f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.importer; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; @@ -36,7 +36,7 @@ * Single channel consumer importer. */ @RequiredArgsConstructor -public final class SingleChannelConsumerImporter extends AbstractLifecycleExecutor implements Importer { +public final class SingleChannelConsumerImporter extends AbstractPipelineLifecycleRunnable implements Importer { private final PipelineChannel channel; diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java similarity index 80% rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java index b27766aa4bff5..00616f8f5dbaf 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractPipelineLifecycleRunnableTest.java @@ -26,11 +26,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -class AbstractLifecycleExecutorTest { +class AbstractPipelineLifecycleRunnableTest { @Test void assertRunning() { - FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor(); + FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable(); assertFalse(executor.isRunning()); executor.start(); assertTrue(executor.isRunning()); @@ -40,7 +40,7 @@ void assertRunning() { @Test void assertStartRunOnce() { - FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor(); + FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable(); executor.start(); executor.start(); assertThat(executor.runBlockingCount.get(), is(1)); @@ -48,7 +48,7 @@ void assertStartRunOnce() { @Test void assertStopRunOnce() { - FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor(); + FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable(); executor.start(); executor.stop(); executor.stop(); @@ -57,7 +57,7 @@ void assertStopRunOnce() { @Test void assertNoStopBeforeStarting() { - FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor(); + FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable(); executor.stop(); executor.stop(); assertThat(executor.doStopCount.get(), is(0)); @@ -65,14 +65,14 @@ void assertNoStopBeforeStarting() { @Test void assertStopStart() { - FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor(); + FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable(); executor.stop(); executor.start(); assertThat(executor.doStopCount.get(), is(0)); assertThat(executor.runBlockingCount.get(), is(0)); } - private static class FixtureLifecycleExecutor extends AbstractLifecycleExecutor { + private static class FixturePipelineLifecycleRunnable extends AbstractPipelineLifecycleRunnable { private final AtomicInteger runBlockingCount = new AtomicInteger(); diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java index 4193427298a47..99c26bf500579 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/ExecuteEngineTest.java @@ -19,7 +19,7 @@ import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; -import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable; import org.junit.jupiter.api.Test; import org.mockito.internal.configuration.plugins.Plugins; @@ -45,24 +45,24 @@ class ExecuteEngineTest { @Test void assertSubmitAndTaskSucceeded() { - LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class); + PipelineLifecycleRunnable pipelineLifecycleRunnable = mock(PipelineLifecycleRunnable.class); ExecuteCallback callback = mock(ExecuteCallback.class); ExecuteEngine executeEngine = ExecuteEngine.newCachedThreadInstance(ExecuteEngineTest.class.getSimpleName()); - Future future = executeEngine.submit(lifecycleExecutor, callback); + Future future = executeEngine.submit(pipelineLifecycleRunnable, callback); assertTimeout(Duration.ofSeconds(30L), () -> future.get()); shutdownAndAwaitTerminal(executeEngine); - verify(lifecycleExecutor).run(); + verify(pipelineLifecycleRunnable).run(); verify(callback).onSuccess(); } @Test void assertSubmitAndTaskFailed() { - LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class); + PipelineLifecycleRunnable pipelineLifecycleRunnable = mock(PipelineLifecycleRunnable.class); RuntimeException expectedException = new RuntimeException("Expected"); - doThrow(expectedException).when(lifecycleExecutor).run(); + doThrow(expectedException).when(pipelineLifecycleRunnable).run(); ExecuteCallback callback = mock(ExecuteCallback.class); ExecuteEngine executeEngine = ExecuteEngine.newCachedThreadInstance(ExecuteEngineTest.class.getSimpleName()); - Future future = executeEngine.submit(lifecycleExecutor, callback); + Future future = executeEngine.submit(pipelineLifecycleRunnable, callback); Optional actualCause = assertTimeout(Duration.ofSeconds(30L), () -> execute(future)); assertTrue(actualCause.isPresent()); assertThat(actualCause.get(), is(expectedException)); diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index 705a02200a8db..833834e256083 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; @@ -64,7 +64,7 @@ * MySQL incremental dumper. */ @Slf4j -public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper { +public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper { private final IncrementalDumperContext dumperContext; diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java index dfa912c4a0e0f..74de5515898e6 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -56,7 +56,7 @@ * WAL dumper of openGauss. */ @Slf4j -public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper { +public final class OpenGaussWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper { private final IncrementalDumperContext dumperContext; diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java index b23686cbe4e6b..c284d8967e827 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -58,7 +58,7 @@ * PostgreSQL WAL dumper. */ @Slf4j -public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper { +public final class PostgreSQLWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper { private final IncrementalDumperContext dumperContext; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java index bef90f99488e6..0950df7e01f7d 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java @@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; @@ -53,7 +53,7 @@ */ @RequiredArgsConstructor @Slf4j -public final class CDCImporter extends AbstractLifecycleExecutor implements Importer { +public final class CDCImporter extends AbstractPipelineLifecycleRunnable implements Importer { @Getter private final String importerId = RandomStringUtils.randomAlphanumeric(8); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 62129db313b9e..4034b3d6056b9 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -19,8 +19,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; -import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; +import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback; import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine; @@ -60,7 +60,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner { private final String parentJobId; - private final LifecycleExecutor checkExecutor; + private final PipelineLifecycleRunnable checkExecutor; private final AtomicReference consistencyChecker = new AtomicReference<>(); @@ -69,7 +69,7 @@ public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemC checkJobConfig = jobItemContext.getJobConfig(); checkJobId = checkJobConfig.getJobId(); parentJobId = checkJobConfig.getParentJobId(); - checkExecutor = new CheckLifecycleExecutor(); + checkExecutor = new CheckPipelineLifecycleRunnable(); } @Override @@ -88,7 +88,7 @@ public void stop() { checkExecutor.stop(); } - private final class CheckLifecycleExecutor extends AbstractLifecycleExecutor { + private final class CheckPipelineLifecycleRunnable extends AbstractPipelineLifecycleRunnable { @Override protected void runBlocking() { diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java index cc4f96860bb40..40467c113712a 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java @@ -17,10 +17,10 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.fixture; -import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; -public final class FixtureIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper { +public final class FixtureIncrementalDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper { @Override protected void runBlocking() {