Rename PipelineLifecycleRunnable (#28954)
* Rename PipelineLifecycleRunnable

* Move PipelineLifecycleRunnable
terrymanu authored Nov 6, 2023
1 parent 55c71a6 commit de54a69
Showing 15 changed files with 50 additions and 51 deletions.
@@ -17,10 +17,10 @@

package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;

-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;

/**
* Dumper interface.
*/
-public interface Dumper extends LifecycleExecutor {
+public interface Dumper extends PipelineLifecycleRunnable {
}
@@ -15,13 +15,12 @@
* limitations under the License.
*/

-package org.apache.shardingsphere.data.pipeline.api.executor;
+package org.apache.shardingsphere.data.pipeline.api.runnable;

/**
- * Lifecycle executor.
+ * Pipeline lifecycle runnable.
*/
-// TODO task?
-public interface LifecycleExecutor extends Runnable {
+public interface PipelineLifecycleRunnable extends Runnable {

/**
* Start run execute.
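
For orientation, a sketch of the renamed interface (not part of this commit): it keeps the Runnable contract and adds explicit lifecycle control. The method set below is inferred from the start()/stop() calls exercised elsewhere in this diff, not copied from the source file.

package org.apache.shardingsphere.data.pipeline.api.runnable;

/**
 * Pipeline lifecycle runnable (inferred sketch; method set assumed from usage in this diff).
 */
public interface PipelineLifecycleRunnable extends Runnable {

    /**
     * Start execution; the tests below expect repeated calls to run the body only once.
     */
    void start();

    /**
     * Stop execution; the tests below expect it to be safe before start() and when called twice.
     */
    void stop();
}
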
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.common.execute;

import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;

import java.sql.SQLException;
import java.time.Instant;
@@ -28,10 +28,10 @@
import java.util.concurrent.atomic.AtomicReference;

/**
- * Abstract lifecycle executor.
+ * Abstract pipeline lifecycle runnable.
*/
@Slf4j
-public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
+public abstract class AbstractPipelineLifecycleRunnable implements PipelineLifecycleRunnable {

private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

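
A minimal subclass sketch may help here (not part of this commit): the fixtures later in this diff override a protected runBlocking() hook, and the doStopCount field in the test suggests a matching doStop() hook; both hook signatures below are assumptions based on those fixtures.

import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;

// Hypothetical subclass for illustration; hook names are assumed from the fixtures below.
public final class NoOpPipelineLifecycleRunnable extends AbstractPipelineLifecycleRunnable {

    @Override
    protected void runBlocking() {
        // Real implementations (dumpers, importers) block here until the work finishes or stop() is called.
    }

    @Override
    protected void doStop() {
        // Release any resources acquired in runBlocking().
    }
}
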
@@ -20,7 +20,7 @@
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;

@@ -73,12 +73,12 @@ public static ExecuteEngine newFixedThreadInstance(final int threadNumber, final
/**
* Submit a {@code LifecycleExecutor} with callback {@code ExecuteCallback} to execute.
*
- * @param lifecycleExecutor lifecycle executor
+ * @param pipelineLifecycleRunnable lifecycle executor
* @param executeCallback execute callback
* @return execute future
*/
-public CompletableFuture<?> submit(final LifecycleExecutor lifecycleExecutor, final ExecuteCallback executeCallback) {
-return CompletableFuture.runAsync(lifecycleExecutor, executorService).whenCompleteAsync((unused, throwable) -> {
+public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable, final ExecuteCallback executeCallback) {
+return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService).whenCompleteAsync((unused, throwable) -> {
if (null == throwable) {
executeCallback.onSuccess();
} else {
@@ -91,11 +91,11 @@ public CompletableFuture<?> submit(final LifecycleExecutor lifecycleExecutor, fi
/**
* Submit a {@code LifecycleExecutor} to execute.
*
- * @param lifecycleExecutor lifecycle executor
+ * @param pipelineLifecycleRunnable lifecycle executor
* @return execute future
*/
-public CompletableFuture<?> submit(final LifecycleExecutor lifecycleExecutor) {
-return CompletableFuture.runAsync(lifecycleExecutor, executorService);
+public CompletableFuture<?> submit(final PipelineLifecycleRunnable pipelineLifecycleRunnable) {
+return CompletableFuture.runAsync(pipelineLifecycleRunnable, executorService);
}

/**
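
A usage sketch of the renamed submit(...) overload (not part of this commit): newCachedThreadInstance and onSuccess() are visible in this diff and its tests; the onFailure(Throwable) callback method and the NoOpPipelineLifecycleRunnable class are assumptions carried over from the sketch above.

import java.util.concurrent.CompletableFuture;

import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;

// Hypothetical caller showing how a PipelineLifecycleRunnable is handed to the engine.
public final class ExecuteEngineUsageSketch {

    public static void main(final String[] args) {
        ExecuteEngine engine = ExecuteEngine.newCachedThreadInstance("usage-sketch");
        PipelineLifecycleRunnable runnable = new NoOpPipelineLifecycleRunnable();
        CompletableFuture<?> future = engine.submit(runnable, new ExecuteCallback() {

            @Override
            public void onSuccess() {
                // The runnable completed without throwing.
            }

            @Override
            public void onFailure(final Throwable throwable) {
                // The runnable threw; method name assumed for ExecuteCallback.
            }
        });
        future.join();
    }
}
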
@@ -21,7 +21,7 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
@@ -67,7 +67,7 @@
* Inventory dumper.
*/
@Slf4j
-public final class InventoryDumper extends AbstractLifecycleExecutor implements Dumper {
+public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {

@Getter(AccessLevel.PROTECTED)
private final InventoryDumperContext dumperContext;
@@ -17,10 +17,10 @@

package org.apache.shardingsphere.data.pipeline.core.importer;

-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;

/**
* Importer.
*/
-public interface Importer extends LifecycleExecutor {
+public interface Importer extends PipelineLifecycleRunnable {
}
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.importer;

import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
@@ -36,7 +36,7 @@
* Single channel consumer importer.
*/
@RequiredArgsConstructor
-public final class SingleChannelConsumerImporter extends AbstractLifecycleExecutor implements Importer {
+public final class SingleChannelConsumerImporter extends AbstractPipelineLifecycleRunnable implements Importer {

private final PipelineChannel channel;

@@ -26,11 +26,11 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

-class AbstractLifecycleExecutorTest {
+class AbstractPipelineLifecycleRunnableTest {

@Test
void assertRunning() {
-FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable();
assertFalse(executor.isRunning());
executor.start();
assertTrue(executor.isRunning());
@@ -40,15 +40,15 @@ void assertRunning() {

@Test
void assertStartRunOnce() {
-FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable();
executor.start();
executor.start();
assertThat(executor.runBlockingCount.get(), is(1));
}

@Test
void assertStopRunOnce() {
-FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable();
executor.start();
executor.stop();
executor.stop();
@@ -57,22 +57,22 @@ void assertStopRunOnce() {

@Test
void assertNoStopBeforeStarting() {
-FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable();
executor.stop();
executor.stop();
assertThat(executor.doStopCount.get(), is(0));
}

@Test
void assertStopStart() {
-FixtureLifecycleExecutor executor = new FixtureLifecycleExecutor();
+FixturePipelineLifecycleRunnable executor = new FixturePipelineLifecycleRunnable();
executor.stop();
executor.start();
assertThat(executor.doStopCount.get(), is(0));
assertThat(executor.runBlockingCount.get(), is(0));
}

-private static class FixtureLifecycleExecutor extends AbstractLifecycleExecutor {
+private static class FixturePipelineLifecycleRunnable extends AbstractPipelineLifecycleRunnable {

private final AtomicInteger runBlockingCount = new AtomicInteger();

@@ -19,7 +19,7 @@

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
import org.junit.jupiter.api.Test;
import org.mockito.internal.configuration.plugins.Plugins;

@@ -45,24 +45,24 @@ class ExecuteEngineTest {

@Test
void assertSubmitAndTaskSucceeded() {
-LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
+PipelineLifecycleRunnable pipelineLifecycleRunnable = mock(PipelineLifecycleRunnable.class);
ExecuteCallback callback = mock(ExecuteCallback.class);
ExecuteEngine executeEngine = ExecuteEngine.newCachedThreadInstance(ExecuteEngineTest.class.getSimpleName());
-Future<?> future = executeEngine.submit(lifecycleExecutor, callback);
+Future<?> future = executeEngine.submit(pipelineLifecycleRunnable, callback);
assertTimeout(Duration.ofSeconds(30L), () -> future.get());
shutdownAndAwaitTerminal(executeEngine);
-verify(lifecycleExecutor).run();
+verify(pipelineLifecycleRunnable).run();
verify(callback).onSuccess();
}

@Test
void assertSubmitAndTaskFailed() {
-LifecycleExecutor lifecycleExecutor = mock(LifecycleExecutor.class);
+PipelineLifecycleRunnable pipelineLifecycleRunnable = mock(PipelineLifecycleRunnable.class);
RuntimeException expectedException = new RuntimeException("Expected");
-doThrow(expectedException).when(lifecycleExecutor).run();
+doThrow(expectedException).when(pipelineLifecycleRunnable).run();
ExecuteCallback callback = mock(ExecuteCallback.class);
ExecuteEngine executeEngine = ExecuteEngine.newCachedThreadInstance(ExecuteEngineTest.class.getSimpleName());
-Future<?> future = executeEngine.submit(lifecycleExecutor, callback);
+Future<?> future = executeEngine.submit(pipelineLifecycleRunnable, callback);
Optional<Throwable> actualCause = assertTimeout(Duration.ofSeconds(30L), () -> execute(future));
assertTrue(actualCause.isPresent());
assertThat(actualCause.get(), is(expectedException));
@@ -21,7 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
@@ -64,7 +64,7 @@
* MySQL incremental dumper.
*/
@Slf4j
-public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper {

private final IncrementalDumperContext dumperContext;

@@ -21,7 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -56,7 +56,7 @@
* WAL dumper of openGauss.
*/
@Slf4j
-public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class OpenGaussWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper {

private final IncrementalDumperContext dumperContext;

@@ -21,7 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
@@ -58,7 +58,7 @@
* PostgreSQL WAL dumper.
*/
@Slf4j
-public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class PostgreSQLWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper {

private final IncrementalDumperContext dumperContext;

@@ -25,7 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
@@ -53,7 +53,7 @@
*/
@RequiredArgsConstructor
@Slf4j
-public final class CDCImporter extends AbstractLifecycleExecutor implements Importer {
+public final class CDCImporter extends AbstractPipelineLifecycleRunnable implements Importer {

@Getter
private final String importerId = RandomStringUtils.randomAlphanumeric(8);
@@ -19,8 +19,8 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
@@ -60,7 +60,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {

private final String parentJobId;

-private final LifecycleExecutor checkExecutor;
+private final PipelineLifecycleRunnable checkExecutor;

private final AtomicReference<PipelineDataConsistencyChecker> consistencyChecker = new AtomicReference<>();

@@ -69,7 +69,7 @@ public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemC
checkJobConfig = jobItemContext.getJobConfig();
checkJobId = checkJobConfig.getJobId();
parentJobId = checkJobConfig.getParentJobId();
-checkExecutor = new CheckLifecycleExecutor();
+checkExecutor = new CheckPipelineLifecycleRunnable();
}

@Override
@@ -88,7 +88,7 @@ public void stop() {
checkExecutor.stop();
}

-private final class CheckLifecycleExecutor extends AbstractLifecycleExecutor {
+private final class CheckPipelineLifecycleRunnable extends AbstractPipelineLifecycleRunnable {

@Override
protected void runBlocking() {
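
The hunks above show the composition pattern the rename touches: the tasks runner keeps a PipelineLifecycleRunnable field and forwards its own lifecycle calls to it. A stripped-down sketch of that shape follows (hypothetical class name; only the stop() forwarding is visible above, the start() forwarding is assumed by symmetry).

import org.apache.shardingsphere.data.pipeline.api.runnable.PipelineLifecycleRunnable;

// Hypothetical runner illustrating the delegation shape used by ConsistencyCheckTasksRunner.
public final class DelegatingTasksRunnerSketch {

    private final PipelineLifecycleRunnable checkExecutor;

    public DelegatingTasksRunnerSketch(final PipelineLifecycleRunnable checkExecutor) {
        this.checkExecutor = checkExecutor;
    }

    public void start() {
        checkExecutor.start();
    }

    public void stop() {
        checkExecutor.stop();
    }
}
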
@@ -17,10 +17,10 @@

package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;

-import org.apache.shardingsphere.data.pipeline.common.execute.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;

-public final class FixtureIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class FixtureIncrementalDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper {

@Override
protected void runBlocking() {