From 11715482689721e9bc025d11a798462aa9762881 Mon Sep 17 00:00:00 2001 From: ggbocoder <119659920+ggbocoder@users.noreply.github.com> Date: Tue, 13 Aug 2024 23:56:44 +0800 Subject: [PATCH] feature: init namingserver client (#6536) --- all/pom.xml | 5 + bom/pom.xml | 5 + changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../java/org/apache/seata/common/XID.java | 1 + .../apache/seata/common/metadata/Node.java | 67 +++ .../metadata/namingserver/Instance.java | 38 +- .../namingserver/NamingServerNode.java | 38 +- .../common/metadata/namingserver/Unit.java | 17 +- .../seata/common/util/HttpClientUtil.java | 41 +- .../metadata/namingserver/InstanceTest.java | 5 +- .../namingserver/NamingServerNodeTest.java | 18 +- .../config/apollo/ApolloConfiguration.java | 2 + discovery/pom.xml | 1 + discovery/seata-discovery-all/pom.xml | 5 + .../discovery/registry/RegistryType.java | 6 +- .../seata-discovery-namingserver/pom.xml | 79 +++ .../registry/namingserver/NamingListener.java | 27 + .../namingserver/NamingRegistryException.java | 49 ++ .../NamingserverRegistryProvider.java | 30 ++ .../NamingserverRegistryServiceImpl.java | 486 ++++++++++++++++++ ....seata.discovery.registry.RegistryProvider | 17 + .../NamingserverRegistryServiceImplTest.java | 349 +++++++++++++ .../src/test/resources/registry.conf | 120 +++++ .../controller/NamingController.java | 21 +- .../namingserver/entity/pojo/ClusterData.java | 2 +- .../namingserver/manager/NamingManager.java | 4 +- .../java/org/apache/seata/server/Server.java | 74 +-- 28 files changed, 1413 insertions(+), 96 deletions(-) create mode 100644 discovery/seata-discovery-namingserver/pom.xml create mode 100644 discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingListener.java create mode 100644 discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingRegistryException.java create mode 100644 discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryProvider.java create mode 100644 discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java create mode 100644 discovery/seata-discovery-namingserver/src/main/resources/META-INF/services/org.apache.seata.discovery.registry.RegistryProvider create mode 100644 discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java create mode 100644 discovery/seata-discovery-namingserver/src/test/resources/registry.conf diff --git a/all/pom.xml b/all/pom.xml index e25f728d8b5..7e4eb9ab293 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -146,6 +146,11 @@ seata-discovery-etcd3 ${project.version} + + org.apache.seata + seata-discovery-namingserver + ${project.version} + org.apache.seata seata-brpc diff --git a/bom/pom.xml b/bom/pom.xml index 514d4f048bb..722c97660f9 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -126,6 +126,11 @@ seata-discovery-zk ${project.version} + + org.apache.seata + seata-discovery-namingserver + ${project.version} + org.apache.seata seata-discovery-redis diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index c97cc6c29f9..230b811e940 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -3,6 +3,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: +- [[#6536](https://github.com/apache/incubator-seata/pull/6536)] support naming server client - [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support - [[#6537](https://github.com/apache/incubator-seata/pull/6537)] support Namingserver - [[#6538](https://github.com/apache/incubator-seata/pull/6538)] Integration of naming server on the Seata server side diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 44aab0dccc6..205feebb489 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -3,6 +3,7 @@ ### feature: +- [[#6536](https://github.com/apache/incubator-seata/pull/6536)] 支持 naming server客户端 - [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 支持seata私有协议多版本兼容 - [[#6537](https://github.com/apache/incubator-seata/pull/6537)] 支持 Namingserver - [[#6538](https://github.com/apache/incubator-seata/pull/6538)] seata server端集成naming server diff --git a/common/src/main/java/org/apache/seata/common/XID.java b/common/src/main/java/org/apache/seata/common/XID.java index 30c5161a05f..90b68ffb31f 100644 --- a/common/src/main/java/org/apache/seata/common/XID.java +++ b/common/src/main/java/org/apache/seata/common/XID.java @@ -18,6 +18,7 @@ import static org.apache.seata.common.Constants.IP_PORT_SPLIT_CHAR; + /** * The type Xid. * diff --git a/common/src/main/java/org/apache/seata/common/metadata/Node.java b/common/src/main/java/org/apache/seata/common/metadata/Node.java index de77f003812..9d105a5581d 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Node.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Node.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Objects; public class Node { @@ -28,6 +29,10 @@ public class Node { private Endpoint internal; + private double weight = 1.0; + private boolean healthy = true; + private long timeStamp; + private String group; private ClusterRole role = ClusterRole.MEMBER; @@ -97,6 +102,49 @@ public void setInternal(Endpoint internal) { this.internal = internal; } + @Override + public int hashCode() { + return Objects.hash(control, transaction); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Node node = (Node) o; + return Objects.equals(control, node.control) && Objects.equals(transaction, node.transaction); + } + + + // convert to String + public String toJsonString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + sb.append("\"controlEndpoint\": ").append(control.toString()).append(", "); + sb.append("\"transactionEndpoint\": ").append(transaction.toString()).append(", "); + sb.append("\"weight\": ").append(weight).append(", "); + sb.append("\"healthy\": ").append(healthy).append(", "); + sb.append("\"timeStamp\": ").append(timeStamp).append(", "); + sb.append("\"metadata\": {"); + + // handle metadata k-v map + int i = 0; + for (Map.Entry entry : metadata.entrySet()) { + if (i > 0) { + sb.append(", "); + } + sb.append("\"").append(entry.getKey()).append("\": \"").append(entry.getValue()).append("\""); + i++; + } + + sb.append("}}"); + return sb.toString(); + } + public static class Endpoint { private String host; @@ -136,6 +184,25 @@ public String createAddress() { return host + ":" + port; } + @Override + public int hashCode() { + return Objects.hash(host,port,protocol); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Endpoint endpoint = (Endpoint) o; + return Objects.equals(endpoint.host,this.host) + && Objects.equals(endpoint.port,this.port) + && Objects.equals(endpoint.protocol,this.protocol); + } + @Override public String toString() { return "Endpoint{" + "host='" + host + '\'' + ", port=" + port + '}'; diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java index d75599d489a..d08cd622c5d 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java +++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java @@ -34,11 +34,12 @@ public class Instance { private String namespace; private String clusterName; private String unit; - private Node.Endpoint controlEndpoint = new Node.Endpoint(); - private Node.Endpoint transactionEndpoint = new Node.Endpoint(); + private Node.Endpoint control = new Node.Endpoint(); + private Node.Endpoint transaction = new Node.Endpoint(); private double weight = 1.0; private boolean healthy = true; private long term; + private long timestamp; private ClusterRole role = ClusterRole.MEMBER; private Map metadata = new HashMap<>(); @@ -83,20 +84,20 @@ public void setRole(ClusterRole role) { this.role = role; } - public Node.Endpoint getControlEndpoint() { - return controlEndpoint; + public Node.Endpoint getControl() { + return control; } - public void setControlEndpoint(Node.Endpoint controlEndpoint) { - this.controlEndpoint = controlEndpoint; + public void setControl(Node.Endpoint control) { + this.control = control; } - public Node.Endpoint getTransactionEndpoint() { - return transactionEndpoint; + public Node.Endpoint getTransaction() { + return transaction; } - public void setTransactionEndpoint(Node.Endpoint transactionEndpoint) { - this.transactionEndpoint = transactionEndpoint; + public void setTransaction(Node.Endpoint transaction) { + this.transaction = transaction; } public double getWeight() { @@ -124,6 +125,14 @@ public void setTerm(long term) { this.term = term; } + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + public void addMetadata(String key, Object value) { this.metadata.put(key, value); } @@ -134,7 +143,7 @@ public void setMetadata(Map metadata) { @Override public int hashCode() { - return Objects.hash(controlEndpoint, transactionEndpoint); + return Objects.hash(control, transaction); } @Override @@ -146,7 +155,7 @@ public boolean equals(Object o) { return false; } Instance instance = (Instance) o; - return Objects.equals(controlEndpoint, instance.controlEndpoint) && Objects.equals(transactionEndpoint, instance.transactionEndpoint); + return Objects.equals(control, instance.control) && Objects.equals(transaction, instance.transaction); } @@ -168,11 +177,12 @@ public Map toMap() { resultMap.put("namespace", namespace); resultMap.put("clusterName", clusterName); resultMap.put("unit", unit); - resultMap.put("controlEndpoint", controlEndpoint.toString()); - resultMap.put("transactionEndpoint", transactionEndpoint.toString()); + resultMap.put("control", control.toString()); + resultMap.put("transaction", transaction.toString()); resultMap.put("weight", String.valueOf(weight)); resultMap.put("healthy", String.valueOf(healthy)); resultMap.put("term", String.valueOf(term)); + resultMap.put("timestamp",String.valueOf(timestamp)); resultMap.put("metadata", mapToJsonString(metadata)); return resultMap; diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java index 391112433bc..5645ac0e3ed 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java +++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java @@ -28,6 +28,26 @@ public class NamingServerNode extends Node { private boolean healthy = true; private long term; + public double getWeight() { + return weight; + } + + public boolean isHealthy() { + return healthy; + } + + public void setHealthy(boolean healthy) { + this.healthy = healthy; + } + + public long getTerm() { + return term; + } + + public void setTerm(long term) { + this.term = term; + } + @Override public int hashCode() { return Objects.hash(getControl(), getTransaction()); @@ -45,25 +65,15 @@ public boolean equals(Object o) { return Objects.equals(getControl(), node.getControl()) && Objects.equals(getTransaction(), node.getTransaction()); } - public boolean isTotalEqual(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { + public boolean isChanged(Object obj) { + if (Objects.isNull(obj)) { return false; } - NamingServerNode otherNode = (NamingServerNode) obj; - // check each member variable - return Objects.equals(getControl(), otherNode.getControl()) && - Objects.equals(getTransaction(), otherNode.getTransaction()) && - Double.compare(otherNode.weight, weight) == 0 && - healthy == otherNode.healthy && - Objects.equals(getRole(), otherNode.getRole()) && - term == otherNode.term && - Objects.equals(getMetadata(), otherNode.getMetadata()); + // other node is newer than me + return otherNode.term > term; } // convert to String diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Unit.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/Unit.java index f85dad24bd5..8128f116d03 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Unit.java +++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/Unit.java @@ -24,7 +24,7 @@ public class Unit { private String unitName; - private List nodeList; + private List nodeList; public String getUnitName() { return unitName; @@ -34,11 +34,11 @@ public void setUnitName(String unitName) { this.unitName = unitName; } - public List getNamingInstanceList() { + public List getNamingInstanceList() { return nodeList; } - public void setNamingInstanceList(List nodeList) { + public void setNamingInstanceList(List nodeList) { this.nodeList = nodeList; } @@ -51,16 +51,17 @@ public void removeInstance(Node node) { /** * @param node node */ - public void addInstance(NamingServerNode node) { + public boolean addInstance(NamingServerNode node) { if (nodeList.contains(node)) { - Node node1 = nodeList.get(nodeList.indexOf(node)); - if (node.isTotalEqual(node1)) { - return; - } else { + NamingServerNode node1 = nodeList.get(nodeList.indexOf(node)); + if (node1.isChanged(node)) { nodeList.remove(node1); + } else { + return false; } } nodeList.add(node); + return true; } diff --git a/common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java b/common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java index 928fda7bdd4..63a5592acd5 100644 --- a/common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java +++ b/common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java @@ -17,7 +17,6 @@ package org.apache.seata.common.util; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.NameValuePair; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.config.RequestConfig; @@ -52,15 +51,17 @@ public class HttpClientUtil { private static final Map HTTP_CLIENT_MAP = new ConcurrentHashMap<>(); private static final PoolingHttpClientConnectionManager POOLING_HTTP_CLIENT_CONNECTION_MANAGER = - new PoolingHttpClientConnectionManager(); + new PoolingHttpClientConnectionManager(); static { POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10); POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(10); Runtime.getRuntime().addShutdownHook(new Thread(() -> HTTP_CLIENT_MAP.values().parallelStream().forEach(client -> { try { + //delay 3s, make sure unregister http request send successfully + Thread.sleep(3000); client.close(); - } catch (IOException e) { + } catch (IOException | InterruptedException e) { LOGGER.error(e.getMessage(), e); } }))); @@ -88,10 +89,35 @@ public static CloseableHttpResponse doPost(String url, Map param String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8); StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED); httpPost.setEntity(stringEntity); - } else if (ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) { - ObjectMapper objectMapper = new ObjectMapper(); - String requestBody = objectMapper.writeValueAsString(params); - StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_JSON); + } + } + CloseableHttpClient client = HTTP_CLIENT_MAP.computeIfAbsent(timeout, + k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER) + .setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout) + .setSocketTimeout(timeout).setConnectTimeout(timeout).build()) + .build()); + return client.execute(httpPost); + } catch (URISyntaxException | ClientProtocolException e) { + LOGGER.error(e.getMessage(), e); + } + return null; + } + + // post request + public static CloseableHttpResponse doPost(String url, String body, Map header, + int timeout) throws IOException { + try { + URIBuilder builder = new URIBuilder(url); + URI uri = builder.build(); + HttpPost httpPost = new HttpPost(uri); + String contentType = ""; + if (header != null) { + header.forEach(httpPost::addHeader); + contentType = header.get("Content-Type"); + } + if (StringUtils.isNotBlank(contentType)) { + if (ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) { + StringEntity stringEntity = new StringEntity(body, ContentType.APPLICATION_JSON); httpPost.setEntity(stringEntity); } } @@ -107,6 +133,7 @@ public static CloseableHttpResponse doPost(String url, Map param return null; } + // get request public static CloseableHttpResponse doGet(String url, Map param, Map header, int timeout) throws IOException { diff --git a/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java b/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java index 77c5e669c11..989d3cc0190 100644 --- a/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java +++ b/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java @@ -37,8 +37,9 @@ void toJsonString() throws JsonProcessingException { mmap.put("k","v"); map.put("k",mmap); instance.setMetadata(map); - instance.setControlEndpoint(new Node.Endpoint("1.1.1.1",888)); - instance.setTransactionEndpoint(new Node.Endpoint("2.2.2.2",999)); + instance.setControl(new Node.Endpoint("1.1.1.1",888)); + instance.setTransaction(new Node.Endpoint("2.2.2.2",999)); + System.out.println(instance.toJsonString()); assertEquals(instance.toJsonString(),objectMapper.writeValueAsString(instance)); } } \ No newline at end of file diff --git a/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java b/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java index 9e6a025c5ca..8ddeadfaef2 100644 --- a/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java +++ b/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seata.common.metadata.Node; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.HashMap; @@ -38,7 +39,22 @@ void toJsonString() throws JsonProcessingException { node.setGroup("group"); node.setControl(new Node.Endpoint("1.1.1.1",888)); node.setTransaction(new Node.Endpoint("2.2.2.2",999)); + System.out.println(node.toJsonString()); assertEquals(node.toJsonString(),objectMapper.writeValueAsString(node)); - + } + + @Test + public void testContains() { + NamingServerNode node1 = new NamingServerNode(); + node1.setControl(new Node.Endpoint("111.11.11.1",123)); + node1.setTransaction(new Node.Endpoint("111.11.11.1",124)); + Node node2 = new Node(); + node2.setControl(new Node.Endpoint("111.11.11.1",123)); + node2.setTransaction(new Node.Endpoint("111.11.11.1",124)); + NamingServerNode node3 = new NamingServerNode(); + node3.setControl(new Node.Endpoint("111.11.11.1",123)); + node3.setTransaction(new Node.Endpoint("111.11.11.1",124)); + Assertions.assertFalse(node1.equals(node2)); + Assertions.assertTrue(node1.equals(node3)); } } \ No newline at end of file diff --git a/config/seata-config-apollo/src/main/java/org/apache/seata/config/apollo/ApolloConfiguration.java b/config/seata-config-apollo/src/main/java/org/apache/seata/config/apollo/ApolloConfiguration.java index 815ac6c3166..279b2604e9c 100644 --- a/config/seata-config-apollo/src/main/java/org/apache/seata/config/apollo/ApolloConfiguration.java +++ b/config/seata-config-apollo/src/main/java/org/apache/seata/config/apollo/ApolloConfiguration.java @@ -99,6 +99,7 @@ private ApolloConfiguration() { } } + /** * Gets instance. * @@ -126,6 +127,7 @@ public String getLatestConfig(String dataId, String defaultValue, long timeoutMi return (String) configFuture.get(); } + @Override public boolean putConfig(String dataId, String content, long timeoutMills) { throw new NotSupportYetException("not support putConfig"); diff --git a/discovery/pom.xml b/discovery/pom.xml index 52f43d2a23f..8f5772e57f6 100644 --- a/discovery/pom.xml +++ b/discovery/pom.xml @@ -40,6 +40,7 @@ seata-discovery-zk seata-discovery-redis seata-discovery-nacos + seata-discovery-namingserver seata-discovery-etcd3 seata-discovery-sofa seata-discovery-raft diff --git a/discovery/seata-discovery-all/pom.xml b/discovery/seata-discovery-all/pom.xml index 218c6af9ee7..89c2600b48d 100644 --- a/discovery/seata-discovery-all/pom.xml +++ b/discovery/seata-discovery-all/pom.xml @@ -70,5 +70,10 @@ seata-discovery-sofa ${project.version} + + ${project.groupId} + seata-discovery-namingserver + ${project.version} + diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryType.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryType.java index ac955089dee..20d3d8a7d43 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryType.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryType.java @@ -60,7 +60,11 @@ public enum RegistryType { /** * Custom registry type */ - Custom; + Custom, + /** + * NamingServer registry type + */ + NamingServer; /** * Gets type. diff --git a/discovery/seata-discovery-namingserver/pom.xml b/discovery/seata-discovery-namingserver/pom.xml new file mode 100644 index 00000000000..69bfa9ba621 --- /dev/null +++ b/discovery/seata-discovery-namingserver/pom.xml @@ -0,0 +1,79 @@ + + + + + + org.apache.seata + seata-discovery + ${revision} + + 4.0.0 + seata-discovery-namingserver + seata-discovery-namingserver ${project.version} + discovery-namingserver for Seata built with Maven + + + + org.apache.seata + seata-discovery-core + ${project.version} + + + org.springframework + spring-core + + + org.apache.seata + seata-common + ${project.version} + compile + + + org.springframework.boot + spring-boot-test + test + + + junit + junit + + + org.springframework + spring-test + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + org.apache.httpcomponents + httpcore + + + org.apache.httpcomponents + httpclient + + + diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingListener.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingListener.java new file mode 100644 index 00000000000..23cc5e19187 --- /dev/null +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.discovery.registry.namingserver; + + +public interface NamingListener { + /** + * on event + * + * @param vGroup + */ + void onEvent(String vGroup); +} diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingRegistryException.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingRegistryException.java new file mode 100644 index 00000000000..a893785cf7e --- /dev/null +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingRegistryException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.discovery.registry.namingserver; + +public class NamingRegistryException extends RuntimeException { + + /** + * naming registry exception. + * + * @param message the message + */ + public NamingRegistryException(String message) { + super(message); + } + + /** + * naming registry exception. + * + * @param message the message + * @param cause the cause + */ + public NamingRegistryException(String message, Throwable cause) { + super(message, cause); + } + + /** + * naming registry exception. + * + * @param cause the cause + */ + public NamingRegistryException(Throwable cause) { + super(cause); + } + +} diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryProvider.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryProvider.java new file mode 100644 index 00000000000..79e52ecf0fd --- /dev/null +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryProvider.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.discovery.registry.namingserver; + +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.discovery.registry.RegistryProvider; +import org.apache.seata.discovery.registry.RegistryService; + + +@LoadLevel(name = "NamingServer", order = 1) +public class NamingserverRegistryProvider implements RegistryProvider { + @Override + public RegistryService provide() { + return NamingserverRegistryServiceImpl.getInstance(); + } +} diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java new file mode 100644 index 00000000000..d62b3e75bf5 --- /dev/null +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.discovery.registry.namingserver; + + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.entity.ContentType; +import org.apache.http.protocol.HTTP; +import org.apache.seata.common.metadata.Node; +import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.metadata.namingserver.MetaResponse; +import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.common.util.HttpClientUtil; +import org.apache.seata.discovery.registry.RegistryService; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +public class NamingserverRegistryServiceImpl implements RegistryService { + private static final Logger LOGGER = LoggerFactory.getLogger(NamingserverRegistryServiceImpl.class); + + public static volatile NamingserverRegistryServiceImpl instance; + private static final String NAMESPACE_KEY = "namespace"; + private static final String VGROUP_KEY = "vGroup"; + private static final String CLIENT_TERM_KEY = "clientTerm"; + private static final String DEFAULT_NAMESPACE = "public"; + private static final String NAMING_SERVICE_URL_KEY = "server-addr"; + private static final String FILE_ROOT_REGISTRY = "registry"; + private static final String FILE_CONFIG_SPLIT_CHAR = "."; + private static final String REGISTRY_TYPE = "namingserver"; + private static final String HTTP_PREFIX = "http://"; + private static final String TIME_OUT_KEY = "timeout"; + + private static final String HEART_BEAT_KEY = "heartbeat-period"; + private static int HEARTBEAT_PERIOD = 30 * 1000; + private static final int HEALTHCHECK_PERIOD = 3 * 1000; + private static final int PULL_PERIOD = 30 * 1000; + private static final int LONG_POLL_TIME_OUT_PERIOD = 28 * 1000; + private static final int THREAD_POOL_NUM = 1; + private static final int HEALTH_CHECK_THRESHOLD = 1; // namingserver is considered unhealthy if failing in healthy check more than 1 times + private volatile long term = 0; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private ScheduledFuture heartBeatScheduledFuture; + private volatile boolean isSubscribed = false; + private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE; + private String namingServerAddressCache; + private static ConcurrentMap AVAILABLE_NAMINGSERVER_MAP = new ConcurrentHashMap<>(); + private static final ConcurrentMap> VGROUP_ADDRESS_MAP = new ConcurrentHashMap<>(); + private static final ConcurrentMap> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>(); + protected final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", THREAD_POOL_NUM, true)); + private final ExecutorService notifierExecutor = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("serviceNamingNotifier", THREAD_POOL_NUM)); + + + private NamingserverRegistryServiceImpl() { + OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + String heartBeatKey = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, HEART_BEAT_KEY); + HEARTBEAT_PERIOD = FILE_CONFIG.getInt(heartBeatKey, HEARTBEAT_PERIOD); + List urlList = getNamingAddrs(); + checkAvailableNamingAddr(urlList); + this.executorService.scheduleAtFixedRate(() -> { + checkAvailableNamingAddr(urlList); + }, HEARTBEAT_PERIOD, HEALTHCHECK_PERIOD, TimeUnit.MILLISECONDS); + } + + private void checkAvailableNamingAddr(List urlList) { + for (String url : urlList) { + AtomicInteger unHealthCount = AVAILABLE_NAMINGSERVER_MAP.computeIfAbsent(url, value -> new AtomicInteger(0)); + // do health check + boolean isHealthy = doHealthCheck(url); + int unHealthCountBefore = unHealthCount.get(); + if (!isHealthy) { + unHealthCount.incrementAndGet(); + } else { + unHealthCount.set(0); + AVAILABLE_NAMINGSERVER_MAP.put(url, unHealthCount); + } + // record message that naming server node going online or going offline + int unHealthCountAfter = unHealthCount.get(); + if (!Objects.equals(unHealthCountAfter, 0) && unHealthCountAfter == HEALTH_CHECK_THRESHOLD) { + LOGGER.error("naming server node go offline {}", url); + } + if (!Objects.equals(unHealthCountAfter, unHealthCountBefore) && unHealthCountAfter == 0) { + LOGGER.info("naming server node go online {}", url); + } + } + } + + /** + * Gets instance. + * + * @return the instance + */ + static NamingserverRegistryServiceImpl getInstance() { + + if (instance == null) { + synchronized (NamingserverRegistryServiceImpl.class) { + if (instance == null) { + instance = new NamingserverRegistryServiceImpl(); + } + } + } + return instance; + } + + + @Override + public void register(InetSocketAddress address) throws Exception { + unregister(address); + NetUtil.validAddress(address); + Instance instance = Instance.getInstance(); + instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty")); + + instance.setTimestamp(System.currentTimeMillis()); + doRegister(instance, getNamingAddrs()); + + if (heartBeatScheduledFuture != null && !heartBeatScheduledFuture.isCancelled()) { + heartBeatScheduledFuture.cancel(false); + } + + heartBeatScheduledFuture = this.executorService.scheduleAtFixedRate(() -> { + try { + instance.setTimestamp(System.currentTimeMillis()); + doRegister(instance, getNamingAddrs()); + } catch (Exception e) { + LOGGER.error("Naming server register Exception", e); + } + }, HEARTBEAT_PERIOD, HEARTBEAT_PERIOD, TimeUnit.MILLISECONDS); + + } + + public void doRegister(Instance instance, List urlList) { + for (String urlSuffix : urlList) { + // continue if name server node is unhealthy + if (AVAILABLE_NAMINGSERVER_MAP.computeIfAbsent(urlSuffix, value -> new AtomicInteger(0)).get() >= HEALTH_CHECK_THRESHOLD) { + continue; + } + String url = HTTP_PREFIX + urlSuffix + "/naming/v1/register?"; + String namespace = instance.getNamespace(); + String clusterName = instance.getClusterName(); + String unit = instance.getUnit(); + String jsonBody = instance.toJsonString(); + String params = "namespace=" + namespace + "&clusterName=" + clusterName + "&unit=" + unit; + url += params; + Map header = new HashMap<>(); + header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + + try (CloseableHttpResponse response = HttpClientUtil.doPost(url, jsonBody, header, 3000)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("instance has been registered successfully:{}", statusCode); + } + } else { + LOGGER.warn("instance has been registered unsuccessfully:{}", statusCode); + } + } catch (Exception e) { + LOGGER.error("instance has been registered failed in namingserver {}", url); + } + } + } + + public boolean doHealthCheck(String url) { + url = HTTP_PREFIX + url + "/naming/v1/health"; + Map header = new HashMap<>(); + header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + try (CloseableHttpResponse response = HttpClientUtil.doGet(url, null, header, 3000)) { + int statusCode = response.getStatusLine().getStatusCode(); + return statusCode == 200; + } catch (Exception e) { + return false; + } + } + + @Override + public void unregister(InetSocketAddress address) { + // stop heartbeat + + if (heartBeatScheduledFuture != null && !heartBeatScheduledFuture.isCancelled()) { + heartBeatScheduledFuture.cancel(true); + } + NetUtil.validAddress(address); + Instance instance = Instance.getInstance(); + instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty")); + for (String urlSuffix : getNamingAddrs()) { + String url = HTTP_PREFIX + urlSuffix + "/naming/v1/unregister?"; + String unit = instance.getUnit(); + String jsonBody = instance.toJsonString(); + String params = "unit=" + unit; + url += params; + Map header = new HashMap<>(); + header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + try (CloseableHttpResponse response = HttpClientUtil.doPost(url, jsonBody, header, 3000)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200) { + LOGGER.info("instance has been unregistered successfully:{}", statusCode); + } else { + LOGGER.warn("instance has been unregistered unsuccessfully:{}", statusCode); + } + } catch (Exception e) { + LOGGER.error("instance has been unregistered failed in namingserver {}", url, e); + } + } + } + + @Override + public void subscribe(String cluster, NamingListener listener) throws Exception { + + } + + public void subscribe(NamingListener listener, String vGroup) throws Exception { + LISTENER_SERVICE_MAP.computeIfAbsent(vGroup, key -> new ArrayList<>()).add(listener); + isSubscribed = true; + notifierExecutor.execute(() -> { + long currentTime = System.currentTimeMillis(); + while (isSubscribed) { + try { + // pull + boolean needFetch = System.currentTimeMillis() - currentTime > PULL_PERIOD; + if (!needFetch) { + // push + needFetch = watch(vGroup); + } + if (needFetch) { + for (NamingListener namingListener : LISTENER_SERVICE_MAP.get(vGroup)) { + try { + namingListener.onEvent(vGroup); + } catch (Exception e) { + LOGGER.warn("vGroup {} onEvent wrong {}", vGroup, e); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ignored) { + } + } + } + namingServerAddressCache = null; + currentTime = System.currentTimeMillis(); + } + } catch (Exception ex) { + LOGGER.error("watch failed! ", ex); + try { + Thread.sleep(1000); + } catch (Exception ignore) { + } + } + } + }); + Runtime.getRuntime().addShutdownHook(new Thread(notifierExecutor::shutdown)); + } + + public boolean watch(String vGroup) { + String namingAddr = getNamingAddr(); + String clientAddr = NetUtil.getLocalHost(); + StringBuilder watchAddrBuilder = new StringBuilder(HTTP_PREFIX) + .append(namingAddr) + .append("/naming/v1/watch?") + .append(VGROUP_KEY).append("=").append(vGroup) + .append("&").append(CLIENT_TERM_KEY).append("=").append(term) + .append("&").append(TIME_OUT_KEY).append("=").append(LONG_POLL_TIME_OUT_PERIOD) + .append("&clientAddr=").append(clientAddr); + String watchAddr = watchAddrBuilder.toString(); + + try (CloseableHttpResponse response = HttpClientUtil.doPost(watchAddr, (String) null, null, 30000)) { + if (response != null) { + StatusLine statusLine = response.getStatusLine(); + return statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK; + } + } catch (Exception e) { + LOGGER.error("watch failed: {}", e.getMessage()); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ignored) { + } + } + return false; + } + + @Override + public void unsubscribe(String cluster, NamingListener listener) throws Exception { + + } + + public void unsubscribe(NamingListener listener, String vGroup) throws Exception { + // remove watchers + List listeners = LISTENER_SERVICE_MAP.get(vGroup); + if (listeners != null) { + listeners.remove(listener); + if (listeners.isEmpty()) { + LISTENER_SERVICE_MAP.remove(vGroup); + } + } + + // close subscribe thread + isSubscribed = false; + + } + + public void unsubscribe(String vGroup) throws Exception { + LISTENER_SERVICE_MAP.remove(vGroup); + isSubscribed = false; + } + + @Override + public List aliveLookup(String transactionServiceGroup) { + return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>()); + } + + @Override + public List refreshAliveLookup(String transactionServiceGroup, + List aliveAddress) { + return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress); + } + + /** + * @param key vGroup name + * @return List available instance list + * @throws Exception + */ + + + @Override + public List lookup(String key) throws Exception { + if (!isSubscribed) { + // get available instanceList by vGroup + refreshGroup(key); + // subscribe the vGroup + subscribe(vGroup -> { + try { + refreshGroup(vGroup); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, key); + } + + return VGROUP_ADDRESS_MAP.get(key); + } + + + public List refreshGroup(String vGroup) throws IOException { + Map paraMap = new HashMap<>(); + String namingAddr = getNamingAddr(); + paraMap.put(VGROUP_KEY, vGroup); + paraMap.put(NAMESPACE_KEY, getNamespace()); + String url = HTTP_PREFIX + namingAddr + "/naming/v1/discovery"; + Map header = new HashMap<>(); + header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + try (CloseableHttpResponse response = HttpClientUtil.doGet(url, paraMap, header, 3000)) { + if (response == null) { + throw new NamingRegistryException("cannot lookup server list in vgroup: " + vGroup); + } + String jsonResponse = EntityUtils.toString(response.getEntity(), "UTF-8"); + response.close(); + // jsonResponse -> MetaResponse + MetaResponse metaResponse = OBJECT_MAPPER.readValue(jsonResponse, new TypeReference() { + }); + // MetaResponse -> endpoint list + List newAddressList = metaResponse.getClusterList().stream() + .flatMap(cluster -> cluster.getUnitData().stream()) + .flatMap(unit -> unit.getNamingInstanceList().stream()) + .map(namingInstance -> new InetSocketAddress(namingInstance.getTransaction().getHost(), namingInstance.getTransaction().getPort())).collect(Collectors.toList()); + if (metaResponse.getTerm() > 0) { + term = metaResponse.getTerm(); + } + VGROUP_ADDRESS_MAP.put(vGroup, newAddressList); + removeOfflineAddressesIfNecessary(vGroup, newAddressList); + } catch (IOException e) { + e.printStackTrace(); + throw new RemoteException(); + } + + return VGROUP_ADDRESS_MAP.get(vGroup); + } + + @Override + public void close() throws Exception { + + } + + @Override + public String getServiceGroup(String key) { + return RegistryService.super.getServiceGroup(key); + } + + + public String getNamespace() { + String namespaceKey = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, NAMESPACE_KEY); + String namespace = FILE_CONFIG.getConfig(namespaceKey); + if (StringUtils.isBlank(namespace)) { + namespace = DEFAULT_NAMESPACE; + } + return namespace; + } + + + /** + * get one namingserver url + * + * @return url + */ + public String getNamingAddr() { + if (namingServerAddressCache != null) { + return namingServerAddressCache; + } + Map availableNamingserverMap = new HashMap<>(AVAILABLE_NAMINGSERVER_MAP); + List availableNamingserverList = new ArrayList<>(); + for (Map.Entry entry : availableNamingserverMap.entrySet()) { + String namingServerAddress = entry.getKey(); + Integer numberOfFailures = entry.getValue().get(); + + if (numberOfFailures < HEALTH_CHECK_THRESHOLD) { + availableNamingserverList.add(namingServerAddress); + } + } + if (availableNamingserverList.isEmpty()) { + throw new NamingRegistryException("no available namingserver address!"); + } else { + namingServerAddressCache = availableNamingserverList.get(ThreadLocalRandom.current().nextInt(availableNamingserverList.size())); + return namingServerAddressCache; + } + + } + + /** + * get all namingserver urlList + * + * @return url List + */ + public List getNamingAddrs() { + String namingAddrsKey = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, NAMING_SERVICE_URL_KEY); + + String urlListStr = FILE_CONFIG.getConfig(namingAddrsKey); + if (urlListStr.isEmpty()) { + throw new NamingRegistryException("Naming server url can not be null!"); + } + return Arrays.stream(urlListStr.split(",")).collect(Collectors.toList()); + } + + +} \ No newline at end of file diff --git a/discovery/seata-discovery-namingserver/src/main/resources/META-INF/services/org.apache.seata.discovery.registry.RegistryProvider b/discovery/seata-discovery-namingserver/src/main/resources/META-INF/services/org.apache.seata.discovery.registry.RegistryProvider new file mode 100644 index 00000000000..54858d099c7 --- /dev/null +++ b/discovery/seata-discovery-namingserver/src/main/resources/META-INF/services/org.apache.seata.discovery.registry.RegistryProvider @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.seata.discovery.registry.namingserver.NamingserverRegistryProvider diff --git a/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java b/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java new file mode 100644 index 00000000000..44c0af65709 --- /dev/null +++ b/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.discovery.registry.namingserver; + +import java.net.InetSocketAddress; +import java.rmi.RemoteException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.http.entity.ContentType; +import org.apache.http.protocol.HTTP; +import org.apache.seata.common.holder.ObjectHolder; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.common.util.HttpClientUtil; +import org.apache.seata.discovery.registry.RegistryService; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MutablePropertySources; +import org.springframework.core.env.PropertiesPropertySource; + + +import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Disabled +class NamingserverRegistryServiceImplTest { + + private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE; + + @BeforeAll + public static void beforeClass() throws Exception { + System.setProperty("registry.namingserver.namespace", "dev"); + System.setProperty("registry.namingserver.cluster", "cluster1"); + System.setProperty("registry.namingserver.serverAddr", "127.0.0.1:8080"); + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + + // 获取应用程序环境 + ConfigurableEnvironment environment = context.getEnvironment(); + MutablePropertySources propertySources = environment.getPropertySources(); + Properties customProperties = new Properties(); + customProperties.setProperty("seata.registry.namingserver.server-addr[0]", "127.0.0.1:8080"); + + PropertiesPropertySource customPropertySource = new PropertiesPropertySource("customSource", customProperties); + propertySources.addLast(customPropertySource); + ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, environment); + + } + + + @Test + public void unregister1() throws Exception { + NamingserverRegistryServiceImpl namingserverRegistryService = NamingserverRegistryServiceImpl.getInstance(); + InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8080); + namingserverRegistryService.register(inetSocketAddress); + namingserverRegistryService.unregister(inetSocketAddress); + } + + + @Test + public void getNamingAddrsTest() { + NamingserverRegistryServiceImpl namingserverRegistryService = NamingserverRegistryServiceImpl.getInstance(); + List list = namingserverRegistryService.getNamingAddrs(); + assertEquals(list.size(), 1); + } + + + @Test + public void getNamingAddrTest() { + NamingserverRegistryServiceImpl namingserverRegistryService = NamingserverRegistryServiceImpl.getInstance(); + String addr = namingserverRegistryService.getNamingAddr(); + assertEquals(addr, "127.0.0.1:8080"); + } + + + @Test + public void convertTest() { + InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8088); + assertEquals(inetSocketAddress.getAddress().getHostAddress(), "127.0.0.1"); + assertEquals(inetSocketAddress.getPort(), 8088); + } + + + @Test + public void testRegister1() throws Exception { + + RegistryService registryService = new NamingserverRegistryProvider().provide(); + + + InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088); + //1.register + registryService.register(inetSocketAddress1); + + //2.create vGroup in cluster + createGroupInCluster("dev", "group1", "cluster1"); + + //3.get instances + List list = registryService.lookup("group1"); + + assertEquals(list.size(), 1); + InetSocketAddress inetSocketAddress = list.get(0); + assertEquals(inetSocketAddress.getAddress().getHostAddress(), "127.0.0.1"); + assertEquals(inetSocketAddress.getPort(), 8088); + + registryService.unregister(inetSocketAddress1); + + + } + + + @Test + public void testRegister2() throws Exception { + NamingserverRegistryServiceImpl registryService = (NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide(); + InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088); + InetSocketAddress inetSocketAddress2 = new InetSocketAddress("127.0.0.1", 8088); + //1.register + registryService.register(inetSocketAddress1); + registryService.register(inetSocketAddress2); + + //2.create vGroup in cluster + String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace"); + createGroupInCluster(namespace, "group1", "cluster1"); + + //3.get instances + List list = registryService.lookup("group1"); + + assertEquals(list.size(), 1); + + registryService.unregister(inetSocketAddress1); + registryService.unregister(inetSocketAddress2); + registryService.unsubscribe("group1"); + } + + + @Test + public void testRegister3() throws Exception { + NamingserverRegistryServiceImpl registryService = (NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide(); + InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088); + InetSocketAddress inetSocketAddress2 = new InetSocketAddress("127.0.0.1", 8089); + InetSocketAddress inetSocketAddress3 = new InetSocketAddress("127.0.0.1", 8090); + InetSocketAddress inetSocketAddress4 = new InetSocketAddress("127.0.0.1", 8091); + //1.register + registryService.register(inetSocketAddress1); + registryService.register(inetSocketAddress2); + registryService.register(inetSocketAddress3); + registryService.register(inetSocketAddress4); + + //2.create vGroup in cluster + String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace"); + createGroupInCluster(namespace, "group2", "cluster1"); + + //3.get instances + List list = registryService.lookup("group2"); + + assertEquals(list.size(), 4); + + registryService.unregister(inetSocketAddress1); + registryService.unregister(inetSocketAddress2); + registryService.unregister(inetSocketAddress3); + registryService.unregister(inetSocketAddress4); + + registryService.unsubscribe("group2"); + + } + + + @Test + public void testUnregister() throws Exception { + RegistryService registryService = new NamingserverRegistryProvider().provide(); + InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088); + //1.register + registryService.register(inetSocketAddress1); + + //2.create vGroup in cluster + String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace"); + createGroupInCluster(namespace, "group1", "cluster1"); + + //3.get instances + List list = registryService.lookup("group1"); + + assertEquals(list.size(), 1); + + //4.unregister + registryService.unregister(inetSocketAddress1); + + //5.get instances + List list1 = registryService.lookup("group1"); + assertEquals(list1.size(), 0); + + } + + + // @Disabled + @Test + public void testWatch() throws Exception { + NamingserverRegistryServiceImpl registryService = (NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide(); + + //1.注册cluster1下的一个节点 + InetSocketAddress inetSocketAddress1 = new InetSocketAddress("127.0.0.1", 8088); + registryService.register(inetSocketAddress1); + + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + int delaySeconds = 500; + //2.延迟0.5s后在cluster1下创建事务分组group1 + executor.schedule(() -> { + try { + + String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace"); + createGroupInCluster(namespace, "group1", "cluster1"); + } catch (Exception e) { + throw new RuntimeException(e); + } + executor.shutdown(); // 任务执行后关闭执行器 + }, delaySeconds, TimeUnit.MILLISECONDS); + //3.watch事务分组group1 + long timestamp1 = System.currentTimeMillis(); + boolean needFetch = registryService.watch("group1"); + long timestamp2 = System.currentTimeMillis(); + //4. 0.5s后group1被映射到cluster1下,应该有数据在1s内推送到client端 + assert timestamp2 - timestamp1 < 1500; + + //5. 获取实例 + List list = registryService.lookup("group1"); + registryService.unsubscribe("group1"); + assertEquals(list.size(), 1); + InetSocketAddress inetSocketAddress = list.get(0); + assertEquals(inetSocketAddress.getAddress().getHostAddress(), "127.0.0.1"); + assertEquals(inetSocketAddress.getPort(), 8088); + + + } + + // @Disabled + @Test + public void testSubscribe() throws Exception { + NamingserverRegistryServiceImpl registryService = NamingserverRegistryServiceImpl.getInstance(); + + AtomicBoolean isNotified = new AtomicBoolean(false); + //1.subscribe + registryService.subscribe(vGroup -> { + try { + isNotified.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, "group2"); + + //2.register + InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8088); + registryService.register(inetSocketAddress); + String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace"); + createGroupInCluster(namespace, "group2", "cluster1"); + + //3.check + assertEquals(isNotified.get(), true); + registryService.unsubscribe("group2"); + } + + + @Test + public void testUnsubscribe() throws Exception { + NamingserverRegistryServiceImpl registryService = (NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide(); + + + NamingListenerimpl namingListenerimpl = new NamingListenerimpl(); + + //1.subscribe + registryService.subscribe(namingListenerimpl, "group1"); + + //2.register + InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8088); + registryService.register(inetSocketAddress); + String namespace = FILE_CONFIG.getConfig("registry.namingserver.namespace"); + createGroupInCluster(namespace, "group1", "cluster1"); + + //3.check + assertEquals(namingListenerimpl.isNotified, true); + namingListenerimpl.setNotified(false); + + //4.unsubscribe + registryService.unsubscribe(namingListenerimpl, "group1"); + + //5.unregister + + registryService.unregister(inetSocketAddress); + //5.check + assertEquals(namingListenerimpl.isNotified, false); + + + } + + + public void createGroupInCluster(String namespace, String vGroup, String clusterName) throws Exception { + Map paraMap = new HashMap<>(); + paraMap.put("namespace", namespace); + paraMap.put("vGroup", vGroup); + paraMap.put("clusterName", clusterName); + String url = "http://127.0.0.1:8080/naming/v1/createGroup"; + Map header = new HashMap<>(); + header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()); + try { + CloseableHttpResponse response = HttpClientUtil.doGet(url, paraMap, header, 30000); + } catch (Exception e) { + throw new RemoteException(); + } + } + + private class NamingListenerimpl implements NamingListener { + + public boolean isNotified = false; + + public boolean isNotified() { + return isNotified; + } + + public void setNotified(boolean notified) { + isNotified = notified; + } + + @Override + public void onEvent(String vGroup) { + isNotified = true; + } + } +}; diff --git a/discovery/seata-discovery-namingserver/src/test/resources/registry.conf b/discovery/seata-discovery-namingserver/src/test/resources/registry.conf new file mode 100644 index 00000000000..3190efa3931 --- /dev/null +++ b/discovery/seata-discovery-namingserver/src/test/resources/registry.conf @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +registry { + # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom + type = "namingserver" + + nacos { + application = "seata-server" + serverAddr = "127.0.0.1:8848" + group = "SEATA_GROUP" + namespace = "" + username = "" + password = "" + contextPath = "/foo" + ##if use MSE Nacos with auth, mutex with username/password attribute + #accessKey = "" + #secretKey = "" + ##if use Nacos naming meta-data for SLB service registry, specify nacos address pattern rules here + #slbPattern = "" + } + eureka { + serviceUrl = "http://localhost:8761/eureka" + weight = "1" + } + redis { + serverAddr = "localhost:6379" + db = "0" + password = "" + timeout = "0" + } + zk { + serverAddr = "127.0.0.1:2181" + sessionTimeout = 6000 + connectTimeout = 2000 + username = "" + password = "" + } + consul { + serverAddr = "127.0.0.1:8500" + aclToken = "" + } + etcd3 { + serverAddr = "http://localhost:2379" + } + sofa { + serverAddr = "127.0.0.1:9603" + region = "DEFAULT_ZONE" + datacenter = "DefaultDataCenter" + group = "SEATA_GROUP" + addressWaitTime = "3000" + } + file { + name = "file.conf" + } + custom { + name = "" + } +} + +config { + # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig、custom + type = "file" + + nacos { + serverAddr = "127.0.0.1:8848" + namespace = "" + group = "SEATA_GROUP" + username = "" + password = "" + contextPath = "/bar" + ##if use MSE Nacos with auth, mutex with username/password attribute + #accessKey = "" + #secretKey = "" + dataId = "seata.properties" + } + consul { + serverAddr = "127.0.0.1:8500" + key = "seata.properties" + aclToken = "" + } + apollo { + appId = "seata-server" + apolloMeta = "http://192.168.1.204:8801" + namespace = "application" + apolloAccesskeySecret = "" + } + zk { + serverAddr = "127.0.0.1:2181" + sessionTimeout = 6000 + connectTimeout = 2000 + username = "" + password = "" + nodePath = "/seata/seata.properties" + } + etcd3 { + serverAddr = "http://localhost:2379" + key = "seata.properties" + } + file { + name = "file.conf" + } + custom { + name = "" + } +} diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java index 1d13a6cc7c0..239aec0f216 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java @@ -18,7 +18,6 @@ import org.apache.seata.common.metadata.namingserver.MetaResponse; -import org.apache.seata.common.metadata.Node; import org.apache.seata.common.metadata.namingserver.NamingServerNode; import org.apache.seata.common.result.Result; import org.apache.seata.namingserver.listener.Watcher; @@ -57,9 +56,9 @@ public class NamingController { @PostMapping("/register") public Result registerInstance(@RequestParam String namespace, - @RequestParam String clusterName, - @RequestParam String unit, - @RequestBody NamingServerNode registerBody) { + @RequestParam String clusterName, + @RequestParam String unit, + @RequestBody NamingServerNode registerBody) { Result result = new Result<>(); boolean isSuccess = namingManager.registerInstance(registerBody, namespace, clusterName, unit); if (isSuccess) { @@ -73,7 +72,7 @@ public Result registerInstance(@RequestParam String namespace, @PostMapping("/unregister") public Result unregisterInstance(@RequestParam String unit, - @RequestBody Node registerBody) { + @RequestBody NamingServerNode registerBody) { Result result = new Result<>(); boolean isSuccess = namingManager.unregisterInstance(unit, registerBody); if (isSuccess) { @@ -98,9 +97,9 @@ public MetaResponse discovery(@RequestParam String vGroup, @RequestParam String @PostMapping("/changeGroup") public Result changeGroup(@RequestParam String namespace, - @RequestParam String clusterName, - @RequestParam String unitName, - @RequestParam String vGroup) { + @RequestParam String clusterName, + @RequestParam String unitName, + @RequestParam String vGroup) { Result addGroupResult = namingManager.addGroup(namespace, vGroup, clusterName, unitName); if (!addGroupResult.isSuccess()) { @@ -112,7 +111,7 @@ public Result changeGroup(@RequestParam String namespace, return removeGroupResult; } namingManager.changeGroup(namespace, clusterName, unitName, vGroup); - return new Result<>("200", "change vGroup " + vGroup + "to cluster " + clusterName + "successfully!"); + return new Result<>("200", "change vGroup " + vGroup + "to cluster " + clusterName + " successfully!"); } /** @@ -142,5 +141,9 @@ public List getWatchList() { .collect(Collectors.toList()); } + @GetMapping("/health") + public String healthCheck() { + return "ok"; + } } diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java index c862e1477fc..32dd9f7aa2a 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java @@ -131,7 +131,7 @@ public boolean registerInstance(NamingServerNode instance, String unitName) { } Unit currentUnit = unitData.computeIfAbsent(unitName, value -> { Unit unit = new Unit(); - List instances = new CopyOnWriteArrayList<>(); + List instances = new CopyOnWriteArrayList<>(); unit.setUnitName(unitName); unit.setNamingInstanceList(instances); return unit; diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java index 07074b6db19..39378f23975 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java @@ -140,7 +140,7 @@ public Result addGroup(String namespace, String vGroup, String clusterNa Map header = new HashMap<>(); header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()); - try (CloseableHttpResponse closeableHttpResponse = HttpClientUtil.doGet(httpUrl, params, header, 30000)) { + try (CloseableHttpResponse closeableHttpResponse = HttpClientUtil.doGet(httpUrl, params, header, 3000)) { if (closeableHttpResponse == null || closeableHttpResponse.getStatusLine().getStatusCode() != 200) { return new Result<>(String.valueOf(closeableHttpResponse.getStatusLine().getStatusCode()), "add vGroup in new cluster failed"); @@ -170,7 +170,7 @@ public Result removeGroup(String namespace, String vGroup, String unitNa Map header = new HashMap<>(); header.put(HTTP.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.getMimeType()); try (CloseableHttpResponse closeableHttpResponse = - HttpClientUtil.doGet(httpUrl, params, header, 30000)) { + HttpClientUtil.doGet(httpUrl, params, header, 3000)) { if (closeableHttpResponse == null || closeableHttpResponse.getStatusLine().getStatusCode() != 200) { LOGGER.warn("remove vGroup in old cluster failed"); diff --git a/server/src/main/java/org/apache/seata/server/Server.java b/server/src/main/java/org/apache/seata/server/Server.java index bdf3bd5307f..f4923cdbdae 100644 --- a/server/src/main/java/org/apache/seata/server/Server.java +++ b/server/src/main/java/org/apache/seata/server/Server.java @@ -65,43 +65,45 @@ public class Server { public static void metadataInit() { - - ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); - - // load node properties - Instance instance = Instance.getInstance(); - // load namespace - String namespace = environment.getProperty(NAMESPACE_KEY, "public"); - instance.setNamespace(namespace); - // load cluster name - String clusterName = environment.getProperty(CLUSTER_NAME_KEY, "default"); - instance.setClusterName(clusterName); - - // load cluster type - String clusterType = String.valueOf(StoreConfig.getSessionMode()); - instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default"); - - // load unit name - instance.setUnit(String.valueOf(UUID.randomUUID())); - - // load node Endpoint - instance.setControlEndpoint(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http")); - - // load metadata - for (PropertySource propertySource : environment.getPropertySources()) { - if (propertySource instanceof EnumerablePropertySource) { - EnumerablePropertySource enumerablePropertySource = (EnumerablePropertySource) propertySource; - for (String propertyName : enumerablePropertySource.getPropertyNames()) { - if (propertyName.startsWith(META_PREFIX)) { - instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName)); + VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); + if (StringUtils.equals(ConfigurationFactory.getInstance().getConfig(FILE_ROOT_REGISTRY + + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_TYPE), NAMING_SERVER)) { + ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); + + // load node properties + Instance instance = Instance.getInstance(); + // load namespace + String namespace = environment.getProperty(NAMESPACE_KEY, "public"); + instance.setNamespace(namespace); + // load cluster name + String clusterName = environment.getProperty(CLUSTER_NAME_KEY, "default"); + instance.setClusterName(clusterName); + + // load cluster type + String clusterType = String.valueOf(StoreConfig.getSessionMode()); + instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default"); + + // load unit name + instance.setUnit(String.valueOf(UUID.randomUUID())); + + // load node Endpoint + instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http")); + + // load metadata + for (PropertySource propertySource : environment.getPropertySources()) { + if (propertySource instanceof EnumerablePropertySource) { + EnumerablePropertySource enumerablePropertySource = (EnumerablePropertySource) propertySource; + for (String propertyName : enumerablePropertySource.getPropertyNames()) { + if (propertyName.startsWith(META_PREFIX)) { + instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName)); + } } } } - } - // load vgroup mapping relationship - VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); - instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); + // load vgroup mapping relationship + instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); + } vGroupMappingStoreManager.notifyMapping(); } @@ -157,10 +159,8 @@ public static void start(String[] args) { // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); - if (StringUtils.equals(ConfigurationFactory.getInstance().getConfig(FILE_ROOT_REGISTRY - + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_TYPE), NAMING_SERVER)) { - metadataInit(); - } + metadataInit(); + nettyRemotingServer.init(); } }