Skip to content

Commit

Permalink
Rename DataPipelineDataNodeConstants (#28976)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 7, 2023
1 parent 2b9fcc9 commit b3d90fb
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,19 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.common.constant;
package org.apache.shardingsphere.data.pipeline.common.metadata.node;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Data pipeline constants.
* Data pipeline data node constants.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DataPipelineConstants {

/**
* Data pipeline node name.
*/
public static final String DATA_PIPELINE_NODE_NAME = "pipeline";
public final class DataPipelineDataNodeConstants {

/**
* Data pipeline root path.
*/
public static final String DATA_PIPELINE_ROOT = "/" + DATA_PIPELINE_NODE_NAME;
public static final String DATA_PIPELINE_ROOT = "/pipeline";
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;

import java.util.regex.Pattern;
Expand All @@ -30,7 +29,7 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineMetaDataNode {

private static final String JOB_PATTERN_PREFIX = DataPipelineConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)";
private static final String JOB_PATTERN_PREFIX = DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/jobs/(j\\d{2}\\d{2}[0-9a-z]+)";

public static final Pattern CONFIG_PATTERN = Pattern.compile(JOB_PATTERN_PREFIX + "/config");

Expand All @@ -48,8 +47,8 @@ public static String getMetaDataDataSourcesPath(final JobType jobType) {

private static String getMetaDataRootPath(final JobType jobType) {
return null == jobType
? String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, "metadata")
: String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata");
? String.join("/", DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, "metadata")
: String.join("/", DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata");
}

/**
Expand All @@ -73,7 +72,7 @@ public static String getElasticJobNamespace() {
}

private static String getJobsPath() {
return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, "jobs");
return String.join("/", DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, "jobs");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
Expand Down Expand Up @@ -51,7 +50,7 @@ public final class PipelineMetaDataNodeWatcher {
private PipelineMetaDataNodeWatcher(final PipelineContextKey contextKey) {
listenerMap.putAll(ShardingSphereServiceLoader.getServiceInstances(PipelineMetaDataChangedEventHandler.class)
.stream().collect(Collectors.toMap(PipelineMetaDataChangedEventHandler::getKeyPattern, each -> each)));
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(DataPipelineConstants.DATA_PIPELINE_ROOT, this::dispatchEvent);
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).watch(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, this::dispatchEvent);
}

private void dispatchEvent(final DataChangedEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.common.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.DataPipelineDataNodeConstants;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
Expand Down Expand Up @@ -70,8 +70,8 @@ static void beforeClass() {
}

private static void watch() {
governanceRepositoryAPI.watch(DataPipelineConstants.DATA_PIPELINE_ROOT, event -> {
if ((DataPipelineConstants.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) {
governanceRepositoryAPI.watch(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT, event -> {
if ((DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/1").equals(event.getKey())) {
EVENT_ATOMIC_REFERENCE.set(event);
COUNT_DOWN_LATCH.countDown();
}
Expand Down Expand Up @@ -114,23 +114,23 @@ void assertPersistJobCheckResult() {

@Test
void assertDeleteJob() {
governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + "/1", "");
governanceRepositoryAPI.persist(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/1", "");
governanceRepositoryAPI.deleteJob("1");
Optional<String> actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
assertFalse(actual.isPresent());
}

@Test
void assertGetChildrenKeys() {
governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + "/1", "");
List<String> actual = governanceRepositoryAPI.getChildrenKeys(DataPipelineConstants.DATA_PIPELINE_ROOT);
governanceRepositoryAPI.persist(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/1", "");
List<String> actual = governanceRepositoryAPI.getChildrenKeys(DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT);
assertFalse(actual.isEmpty());
assertTrue(actual.contains("1"));
}

@Test
void assertWatch() throws InterruptedException {
String key = DataPipelineConstants.DATA_PIPELINE_ROOT + "/1";
String key = DataPipelineDataNodeConstants.DATA_PIPELINE_ROOT + "/1";
governanceRepositoryAPI.persist(key, "");
boolean awaitResult = COUNT_DOWN_LATCH.await(10, TimeUnit.SECONDS);
assertTrue(awaitResult);
Expand Down

0 comments on commit b3d90fb

Please sign in to comment.