From 2a7c5f98c0d0fb517d8f156f7c4239f9fd93bf35 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Tue, 21 Nov 2023 23:56:10 +0800 Subject: [PATCH] Refactor GovernanceRepositoryAPI (#29117) * Remove GovernanceRepositoryAPI.getChildrenKeys() * Remove GovernanceRepositoryAPI.watchPipeLineRootPath() * Rename GovernanceRepositoryAPI.watchPipeLineRootPath() * Rename GovernanceRepositoryAPI.updateJobItemErrorMessage() * Refactor GovernanceRepositoryAPI --- .../node/PipelineMetaDataNodeWatcher.java | 2 +- .../repository/GovernanceRepositoryAPI.java | 44 ++++++++++--------- .../GovernanceRepositoryAPIImpl.java | 27 +++++++----- .../PipelineJobIteErrorMessageManager.java | 5 +-- .../core/job/service/PipelineJobManager.java | 5 +-- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 5 +-- .../GovernanceRepositoryAPIImplTest.java | 24 +++++----- .../MigrationDataConsistencyCheckerTest.java | 10 ++++- 8 files changed, 66 insertions(+), 56 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java index 39f5c0a064ba1..42003ec1d661a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeWatcher.java @@ -50,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher { private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) { listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class) .stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each))); - PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(PipelineNodePath.DATA_PIPELINE_ROOT, this::dispatchEvent); + PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watchPipeLineRootPath(this::dispatchEvent); } private void dispatchEvent(final DataChangedEvent event) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java index 83904a361a910..87496931229b9 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java @@ -17,8 +17,10 @@ package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository; +import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; import java.util.Collection; @@ -31,6 +33,13 @@ */ public interface GovernanceRepositoryAPI { + /** + * Watch pipeLine root path. + * + * @param listener data changed event listener + */ + void watchPipeLineRootPath(DataChangedEventListener listener); + /** * Whether job configuration existed. * @@ -147,36 +156,29 @@ public interface GovernanceRepositoryAPI { void deleteJob(String jobId); /** - * Get node's sub-nodes list. - * - * @param key key of data - * @return sub-nodes name list - */ - List getChildrenKeys(String key); - - /** - * Watch key or path of governance server. + * Persist job root info. * - * @param key key of data - * @param listener data changed event listener + * @param jobId job ID + * @param jobClass job class */ - void watch(String key, DataChangedEventListener listener); + void persistJobRootInfo(String jobId, Class jobClass); /** - * Persist data. - * - * @param key key of data - * @param value value of data + * Persist job configuration. + * + * @param jobId job ID + * @param jobConfigPOJO job configuration POJO */ - void persist(String key, String value); + void persistJobConfiguration(String jobId, JobConfigurationPOJO jobConfigPOJO); /** - * Update data. + * Update job item error message. * - * @param key key of data - * @param value value of data + * @param jobId job ID + * @param shardingItem sharding item + * @param errorMessage error message */ - void update(String key, String value); + void updateJobItemErrorMessage(String jobId, int shardingItem, String errorMessage); /** * Get sharding items of job. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java index 9e12ee8d9a6da..1ea1dee6c9428 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java @@ -20,13 +20,16 @@ import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; +import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; @@ -50,6 +53,11 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP private final ClusterPersistRepository repository; + @Override + public void watchPipeLineRootPath(final DataChangedEventListener listener) { + repository.watch(PipelineNodePath.DATA_PIPELINE_ROOT, listener); + } + @Override public boolean isJobConfigurationExisted(final String jobId) { return null != repository.getDirectly(PipelineMetaDataNode.getJobConfigurationPath(jobId)); @@ -142,28 +150,23 @@ public void deleteJob(final String jobId) { } @Override - public List getChildrenKeys(final String key) { - return repository.getChildrenKeys(key); - } - - @Override - public void watch(final String key, final DataChangedEventListener listener) { - repository.watch(key, listener); + public void persistJobRootInfo(final String jobId, final Class jobClass) { + repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobClass.getName()); } @Override - public void persist(final String key, final String value) { - repository.persist(key, value); + public void persistJobConfiguration(final String jobId, final JobConfigurationPOJO jobConfigPOJO) { + repository.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfigPOJO)); } @Override - public void update(final String key, final String value) { - repository.update(key, value); + public void updateJobItemErrorMessage(final String jobId, final int shardingItem, final String errorMessage) { + repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), errorMessage); } @Override public List getShardingItems(final String jobId) { - List result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId)); + List result = repository.getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId)); return result.stream().map(Integer::parseInt).collect(Collectors.toList()); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java index a20cd2e7a4fc4..669d7eebbfe87 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; @@ -56,7 +55,7 @@ public String getErrorMessage() { * @param error error */ public void updateErrorMessage(final Object error) { - governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), null == error ? "" : buildErrorMessage(error)); + governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, null == error ? "" : buildErrorMessage(error)); } private String buildErrorMessage(final Object error) { @@ -67,6 +66,6 @@ private String buildErrorMessage(final Object error) { * Clean job item error message. */ public void cleanErrorMessage() { - governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem), ""); + governanceRepositoryAPI.updateJobItemErrorMessage(jobId, shardingItem, ""); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index e8e1c82148112..9dfa1c4f62a91 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -33,7 +33,6 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @@ -80,8 +79,8 @@ public Optional start(final PipelineJobConfiguration jobConfig) { log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId); return Optional.of(jobId); } - repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName()); - repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobId), YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO())); + repositoryAPI.persistJobRootInfo(jobId, jobAPI.getJobClass()); + repositoryAPI.persistJobConfiguration(jobId, jobConfig.convertToJobConfigurationPOJO()); return Optional.of(jobId); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 17c3daa7a1fbf..0fdff431d4b5d 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -49,7 +49,6 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; -import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; @@ -125,10 +124,10 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT if (repositoryAPI.isJobConfigurationExisted(jobConfig.getJobId())) { log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId()); } else { - repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName()); + repositoryAPI.persistJobRootInfo(jobConfig.getJobId(), getJobClass()); JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO(); jobConfigPOJO.setDisabled(true); - repositoryAPI.persist(PipelineMetaDataNode.getJobConfigurationPath(jobConfig.getJobId()), YamlEngine.marshal(jobConfigPOJO)); + repositoryAPI.persistJobConfiguration(jobConfig.getJobId(), jobConfigPOJO); if (!param.isFull()) { initIncrementalPosition(jobConfig); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java index 9c5bca8cf1d9d..cf5203910f467 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.job.service; +import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineNodePath; @@ -32,6 +33,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder; import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils; import org.junit.jupiter.api.BeforeAll; @@ -71,7 +74,7 @@ static void beforeClass() { } private static void watch() { - governanceRepositoryAPI.watch(PipelineNodePath.DATA_PIPELINE_ROOT, event -> { + governanceRepositoryAPI.watchPipeLineRootPath(event -> { if ((PipelineNodePath.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) { EVENT_ATOMIC_REFERENCE.set(event); COUNT_DOWN_LATCH.countDown(); @@ -82,7 +85,7 @@ private static void watch() { @Test void assertIsJobConfigurationExisted() { assertFalse(governanceRepositoryAPI.isJobConfigurationExisted("foo_job")); - governanceRepositoryAPI.persist("/pipeline/jobs/foo_job/config", "foo"); + getClusterPersistRepository().persist("/pipeline/jobs/foo_job/config", "foo"); assertTrue(governanceRepositoryAPI.isJobConfigurationExisted("foo_job")); } @@ -114,24 +117,16 @@ void assertPersistJobCheckResult() { @Test void assertDeleteJob() { - governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", ""); + getClusterPersistRepository().persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", ""); governanceRepositoryAPI.deleteJob("1"); Optional actual = governanceRepositoryAPI.getJobItemProgress("1", 0); assertFalse(actual.isPresent()); } - @Test - void assertGetChildrenKeys() { - governanceRepositoryAPI.persist(PipelineNodePath.DATA_PIPELINE_ROOT + "/1", ""); - List actual = governanceRepositoryAPI.getChildrenKeys(PipelineNodePath.DATA_PIPELINE_ROOT); - assertFalse(actual.isEmpty()); - assertTrue(actual.contains("1")); - } - @Test void assertWatch() throws InterruptedException { String key = PipelineNodePath.DATA_PIPELINE_ROOT + "/1"; - governanceRepositoryAPI.persist(key, ""); + getClusterPersistRepository().persist(key, ""); boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS); assertTrue(awaitResult); DataChangedEvent event = EVENT_ATOMIC_REFERENCE.get(); @@ -167,6 +162,11 @@ void assertLatestCheckJobIdPersistenceDeletion() { assertFalse(governanceRepositoryAPI.getLatestCheckJobId(parentJobId).isPresent(), "Expected no checkJobId to be present after deletion"); } + private ClusterPersistRepository getClusterPersistRepository() { + ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager(); + return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository(); + } + private MigrationJobItemContext mockJobItemContext() { MigrationJobItemContext result = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()); MigrationTaskConfiguration taskConfig = result.getTaskConfig(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java index 3ba0e7724e8c2..375062ab19799 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; @@ -32,6 +33,8 @@ import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder; import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils; import org.junit.jupiter.api.BeforeAll; @@ -59,7 +62,7 @@ void assertCountAndDataCheck() throws SQLException { jobConfigurationPOJO.setJobName(jobConfig.getJobId()); jobConfigurationPOJO.setShardingTotalCount(1); GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()); - governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO)); + getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO)); governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, ""); Map actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null), createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null); @@ -68,6 +71,11 @@ void assertCountAndDataCheck() throws SQLException { assertTrue(actual.get(checkKey).isMatched()); } + private ClusterPersistRepository getClusterPersistRepository() { + ContextManager contextManager = PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getContextManager(); + return (ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository(); + } + private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) { return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2"); }