diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java index 9a463d6467848..7a878fd607631 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java @@ -44,6 +44,7 @@ public class RaftRegistryProperties { private Duration cliTimeout = Duration.ofSeconds(5); private Duration refreshLeaderTimeout = Duration.ofSeconds(2); private Duration connectStateCheckInterval = Duration.ofSeconds(2); + private Duration heartBeatTimeOut = Duration.ofSeconds(20); private int subscribeListenerThreadPoolSize = 1; private int connectionListenerThreadPoolSize = 1; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegisterClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegisterClient.java index 365dbdccc17cf..82426c982b6d2 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegisterClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegisterClient.java @@ -64,9 +64,9 @@ public interface IRaftRegisterClient extends AutoCloseable { *

* If the key already exists, then update the value. If the key does not exist, then insert a new key-value pair. * - * @param key the key of the register data - * @param value the value to be associated with the key - * @param deleteOnDisconnect if true, the data will be deleted when the client disconnects + * @param key the key of the register data + * @param value the value to be associated with the key + * @param deleteOnDisconnect if true, the key-value pair will be deleted when the client disconnects */ void putRegistryData(String key, String value, boolean deleteOnDisconnect); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegisterClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegisterClient.java index b59e414458127..0a4cec044126e 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegisterClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegisterClient.java @@ -21,13 +21,14 @@ import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.plugin.registry.raft.IRaftConnectionStateManager; -import org.apache.dolphinscheduler.plugin.registry.raft.IRaftLockManager; -import org.apache.dolphinscheduler.plugin.registry.raft.IRaftSubscribeDataManager; -import org.apache.dolphinscheduler.plugin.registry.raft.RaftConnectionStateManager; -import org.apache.dolphinscheduler.plugin.registry.raft.RaftLockManager; import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties; -import org.apache.dolphinscheduler.plugin.registry.raft.RaftSubscribeDataManager; +import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftConnectionStateManager; +import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftLockManager; +import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftSubscribeDataManager; +import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftConnectionStateManager; +import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftLockManager; +import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftSubscribeDataManager; +import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeType; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.registry.api.RegistryException; @@ -116,16 +117,22 @@ public void subscribeRaftRegistryDataChange(String path, SubscribeListener liste @Override public String getRegistryDataByKey(String key) { - String value = readUtf8(rheaKvStore.bGet(key)); - if (value == null) { - throw new RegistryException("key does not exist"); + String compositeValue = readUtf8(rheaKvStore.bGet(key)); + if (compositeValue == null) { + throw new RegistryException("key does not exist:" + key); } - return value; + String[] nodeTypeAndValue = compositeValue.split("#"); + if (nodeTypeAndValue.length != 2) { + throw new RegistryException("value format is incorrect for key: " + key + ", value: " + compositeValue); + } + return nodeTypeAndValue[1]; } @Override public void putRegistryData(String key, String value, boolean deleteOnDisconnect) { - rheaKvStore.bPut(key, writeUtf8(value)); + NodeType nodeType = deleteOnDisconnect ? NodeType.EPHEMERAL : NodeType.PERSISTENT; + String compositeValue = nodeType.getName() + "#" + value; + rheaKvStore.bPut(key, writeUtf8(compositeValue)); } @Override diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftConnectionStateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftConnectionStateManager.java similarity index 96% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftConnectionStateManager.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftConnectionStateManager.java index 1851896e351b9..f5c737b212316 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftConnectionStateManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftConnectionStateManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.raft; +package org.apache.dolphinscheduler.plugin.registry.raft.manage; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionState; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftLockManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftLockManager.java similarity index 96% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftLockManager.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftLockManager.java index e904dc4b04390..edd264dcc68f6 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftLockManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftLockManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.raft; +package org.apache.dolphinscheduler.plugin.registry.raft.manage; /** * Interface for managing locks in a raft registry client. diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftSubscribeDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftSubscribeDataManager.java similarity index 95% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftSubscribeDataManager.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftSubscribeDataManager.java index f8ab846a72c3d..21f18110c1a8d 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/IRaftSubscribeDataManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftSubscribeDataManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.raft; +package org.apache.dolphinscheduler.plugin.registry.raft.manage; import org.apache.dolphinscheduler.registry.api.SubscribeListener; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftConnectionStateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftConnectionStateManager.java similarity index 97% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftConnectionStateManager.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftConnectionStateManager.java index 1137880926e9d..54c2b8fdc101e 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftConnectionStateManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftConnectionStateManager.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.raft; +package org.apache.dolphinscheduler.plugin.registry.raft.manage; +import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionState; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftLockManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftLockManager.java similarity index 97% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftLockManager.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftLockManager.java index 8cafdf71337ec..7eee5c7263a2f 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftLockManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftLockManager.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.raft; +package org.apache.dolphinscheduler.plugin.registry.raft.manage; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties; import org.apache.dolphinscheduler.plugin.registry.raft.model.RaftLockEntry; import java.util.Map; diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftSubscribeDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java similarity index 57% rename from dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftSubscribeDataManager.java rename to dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java index b02d0db13cef0..eac01f00273c5 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftSubscribeDataManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java @@ -15,11 +15,16 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.plugin.registry.raft; +package org.apache.dolphinscheduler.plugin.registry.raft.manage; import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.model.BaseHeartBeat; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties; +import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeItem; +import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeType; import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.SubscribeListener; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; @@ -76,37 +81,60 @@ public void addDataSubscribeListener(String path, SubscribeListener listener) { private class SubscribeCheckTask implements Runnable { - private final Map oldDataMap = new ConcurrentHashMap<>(); + private final Map oldDataMap = new ConcurrentHashMap<>(); @Override public void run() { - final Map newDataMap = getNodeDataMap(); - if (dataSubScribeMap.isEmpty() || newDataMap.isEmpty()) { - return; - } - // find the different - final Map addedData = new HashMap<>(); - final Map deletedData = new HashMap<>(); - final Map updatedData = new HashMap<>(); - for (Map.Entry entry : newDataMap.entrySet()) { - final String oldData = oldDataMap.get(entry.getKey()); - if (oldData == null) { - addedData.put(entry.getKey(), entry.getValue()); - } else { - if (!oldData.equals(entry.getValue())) { - updatedData.put(entry.getKey(), entry.getValue()); + try { + final Map newDataMap = getNodeDataMap(); + if (dataSubScribeMap.isEmpty() || newDataMap.isEmpty()) { + return; + } + // find the different + final Map addedData = new HashMap<>(); + final Map deletedData = new HashMap<>(); + final Map updatedData = new HashMap<>(); + for (Map.Entry entry : newDataMap.entrySet()) { + final NodeItem oldData = oldDataMap.get(entry.getKey()); + if (oldData == null) { + addedData.put(entry.getKey(), entry.getValue().getNodeValue()); + } else if (NodeType.EPHEMERAL.getName().equals(entry.getValue().getNodeType()) + && isUnHealthy(entry.getValue().getNodeValue())) { + kvStore.bDelete(entry.getKey()); + newDataMap.remove(entry.getKey(), entry.getValue()); + } else if (!oldData.getNodeValue().equals(entry.getValue().getNodeValue())) { + updatedData.put(entry.getKey(), entry.getValue().getNodeValue()); + } + } + for (Map.Entry entry : oldDataMap.entrySet()) { + if (!newDataMap.containsKey(entry.getKey())) { + deletedData.put(entry.getKey(), entry.getValue().getNodeValue()); } } + oldDataMap.clear(); + oldDataMap.putAll(newDataMap); + // trigger listener + triggerListener(addedData, deletedData, updatedData); + } catch (Exception ex) { + log.error("Error in SubscribeCheckTask run method", ex); } - for (Map.Entry entry : oldDataMap.entrySet()) { - if (!newDataMap.containsKey(entry.getKey())) { - deletedData.put(entry.getKey(), entry.getValue()); + } + + private boolean isUnHealthy(String heartBeat) { + try { + // consider this not a valid heartbeat instance, do not check + if (heartBeat == null || !heartBeat.contains("reportTime")) { + return false; } + BaseHeartBeat baseHeartBeat = JSONUtils.parseObject(heartBeat, BaseHeartBeat.class); + if (baseHeartBeat != null) { + return System.currentTimeMillis() - baseHeartBeat.getReportTime() > properties.getHeartBeatTimeOut() + .toMillis(); + } + } catch (Exception ex) { + log.error("Fail to parse heartBeat : {}", heartBeat, ex); } - oldDataMap.clear(); - oldDataMap.putAll(newDataMap); - // trigger listener - triggerListener(addedData, deletedData, updatedData); + return false; } private void triggerListener(Map addedData, Map deletedData, @@ -126,20 +154,41 @@ private void triggerListener(Map addedData, Map } } - private Map getNodeDataMap() { - final Map dataMap = new HashMap<>(); + private Map getNodeDataMap() { + final Map nodeItemMap = new HashMap<>(); final List entryList = kvStore.bScan(RegistryNodeType.ALL_SERVERS.getRegistryPath(), RegistryNodeType.ALL_SERVERS.getRegistryPath() + Constants.SINGLE_SLASH + Constants.RAFT_END_KEY); + for (KVEntry kvEntry : entryList) { final String entryKey = readUtf8(kvEntry.getKey()); - final String entryValue = readUtf8(kvEntry.getValue()); - if (StringUtils.isEmpty(entryValue) + final String compositeValue = readUtf8(kvEntry.getValue()); + + if (StringUtils.isEmpty(compositeValue) || !entryKey.startsWith(RegistryNodeType.ALL_SERVERS.getRegistryPath())) { continue; } - dataMap.put(entryKey, entryValue); + + String[] nodeTypeAndValue = parseCompositeValue(compositeValue); + if (nodeTypeAndValue.length < 2) { + continue; + } + String nodeType = nodeTypeAndValue[0]; + String nodeValue = nodeTypeAndValue[1]; + + nodeItemMap.put(entryKey, NodeItem.builder().nodeValue(nodeValue).nodeType(nodeType).build()); + } + return nodeItemMap; + } + + private String[] parseCompositeValue(String compositeValue) { + String[] nodeTypeAndValue = compositeValue.split("#"); + if (nodeTypeAndValue.length < 2) { + log.error("Invalid compositeValue: {}", compositeValue); + return new String[]{}; } - return dataMap; + String nodeType = nodeTypeAndValue[0]; + String nodeValue = nodeTypeAndValue[1]; + return new String[]{nodeType, nodeValue}; } private void triggerListener(Map nodeDataMap, String subscribeKey, diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeItem.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeItem.java new file mode 100644 index 0000000000000..bc0ecf4eba3a2 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeItem.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.raft.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class NodeItem { + + private String nodeType; + private String nodeValue; +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeType.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeType.java new file mode 100644 index 0000000000000..89074579d36f4 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.raft.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +@Getter +public enum NodeType { + + EPHEMERAL("ephemeralNode"), + PERSISTENT("persistentNode"); + private final String name; +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/register/raft/RaftRegistryTestCase.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/register/raft/RaftRegistryTestCase.java index 6dfe7c721272c..8e67528be3725 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/register/raft/RaftRegistryTestCase.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/register/raft/RaftRegistryTestCase.java @@ -16,25 +16,16 @@ */ package org.apache.dolphinscheduler.plugin.register.raft; -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertThrows; - +import com.google.common.truth.Truth; +import lombok.SneakyThrows; +import org.apache.dolphinscheduler.common.model.BaseHeartBeat; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistry; import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties; import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.SubscribeListener; - -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import lombok.SneakyThrows; - import org.assertj.core.util.Lists; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -43,7 +34,15 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; -import com.google.common.truth.Truth; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertThrows; @SpringBootTest(classes = RaftRegistryProperties.class) @SpringBootApplication(scanBasePackageClasses = RaftRegistryProperties.class) @@ -87,7 +86,7 @@ public void testConnectUntilTimeout() { @SneakyThrows @Test - public void testSubscribe() { + public void testEphemeralNodeSubscribe() { final AtomicBoolean subscribeAdded = new AtomicBoolean(false); final AtomicBoolean subscribeRemoved = new AtomicBoolean(false); final AtomicBoolean subscribeUpdated = new AtomicBoolean(false); @@ -106,13 +105,19 @@ public void testSubscribe() { }; String key = "/nodes/master/"; + BaseHeartBeat baseHeartBeat = BaseHeartBeat.builder().reportTime(System.currentTimeMillis()) + .host("127.0.0.1:8081") + .build(); + registry.subscribe(key, subscribeListener); - registry.put(key, String.valueOf(System.nanoTime()), true); - // If multiple event occurs in a refresh time, only the last event will be triggered - Thread.sleep(3000); - registry.put(key, String.valueOf(System.nanoTime()), true); + registry.put(key, JSONUtils.toJsonString(baseHeartBeat), true); + Thread.sleep(3000); - registry.delete(key); + BaseHeartBeat newBaseHeartBeat = BaseHeartBeat.builder().reportTime(System.currentTimeMillis()) + .host("127.0.0.1:8081") + .build(); + registry.put(key, JSONUtils.toJsonString(newBaseHeartBeat), true); + Thread.sleep(20000); await().atMost(Duration.ofSeconds(10)) .untilAsserted(() -> { @@ -120,6 +125,62 @@ public void testSubscribe() { Assertions.assertTrue(subscribeUpdated.get()); Assertions.assertTrue(subscribeRemoved.get()); }); + + // verify that the temporary node data has been removed + try { + String currentData = registry.get(key); + } catch (RegistryException ex) { + Assertions.assertEquals("key does not exist:" + key, ex.getMessage(), + "Unexpected registry exception message"); + } + } + + @SneakyThrows + @Test + public void testPersistentNodeSubscribe() { + final AtomicBoolean subscribeAdded = new AtomicBoolean(false); + final AtomicBoolean subscribeRemoved = new AtomicBoolean(false); + final AtomicBoolean subscribeUpdated = new AtomicBoolean(false); + + SubscribeListener subscribeListener = event -> { + System.out.println("Receive event: " + event); + if (event.type() == Event.Type.ADD) { + subscribeAdded.compareAndSet(false, true); + } + if (event.type() == Event.Type.REMOVE) { + subscribeRemoved.compareAndSet(false, true); + } + if (event.type() == Event.Type.UPDATE) { + subscribeUpdated.compareAndSet(false, true); + } + }; + + String key = "/nodes/master/"; + BaseHeartBeat baseHeartBeat = BaseHeartBeat.builder().reportTime(System.currentTimeMillis()) + .host("127.0.0.1:8081") + .build(); + + registry.subscribe(key, subscribeListener); + registry.put(key, JSONUtils.toJsonString(baseHeartBeat), false); + + Thread.sleep(3000); + BaseHeartBeat newBaseHeartBeat = BaseHeartBeat.builder().reportTime(System.currentTimeMillis()) + .host("127.0.0.1:8081") + .build(); + registry.put(key, JSONUtils.toJsonString(newBaseHeartBeat), false); + Thread.sleep(20000); + + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> { + Assertions.assertTrue(subscribeAdded.get()); + Assertions.assertTrue(subscribeUpdated.get()); + Assertions.assertFalse(subscribeRemoved.get()); + }); + + String currentData = registry.get(key); + Assertions.assertNotNull(currentData, "Node data was unexpectedly removed"); + Assertions.assertEquals(JSONUtils.toJsonString(newBaseHeartBeat), currentData, + "Node data does not match the expected value"); } @SneakyThrows