diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java similarity index 95% rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java index fbefb1b9f8a1f..9d91c2d92279c 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutor.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.executor; +package org.apache.shardingsphere.data.pipeline.common.execute; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor; import java.sql.SQLException; import java.time.Instant; @@ -52,9 +53,6 @@ public final void start() { runBlocking(); } - /** - * Run blocking. - */ protected abstract void runBlocking(); @Override diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java index b022f32de7227..36943d542bd69 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java @@ -21,7 +21,7 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java index bf7644f232cfd..41589b79391a4 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.core.importer; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord; diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutorTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java similarity index 97% rename from kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutorTest.java rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java index c4aa4e05c1989..b27766aa4bff5 100644 --- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutorTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/execute/AbstractLifecycleExecutorTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.api.executor; +package org.apache.shardingsphere.data.pipeline.common.execute; import org.junit.jupiter.api.Test; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index 6d50d4950a03f..ed64a865cf7df 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration; -import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java index 8cdc9e1520bbc..dbc2a573ba66a 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; @@ -96,7 +96,7 @@ protected void runBlocking() { int times = reconnectTimes.incrementAndGet(); log.error("Connect failed, reconnect times={}", times, ex); if (isRunning()) { - Thread.sleep(5000); + Thread.sleep(5000L); } if (times >= 5) { throw new IngestException(ex); diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java index c2b855c985f84..23a0d808b089a 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java index 87e2d673a8fcb..bef90f99488e6 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java @@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord; diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index a55671d4a18a8..62129db313b9e 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -19,7 +19,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java index 094a4fdbac5a3..cc4f96860bb40 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureIncrementalDumper.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.fixture; -import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor; +import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper; public final class FixtureIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {