From dc08160e2d4ed1ae6f04f71f8dc079ea3acf4a67 Mon Sep 17 00:00:00 2001 From: PeppaO <39424591+PeppaO@users.noreply.github.com> Date: Mon, 30 Dec 2024 16:59:52 +0800 Subject: [PATCH 1/6] feature: Raft cluster mode supports address translation (#7069) --- changes/en-us/2.x.md | 4 +- changes/zh-cn/2.x.md | 3 +- .../seata/common/ConfigurationKeys.java | 11 ++ .../exception/ParseEndpointException.java | 38 ++++++ .../apache/seata/common/metadata/Node.java | 86 ++++++++++++- .../org/apache/seata/common/util/NetUtil.java | 16 +++ .../raft/RaftRegistryServiceImpl.java | 117 ++++++++++++++++-- .../raft/RaftRegistryServiceImplTest.java | 34 ++++- script/client/conf/registry.conf | 4 +- script/client/spring/application.properties | 4 + script/client/spring/application.yml | 3 + .../SeataCoreEnvironmentPostProcessor.java | 3 + .../boot/autoconfigure/StarterConstants.java | 2 + .../registry/RegistryMetadataProperties.java | 36 ++++++ .../sync/msg/dto/RaftClusterMetadata.java | 15 ++- 15 files changed, 356 insertions(+), 20 deletions(-) create mode 100644 common/src/main/java/org/apache/seata/common/exception/ParseEndpointException.java create mode 100644 seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryMetadataProperties.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 73676753a75..1ff5b892226 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -4,7 +4,8 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: -- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] support XXX +- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation + ### bugfix: @@ -35,5 +36,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [slievrly](https://github.com/slievrly) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) +- [PeppaO](https://github.com/PeppaO) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. \ No newline at end of file diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 7e3d9a5938c..f38d504d3fa 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -4,7 +4,7 @@ ### feature: -- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 支持XXX +- [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换 ### bugfix: @@ -35,5 +35,6 @@ - [slievrly](https://github.com/slievrly) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) +- [PeppaO](https://github.com/PeppaO) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 \ No newline at end of file diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index 840e0298004..3bb4c9873ff 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -1110,4 +1110,15 @@ public interface ConfigurationKeys { * The constant META_PREFIX */ String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata."; + + /** + * The constant SERVER_REGISTRY_METADATA_PREFIX + */ + String SERVER_REGISTRY_METADATA_PREFIX = SERVER_PREFIX + FILE_ROOT_REGISTRY + ".metadata"; + + /** + * The constant SERVER_REGISTRY_METADATA_EXTERNAL + */ + String SERVER_REGISTRY_METADATA_EXTERNAL = SERVER_REGISTRY_METADATA_PREFIX + ".external"; + } diff --git a/common/src/main/java/org/apache/seata/common/exception/ParseEndpointException.java b/common/src/main/java/org/apache/seata/common/exception/ParseEndpointException.java new file mode 100644 index 00000000000..e4f550d5480 --- /dev/null +++ b/common/src/main/java/org/apache/seata/common/exception/ParseEndpointException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.common.exception; + +public class ParseEndpointException extends RuntimeException { + public ParseEndpointException() { + } + + public ParseEndpointException(String message) { + super(message); + } + + public ParseEndpointException(String message, Throwable cause) { + super(message, cause); + } + + public ParseEndpointException(Throwable cause) { + super(cause); + } + + public ParseEndpointException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/common/src/main/java/org/apache/seata/common/metadata/Node.java b/common/src/main/java/org/apache/seata/common/metadata/Node.java index 92d43c366f8..bcc85a96962 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Node.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Node.java @@ -18,11 +18,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; - +import org.apache.seata.common.exception.ParseEndpointException; import java.util.HashMap; import java.util.Map; import java.util.Objects; - +import java.util.List; +import java.util.ArrayList; public class Node { @@ -195,4 +196,85 @@ public String toString() { } } + private Node.ExternalEndpoint createExternalEndpoint(String host, int controllerPort, int transactionPort) { + return new Node.ExternalEndpoint(host, controllerPort, transactionPort); + } + + public List createExternalEndpoints(String external) { + List externalEndpoints = new ArrayList<>(); + String[] split = external.split(","); + + for (String s : split) { + String[] item = s.split(":"); + if (item.length == 3) { + try { + String host = item[0]; + int controllerPort = Integer.parseInt(item[1]); + int transactionPort = Integer.parseInt(item[2]); + externalEndpoints.add(createExternalEndpoint(host, controllerPort, transactionPort)); + } catch (NumberFormatException e) { + throw new ParseEndpointException("Invalid port number in: " + s); + } + } else { + throw new ParseEndpointException("Invalid format for endpoint: " + s); + } + } + return externalEndpoints; + } + + public Map updateMetadataWithExternalEndpoints(Map metadata, List externalEndpoints) { + Object obj = metadata.get("external"); + if (obj == null) { + if (!externalEndpoints.isEmpty()) { + Map metadataMap = new HashMap<>(metadata); + metadataMap.put("external", externalEndpoints); + return metadataMap; + } + return metadata; + } + if (obj instanceof List) { + List oldList = (List) obj; + oldList.addAll(externalEndpoints); + return metadata; + } else { + throw new ParseEndpointException("Metadata 'external' is not a List."); + } + } + + public static class ExternalEndpoint { + + private String host; + private int controlPort; + private int transactionPort; + + public ExternalEndpoint(String host, int controlPort, int transactionPort) { + this.host = host; + this.controlPort = controlPort; + this.transactionPort = transactionPort; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getControlPort() { + return controlPort; + } + + public void setControlPort(int controlPort) { + this.controlPort = controlPort; + } + + public int getTransactionPort() { + return transactionPort; + } + + public void setTransactionPort(int transactionPort) { + this.transactionPort = transactionPort; + } + } } diff --git a/common/src/main/java/org/apache/seata/common/util/NetUtil.java b/common/src/main/java/org/apache/seata/common/util/NetUtil.java index 008f0839bef..2043f85c79a 100644 --- a/common/src/main/java/org/apache/seata/common/util/NetUtil.java +++ b/common/src/main/java/org/apache/seata/common/util/NetUtil.java @@ -90,9 +90,25 @@ public static String toIpAddress(SocketAddress address) { * @return the string */ public static String toStringAddress(InetSocketAddress address) { + if (address.getAddress() == null) { + return address.getHostString() + ":" + address.getPort(); + } return address.getAddress().getHostAddress() + ":" + address.getPort(); } + /** + * To string host string. + * + * @param address the address + * @return the string + */ + public static String toStringHost(InetSocketAddress address) { + if (address.getAddress() == null) { + return address.getHostString(); + } + return address.getAddress().getHostAddress(); + } + /** * To inet socket address inet socket address. * diff --git a/discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java b/discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java index 8ba0d2256ed..06bed7b639d 100644 --- a/discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java +++ b/discovery/seata-discovery-raft/src/main/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImpl.java @@ -24,6 +24,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.LinkedHashMap; +import java.util.Optional; +import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; @@ -38,6 +41,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.exception.AuthenticationFailedException; +import org.apache.seata.common.exception.NotSupportYetException; +import org.apache.seata.common.exception.ParseEndpointException; import org.apache.seata.common.exception.RetryableException; import org.apache.seata.common.metadata.Metadata; import org.apache.seata.common.metadata.MetadataResponse; @@ -45,6 +50,7 @@ import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.HttpClientUtil; +import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.ConfigChangeListener; import org.apache.seata.config.Configuration; @@ -114,10 +120,13 @@ public class RaftRegistryServiceImpl implements RegistryService> ALIVE_NODES = new ConcurrentHashMap<>(); + private static final String PREFERRED_NETWORKS; + static { TOKEN_EXPIRE_TIME_IN_MILLISECONDS = CONFIG.getLong(getTokenExpireTimeInMillisecondsKey(), 29 * 60 * 1000L); USERNAME = CONFIG.getConfig(getRaftUserNameKey()); PASSWORD = CONFIG.getConfig(getRaftPassWordKey()); + PREFERRED_NETWORKS = CONFIG.getConfig(getPreferredNetworks()); } private RaftRegistryServiceImpl() { @@ -221,7 +230,7 @@ private static String queryHttpAddress(String clusterName, String group) { List inetSocketAddresses = ALIVE_NODES.get(CURRENT_TRANSACTION_SERVICE_GROUP); if (CollectionUtils.isEmpty(inetSocketAddresses)) { addressList = - nodeList.stream().map(node -> node.getControl().createAddress()).collect(Collectors.toList()); + nodeList.stream().map(RaftRegistryServiceImpl::selectControlEndpointStr).collect(Collectors.toList()); } else { stream = inetSocketAddresses.stream(); } @@ -234,15 +243,20 @@ private static String queryHttpAddress(String clusterName, String group) { Map map = new HashMap<>(); if (CollectionUtils.isNotEmpty(nodeList)) { for (Node node : nodeList) { - map.put(new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()).getAddress().getHostAddress() - + IP_PORT_SPLIT_CHAR + node.getTransaction().getPort(), node); + InetSocketAddress inetSocketAddress = selectTransactionEndpoint(node); + map.put(inetSocketAddress.getHostString() + + IP_PORT_SPLIT_CHAR + inetSocketAddress.getPort(), node); } } addressList = stream.map(inetSocketAddress -> { - String host = inetSocketAddress.getAddress().getHostAddress(); + String host = NetUtil.toStringHost(inetSocketAddress); Node node = map.get(host + IP_PORT_SPLIT_CHAR + inetSocketAddress.getPort()); + InetSocketAddress controlEndpoint = null; + if (node != null) { + controlEndpoint = selectControlEndpoint(node); + } return host + IP_PORT_SPLIT_CHAR - + (node != null ? node.getControl().getPort() : inetSocketAddress.getPort()); + + (controlEndpoint != null ? controlEndpoint.getPort() : inetSocketAddress.getPort()); }).collect(Collectors.toList()); return addressList.get(ThreadLocalRandom.current().nextInt(addressList.size())); } @@ -263,6 +277,11 @@ private static String getRaftPassWordKey() { REGISTRY_TYPE, PRO_PASSWORD_KEY); } + private static String getPreferredNetworks() { + return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, + "preferredNetworks"); + } + private static String getTokenExpireTimeInMillisecondsKey() { return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, TOKEN_VALID_TIME_MS_KEY); @@ -276,9 +295,85 @@ private static boolean isTokenExpired() { return System.currentTimeMillis() >= tokenExpiredTime; } - private InetSocketAddress convertInetSocketAddress(Node node) { - Node.Endpoint endpoint = node.getTransaction(); - return new InetSocketAddress(endpoint.getHost(), endpoint.getPort()); + private static String selectControlEndpointStr(Node node) { + InetSocketAddress control = selectControlEndpoint(node); + return NetUtil.toStringAddress(control); + } + + private static String selectTransactionEndpointStr(Node node) { + InetSocketAddress transaction = selectTransactionEndpoint(node); + return NetUtil.toStringAddress(transaction); + } + + private static InetSocketAddress selectControlEndpoint(Node node) { + return selectEndpoint("control", node); + } + + private static InetSocketAddress selectTransactionEndpoint(Node node) { + return selectEndpoint("transaction", node); + } + + private static InetSocketAddress selectEndpoint(String type, Node node) { + if (StringUtils.isBlank(PREFERRED_NETWORKS)) { + // Use the default method, directly using node.control and node.transaction + switch (type) { + case "control": + return new InetSocketAddress(node.getControl().getHost(), node.getControl().getPort()); + case "transaction": + return new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()); + default: + throw new NotSupportYetException("SelectEndpoint is not support type: " + type); + } + } + Node.ExternalEndpoint externalEndpoint = selectExternalEndpoint(node, PREFERRED_NETWORKS.split(";")); + switch (type) { + case "control": + return new InetSocketAddress(externalEndpoint.getHost(), externalEndpoint.getControlPort()); + case "transaction": + return new InetSocketAddress(externalEndpoint.getHost(), externalEndpoint.getTransactionPort()); + default: + throw new NotSupportYetException("SelectEndpoint is not support type: " + type); + } + } + + private static Node.ExternalEndpoint selectExternalEndpoint(Node node, String[] preferredNetworks) { + Map metadata = node.getMetadata(); + if (CollectionUtils.isEmpty(metadata)) { + throw new ParseEndpointException("Node metadata is empty."); + } + + Object external = metadata.get("external"); + + if (external instanceof List) { + List> externalEndpoints = (List>) external; + + if (CollectionUtils.isEmpty(externalEndpoints)) { + throw new ParseEndpointException("ExternalEndpoints should not be empty."); + } + + for (LinkedHashMap externalEndpoint : externalEndpoints) { + String ip = Optional.ofNullable(externalEndpoint.get("host")) + .map(Object::toString) + .orElse(""); + + if (isPreferredNetwork(ip, Arrays.asList(preferredNetworks))) { + return createExternalEndpoint(externalEndpoint, ip); + } + } + } + throw new ParseEndpointException("No ExternalEndpoints value matches."); + } + + private static boolean isPreferredNetwork(String ip, List preferredNetworks) { + return preferredNetworks.stream().anyMatch(regex -> + StringUtils.isNotBlank(regex) && (ip.matches(regex) || ip.startsWith(regex)) + ); + } + + private static Node.ExternalEndpoint createExternalEndpoint(LinkedHashMap externalEndpoint, String ip) { + int controlPort = Integer.parseInt(externalEndpoint.get("controlPort").toString()); + int transactionPort = Integer.parseInt(externalEndpoint.get("transactionPort").toString()); + return new Node.ExternalEndpoint(ip, controlPort, transactionPort); } @Override @@ -292,7 +387,7 @@ public List aliveLookup(String transactionServiceGroup) { String clusterName = getServiceGroup(transactionServiceGroup); Node leader = METADATA.getLeader(clusterName); if (leader != null) { - return Collections.singletonList(convertInetSocketAddress(leader)); + return Collections.singletonList(selectTransactionEndpoint(leader)); } } return RegistryService.super.aliveLookup(transactionServiceGroup); @@ -340,7 +435,7 @@ public List refreshAliveLookup(String transactionServiceGroup List aliveAddress) { if (METADATA.isRaftMode()) { Node leader = METADATA.getLeader(getServiceGroup(transactionServiceGroup)); - InetSocketAddress leaderAddress = convertInetSocketAddress(leader); + InetSocketAddress leaderAddress = selectTransactionEndpoint(leader); return ALIVE_NODES.put(transactionServiceGroup, aliveAddress.isEmpty() ? aliveAddress : aliveAddress.parallelStream().filter(inetSocketAddress -> { // Since only follower will turn into leader, only the follower node needs to be listened to @@ -478,7 +573,7 @@ public List lookup(String key) throws Exception { } List nodes = METADATA.getNodes(clusterName); if (CollectionUtils.isNotEmpty(nodes)) { - return nodes.parallelStream().map(this::convertInetSocketAddress).collect(Collectors.toList()); + return nodes.parallelStream().map(RaftRegistryServiceImpl::selectTransactionEndpoint).collect(Collectors.toList()); } return Collections.emptyList(); } diff --git a/discovery/seata-discovery-raft/src/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java b/discovery/seata-discovery-raft/src/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java index e6c88fcc5c0..45b675dbe56 100644 --- a/discovery/seata-discovery-raft/src/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java +++ b/discovery/seata-discovery-raft/src/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java @@ -17,13 +17,18 @@ package org.apache.seata.discovery.registry.raft; -import org.apache.seata.common.util.HttpClientUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seata.common.metadata.MetadataResponse; +import org.apache.seata.common.metadata.Node; +import org.apache.seata.common.util.*; import org.apache.seata.config.ConfigurationFactory; import org.apache.http.HttpStatus; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.StringEntity; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; @@ -33,7 +38,7 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Map; +import java.util.*; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -53,6 +58,7 @@ public static void beforeClass() { System.setProperty("registry.raft.password", "seata"); System.setProperty("registry.raft.serverAddr", "127.0.0.1:8092"); System.setProperty("registry.raft.tokenValidityInMilliseconds", "10000"); + System.setProperty("registry.preferredNetworks", "10.10.*"); ConfigurationFactory.getInstance(); } @@ -145,4 +151,28 @@ public void testSecureTTL() throws NoSuchMethodException, InvocationTargetExcept assertEquals(true, rst); } + /** + * RaftRegistryServiceImpl#controlEndpointStr() + * RaftRegistryServiceImpl#transactionEndpointStr() + */ + @Test + public void testSelectEndpoint() throws JsonProcessingException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + String jsonString = "{\"nodes\":[{\"control\":{\"host\":\"v-0.svc-l.default.svc.cluster.local\",\"port\":7091},\"transaction\":{\"host\":\"v-0.svc-l.default.svc.cluster.local\",\"port\":8091},\"internal\":{\"host\":\"v-0.svc-l.default.svc.cluster.local\",\"port\":9091},\"group\":\"default\",\"role\":\"LEADER\",\"version\":\"2.3.0-SNAPSHOT\",\"metadata\":{\"external\":[{\"host\":\"192.168.105.7\",\"controlPort\":30071,\"transactionPort\":30091},{\"host\":\"10.10.105.7\",\"controlPort\":30071,\"transactionPort\":30091}]}},{\"control\":{\"host\":\"v-2.svc-l.default.svc.cluster.local\",\"port\":7091},\"transaction\":{\"host\":\"v-2.svc-l.default.svc.cluster.local\",\"port\":8091},\"internal\":{\"host\":\"v-2.svc-l.default.svc.cluster.local\",\"port\":9091},\"group\":\"default\",\"role\":\"FOLLOWER\",\"version\":\"2.3.0-SNAPSHOT\",\"metadata\":{\"external\":[{\"host\":\"192.168.105.7\",\"controlPort\":30073,\"transactionPort\":30093},{\"host\":\"10.10.105.7\",\"controlPort\":30073,\"transactionPort\":30093}]}},{\"control\":{\"host\":\"v-1.svc-l.default.svc.cluster.local\",\"port\":7091},\"transaction\":{\"host\":\"v-1.svc-l.default.svc.cluster.local\",\"port\":8091},\"internal\":{\"host\":\"v-1.svc-l.default.svc.cluster.local\",\"port\":9091},\"group\":\"default\",\"role\":\"FOLLOWER\",\"version\":\"2.3.0-SNAPSHOT\",\"metadata\":{\"external\":[{\"host\":\"192.168.105.7\",\"controlPort\":30072,\"transactionPort\":30092},{\"host\":\"10.10.105.7\",\"controlPort\":30072,\"transactionPort\":30092}]}}],\"storeMode\":\"raft\",\"term\":1}"; + + Method selectControlEndpointStrMethod = RaftRegistryServiceImpl.class.getDeclaredMethod("selectControlEndpointStr", Node.class); + selectControlEndpointStrMethod.setAccessible(true); + Method selectTransactionEndpointStrMethod = RaftRegistryServiceImpl.class.getDeclaredMethod("selectTransactionEndpointStr", Node.class); + selectTransactionEndpointStrMethod.setAccessible(true); + + ObjectMapper objectMapper = new ObjectMapper(); + MetadataResponse metadataResponse = objectMapper.readValue(jsonString, MetadataResponse.class); + List nodes = metadataResponse.getNodes(); + + for (Node node : nodes) { + String controlEndpointStr = (String) selectControlEndpointStrMethod.invoke(null, node);; + String transactionEndpointStr = (String) selectTransactionEndpointStrMethod.invoke(null, node);; + Assertions.assertTrue(controlEndpointStr.contains("10.10.105.7:3007")); + Assertions.assertTrue(transactionEndpointStr.contains("10.10.105.7:3009")); + } + } } diff --git a/script/client/conf/registry.conf b/script/client/conf/registry.conf index 1aabbc507f6..d21f03f7232 100644 --- a/script/client/conf/registry.conf +++ b/script/client/conf/registry.conf @@ -18,7 +18,9 @@ registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom、raft、seata type = "file" - + # Supports address translation parameters, currently only supported in raft mode, + # if match the preferredNetworks rule return the first, eg: preferredNetworks = "192.168.*" + preferredNetworks = "" raft { metadata-max-age-ms = 30000 serverAddr = "127.0.0.1:7091" diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties index 2a72d1e5f79..620cbd9714d 100755 --- a/script/client/spring/application.properties +++ b/script/client/spring/application.properties @@ -123,6 +123,10 @@ seata.config.custom.name= seata.registry.type=file +# Supports address translation parameters, currently only supported in raft mode? +# if match the preferredNetworks rule return the first, eg: preferredNetworks = "192.168.*" +seata.registry.preferredNetworks = "" + seata.registry.raft.server-addr= seata.registry.raft.metadata-max-age-ms=30000 seata.registry.raft.username=seata diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml index a6100f05740..580cb0180e0 100755 --- a/script/client/spring/application.yml +++ b/script/client/spring/application.yml @@ -132,6 +132,9 @@ seata: name: registry: type: file + # Supports address translation parameters, currently only supported in raft mode, + # if match the preferredNetworks rule return the first, eg: preferredNetworks = "192.168.*" + preferredNetworks: "" seata: server-addr: 127.0.0.1:8081 namespace: public diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java index 67bc7bd75ab..bfa4923469f 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/SeataCoreEnvironmentPostProcessor.java @@ -40,6 +40,7 @@ import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryRedisProperties; import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistrySofaProperties; import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryZooKeeperProperties; +import org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryMetadataProperties; import org.springframework.boot.SpringApplication; import org.springframework.boot.env.EnvironmentPostProcessor; import org.springframework.core.Ordered; @@ -69,6 +70,7 @@ import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SHUTDOWN_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.THREAD_FACTORY_PREFIX; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.TRANSPORT_PREFIX; +import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_METADATA_PREFIX; public class SeataCoreEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered { @@ -90,6 +92,7 @@ public static void init() { PROPERTY_BEAN_MAP.put(CONFIG_PREFIX, ConfigProperties.class); PROPERTY_BEAN_MAP.put(CONFIG_FILE_PREFIX, ConfigFileProperties.class); PROPERTY_BEAN_MAP.put(REGISTRY_PREFIX, RegistryProperties.class); + PROPERTY_BEAN_MAP.put(REGISTRY_METADATA_PREFIX, RegistryMetadataProperties.class); PROPERTY_BEAN_MAP.put(CONFIG_NACOS_PREFIX, ConfigNacosProperties.class); PROPERTY_BEAN_MAP.put(CONFIG_CONSUL_PREFIX, ConfigConsulProperties.class); diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java index 5ec088d43ff..db09911af18 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/StarterConstants.java @@ -58,6 +58,8 @@ public interface StarterConstants { String REGISTRY_SOFA_PREFIX = REGISTRY_PREFIX + ".sofa"; String REGISTRY_CUSTOM_PREFIX = REGISTRY_PREFIX + ".custom"; + String REGISTRY_METADATA_PREFIX = REGISTRY_PREFIX + ".metadata"; + String CONFIG_PREFIX = SEATA_PREFIX + ".config"; String CONFIG_NACOS_PREFIX = CONFIG_PREFIX + ".nacos"; String CONFIG_CONSUL_PREFIX = CONFIG_PREFIX + ".consul"; diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryMetadataProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryMetadataProperties.java new file mode 100644 index 00000000000..595b322b79a --- /dev/null +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/registry/RegistryMetadataProperties.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.spring.boot.autoconfigure.properties.registry; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_METADATA_PREFIX; + +@Component +@ConfigurationProperties(prefix = REGISTRY_METADATA_PREFIX) +public class RegistryMetadataProperties { + private String external; + + public String getExternal() { + return external; + } + + public RegistryMetadataProperties setExternal(String external) { + this.external = external; + return this; + } +} diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java index 2e3e217a96c..7f51ff9efac 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java @@ -21,10 +21,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; - +import org.apache.seata.common.holder.ObjectHolder; import org.apache.seata.common.metadata.Node; import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.protocol.Version; +import org.springframework.core.env.ConfigurableEnvironment; +import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; /** */ @@ -55,7 +57,16 @@ public Node createNode(String host, int txPort, int internalPort, int controlPor node.setGroup(group); node.setVersion(Version.getCurrent()); node.setInternal(node.createEndpoint(host, internalPort, "raft")); - Optional.ofNullable(metadata).ifPresent(node::setMetadata); + ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); + String seataRegistryMetadataExternalValue = environment.resolvePlaceholders("${SEATA_REGISTRY_METADATA_EXTERNAL:${seata.registry.metadata.external:}}"); + if (metadata != null) { + if (StringUtils.isNotEmpty(seataRegistryMetadataExternalValue)) { + Map newMetadata = node.updateMetadataWithExternalEndpoints(metadata, node.createExternalEndpoints(seataRegistryMetadataExternalValue)); + Optional.ofNullable(newMetadata).ifPresent(node::setMetadata); + } else { + node.setMetadata(metadata); + } + } return node; } From 6310ce3307a8154f61060f2fb2b051693f5e360d Mon Sep 17 00:00:00 2001 From: funkye Date: Fri, 3 Jan 2025 09:59:47 +0800 Subject: [PATCH 2/6] test: fix the issue of NacosMockTest failing to run (#7092) --- changes/en-us/2.x.md | 3 ++- changes/zh-cn/2.x.md | 3 ++- .../org/apache/seata/config/nacos/NacosMockTest.java | 11 ++++++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 1ff5b892226..1e957dd4a1a 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -23,7 +23,7 @@ Add changes here for all PR submitted to the 2.x branch. ### test: -- [[#PR_NO](https://github.com/apache/incubator-seata/pull/PR_NO)] test XXX +- [[#7092](https://github.com/apache/incubator-seata/pull/7092)] fix the issue of NacosMockTest failing to run ### refactor: @@ -37,5 +37,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) - [PeppaO](https://github.com/PeppaO) +- [funky-eyes](https://github.com/funky-eyes) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. \ No newline at end of file diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index f38d504d3fa..343f4a59fe5 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -22,7 +22,7 @@ ### test: -- [[#PR_NO](https://github.com/apache/incubator-seata/pull/PR_NO)] 测试XXX +- [[#7092](https://github.com/apache/incubator-seata/pull/7092)] 修复NacosMockTest测试方法并行导致测试结果被干扰失败的问题 ### refactor: @@ -36,5 +36,6 @@ - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) - [PeppaO](https://github.com/PeppaO) +- [funky-eyes](https://github.com/funky-eyes) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 \ No newline at end of file diff --git a/config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java b/config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java index f246ba33ea8..cafdcfc6f57 100644 --- a/config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java +++ b/config/seata-config-nacos/src/test/java/org/apache/seata/config/nacos/NacosMockTest.java @@ -36,11 +36,14 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledOnJre; +import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.condition.EnabledOnOs; import org.junit.jupiter.api.condition.OS; +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class NacosMockTest { private static ConfigService configService; private static final String NACOS_ENDPOINT = "127.0.0.1:8848"; @@ -68,6 +71,7 @@ public static void setup() throws NacosException { @Test @EnabledOnOs(OS.LINUX) + @Order(1) public void getInstance() { Assertions.assertNotNull(configService); Assertions.assertNotNull(NacosConfiguration.getInstance()); @@ -76,6 +80,7 @@ public void getInstance() { @Test @EnabledOnOs(OS.LINUX) + @Order(2) public void getConfig() { Configuration configuration = ConfigurationFactory.getInstance(); String configStrValue = configuration.getConfig(SUB_NACOS_DATAID); @@ -141,6 +146,7 @@ public void getConfig() { @Test @EnabledOnOs(OS.LINUX) + @Order(3) public void putConfigIfAbsent() { Configuration configuration = ConfigurationFactory.getInstance(); Assertions.assertThrows(UndeclaredThrowableException.class, () -> { @@ -150,6 +156,7 @@ public void putConfigIfAbsent() { @Test @EnabledOnOs(OS.LINUX) + @Order(4) public void removeConfig() { Configuration configuration = ConfigurationFactory.getInstance(); boolean removed = configuration.removeConfig(NACOS_DATAID); @@ -158,6 +165,7 @@ public void removeConfig() { @Test @EnabledOnOs(OS.LINUX) + @Order(5) public void putConfig() { Configuration configuration = ConfigurationFactory.getInstance(); boolean added = configuration.putConfig(SUB_NACOS_DATAID, "TEST"); @@ -168,6 +176,7 @@ public void putConfig() { @Test @EnabledOnOs(OS.LINUX) + @Order(6) public void testConfigListener() throws NacosException, InterruptedException { Configuration configuration = ConfigurationFactory.getInstance(); configuration.putConfig(NACOS_DATAID, "KEY=TEST"); From 8de9730171aebdc75f0ffd8e97f7aabc9169efa7 Mon Sep 17 00:00:00 2001 From: GoodBoyCoder Date: Fri, 3 Jan 2025 10:23:09 +0800 Subject: [PATCH 3/6] feature: add fury undolog parser support (#7037) --- changes/en-us/2.x.md | 4 +- changes/zh-cn/2.x.md | 2 + dependencies/pom.xml | 9 +++ rm-datasource/pom.xml | 6 ++ .../undo/parser/FuryUndoLogParser.java | 67 +++++++++++++++++++ ...che.seata.rm.datasource.undo.UndoLogParser | 3 +- .../undo/parser/FuryUndoLogParserTest.java | 36 ++++++++++ 7 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/parser/FuryUndoLogParser.java create mode 100644 rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/parser/FuryUndoLogParserTest.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 1e957dd4a1a..3eb72b26554 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -4,9 +4,9 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: +- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] support fury undolog parser - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft cluster mode supports address translation - ### bugfix: - [[#PR_NO](https://github.com/apache/incubator-seata/pull/#PR_NO)] fix XXX @@ -34,9 +34,11 @@ Thanks to these contributors for their code commits. Please report an unintended - [slievrly](https://github.com/slievrly) +- [GoodBoyCoder](https://github.com/GoodBoyCoder) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) - [PeppaO](https://github.com/PeppaO) - [funky-eyes](https://github.com/funky-eyes) + Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. \ No newline at end of file diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 343f4a59fe5..139dd586de7 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -4,6 +4,7 @@ ### feature: +- [[#7037](https://github.com/apache/incubator-seata/pull/7037)] 支持UndoLog的fury序列化方式 - [[#7069](https://github.com/apache/incubator-seata/pull/7069)] Raft集群模式支持地址转换 ### bugfix: @@ -33,6 +34,7 @@ - [slievrly](https://github.com/slievrly) +- [GoodBoyCoder](https://github.com/GoodBoyCoder) - [lyl2008dsg](https://github.com/lyl2008dsg) - [remind](https://github.com/remind) - [PeppaO](https://github.com/PeppaO) diff --git a/dependencies/pom.xml b/dependencies/pom.xml index a4275c8c146..2b567d6927f 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -136,6 +136,9 @@ 3.1.10 4.12.0 2.4.0 + + + 0.8.0 @@ -879,6 +882,12 @@ ${rocketmq-version} + + + org.apache.fury + fury-core + ${fury.version} + diff --git a/rm-datasource/pom.xml b/rm-datasource/pom.xml index af2c0d09f59..8dc7afd62d8 100644 --- a/rm-datasource/pom.xml +++ b/rm-datasource/pom.xml @@ -143,5 +143,11 @@ DmJdbcDriver18 test + + org.apache.fury + fury-core + provided + true + diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/parser/FuryUndoLogParser.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/parser/FuryUndoLogParser.java new file mode 100644 index 00000000000..0d64a2b6e04 --- /dev/null +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/parser/FuryUndoLogParser.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.datasource.undo.parser; + +import org.apache.fury.Fury; +import org.apache.fury.ThreadLocalFury; +import org.apache.fury.ThreadSafeFury; +import org.apache.fury.config.CompatibleMode; +import org.apache.fury.config.Language; +import org.apache.seata.common.executor.Initialize; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.rm.datasource.undo.BranchUndoLog; +import org.apache.seata.rm.datasource.undo.UndoLogParser; + +@LoadLevel(name = FuryUndoLogParser.NAME) +public class FuryUndoLogParser implements UndoLogParser, Initialize { + public static final String NAME = "fury"; + + private static final ThreadSafeFury FURY = new ThreadLocalFury(classLoader -> Fury.builder() + .withLanguage(Language.JAVA) + // In JAVA mode, classes cannot be registered by tag, and the different registration order between the server and the client will cause deserialization failure + // In XLANG cross-language mode has problems with Java class serialization, such as enum classes [https://github.com/apache/fury/issues/1644]. + .requireClassRegistration(false) + //enable reference tracking for shared/circular reference. + .withRefTracking(true) + .withClassLoader(classLoader) + .withCompatibleMode(CompatibleMode.COMPATIBLE) + .build()); + @Override + public void init() { + } + + @Override + public String getName() { + return NAME; + } + + @Override + public byte[] getDefaultContent() { + return encode(new BranchUndoLog()); + } + + @Override + public byte[] encode(BranchUndoLog branchUndoLog) { + return FURY.serializeJavaObject(branchUndoLog); + } + + @Override + public BranchUndoLog decode(byte[] bytes) { + return FURY.deserializeJavaObject(bytes, BranchUndoLog.class); + } + +} diff --git a/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoLogParser b/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoLogParser index 9a32cda00b5..41596cc4c6a 100644 --- a/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoLogParser +++ b/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoLogParser @@ -18,4 +18,5 @@ org.apache.seata.rm.datasource.undo.parser.FastjsonUndoLogParser org.apache.seata.rm.datasource.undo.parser.JacksonUndoLogParser org.apache.seata.rm.datasource.undo.parser.ProtostuffUndoLogParser org.apache.seata.rm.datasource.undo.parser.KryoUndoLogParser -org.apache.seata.rm.datasource.undo.parser.Fastjson2UndoLogParser \ No newline at end of file +org.apache.seata.rm.datasource.undo.parser.Fastjson2UndoLogParser +org.apache.seata.rm.datasource.undo.parser.FuryUndoLogParser \ No newline at end of file diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/parser/FuryUndoLogParserTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/parser/FuryUndoLogParserTest.java new file mode 100644 index 00000000000..926339f42ea --- /dev/null +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/parser/FuryUndoLogParserTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.datasource.undo.parser; + +import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.rm.datasource.undo.BaseUndoLogParserTest; +import org.apache.seata.rm.datasource.undo.UndoLogParser; + + +public class FuryUndoLogParserTest extends BaseUndoLogParserTest { + + FuryUndoLogParser parser = (FuryUndoLogParser) EnhancedServiceLoader.load(UndoLogParser.class, FuryUndoLogParser.NAME); + + @Override + public UndoLogParser getParser() { + return parser; + } + + @Override + public void testTimestampEncodeAndDecode() { + } +} From b0aec943288b09c7f1b355a5a87023cbc49acc62 Mon Sep 17 00:00:00 2001 From: funkye Date: Fri, 3 Jan 2025 11:09:16 +0800 Subject: [PATCH 4/6] optimize: add a test workflow for JDK 21 (#7093) --- .github/workflows/build.yml | 4 ++-- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index dd84681ba7c..24f0b4b59f5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,7 +7,7 @@ on: branches: [ 2.x, develop, master ] jobs: - # job 1: Test based on java 8 and 17. Do not checkstyle. + # job 1: Test based on java 8, 17 and 21. Do not checkstyle. build: name: "build" services: @@ -28,7 +28,7 @@ jobs: strategy: fail-fast: false matrix: - java: [ 8, 17 ] + java: [ 8, 17, 21 ] steps: # step 1 - name: "Checkout" diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 3eb72b26554..041cc11c5b1 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -16,6 +16,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6828](https://github.com/apache/incubator-seata/pull/6828)] spring boot compatible with file.conf and registry.conf - [[#7012](https://github.com/apache/incubator-seata/pull/7012)] When the number of primary keys exceeds 1000, use union to concatenate the SQL - [[#7075](https://github.com/apache/incubator-seata/pull/7075)] fast fail when channel is null +- [[#7093](https://github.com/apache/incubator-seata/pull/7093)] add a test workflow for JDK 21 ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 139dd586de7..387f0285f13 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -16,6 +16,7 @@ - [[#6828](https://github.com/apache/incubator-seata/pull/6828)] seata-spring-boot-starter兼容file.conf和registry.conf - [[#7012](https://github.com/apache/incubator-seata/pull/7012)] 当主键超过1000个时,使用union拼接sql,可以使用索引 - [[#7075](https://github.com/apache/incubator-seata/pull/7075)] 当channel为空时,快速失败,以便于减少不必要的等待 +- [[#7093](https://github.com/apache/incubator-seata/pull/7093)] 增加jdk21的工作流测试 ### security: From 61f420b7297c3afe6642e413246009c72563f423 Mon Sep 17 00:00:00 2001 From: MaoMaoandSnail <37172584+MaoMaoandSnail@users.noreply.github.com> Date: Fri, 3 Jan 2025 11:38:03 +0800 Subject: [PATCH 5/6] optimize: expand english abbreviations to full words (#7088) --- .../seata/saga/engine/impl/ProcessCtrlStateMachineEngine.java | 2 +- .../pcext/interceptors/ServiceTaskHandlerInterceptor.java | 2 +- .../saga/statelang/parser/utils/DesignerJsonTransformer.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/impl/ProcessCtrlStateMachineEngine.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/impl/ProcessCtrlStateMachineEngine.java index 645fc8e656d..bc4405abe96 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/impl/ProcessCtrlStateMachineEngine.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/impl/ProcessCtrlStateMachineEngine.java @@ -238,7 +238,7 @@ protected StateMachineInstance forwardInternal(String stateMachineInstId, Map actList = stateMachineInstance.getStateList(); if (CollectionUtils.isEmpty(actList)) { throw new ForwardInvalidException("StateMachineInstance[id:" + stateMachineInstId - + "] has no stateInstance, pls start a new StateMachine execution instead", + + "] has no stateInstance, please start a new StateMachine execution instead", FrameworkErrorCode.OperationDenied); } diff --git a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java index 1cee6d2ccf4..5091fc584d6 100644 --- a/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java +++ b/saga/seata-saga-engine/src/main/java/org/apache/seata/saga/engine/pcext/interceptors/ServiceTaskHandlerInterceptor.java @@ -369,7 +369,7 @@ private void decideExecutionStatus(ProcessContext context, StateInstance stateIn } EngineExecutionException exception = new EngineExecutionException("State [" + state.getName() - + "] execute finished, but cannot matching status, pls check its status manually", + + "] execute finished, but cannot matching status, please check its status manually", FrameworkErrorCode.NoMatchedStatus); if (LOGGER.isDebugEnabled()) { LOGGER.debug("State[{}] execute finish with status[{}]", state.getName(), diff --git a/saga/seata-saga-statelang/src/main/java/org/apache/seata/saga/statelang/parser/utils/DesignerJsonTransformer.java b/saga/seata-saga-statelang/src/main/java/org/apache/seata/saga/statelang/parser/utils/DesignerJsonTransformer.java index 7a69c8b5c25..930c24241fd 100644 --- a/saga/seata-saga-statelang/src/main/java/org/apache/seata/saga/statelang/parser/utils/DesignerJsonTransformer.java +++ b/saga/seata-saga-statelang/src/main/java/org/apache/seata/saga/statelang/parser/utils/DesignerJsonTransformer.java @@ -82,7 +82,7 @@ private static void transformNode(Map machineJsonObject, Map Date: Fri, 3 Jan 2025 15:26:46 +0800 Subject: [PATCH 6/6] optimize: support instance registration to the registry center (#7089) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../metadata/{namingserver => }/Instance.java | 30 +------ .../metadata/namingserver/InstanceTest.java | 1 + .../core/rpc/netty/NettyServerBootstrap.java | 13 ++- .../discovery/registry/RegistryService.java | 27 ++++++ .../NamingserverRegistryServiceImpl.java | 23 +++-- .../apache/seata/mockserver/MockServer.java | 3 + .../java/org/apache/seata/server/Server.java | 10 +-- .../controller/VGroupMappingController.java | 2 +- ...stance.java => ServerInstanceFactory.java} | 85 ++++++++++--------- .../DataBaseVGroupMappingStoreManager.java | 2 +- .../db/store/VGroupMappingDataBaseDAO.java | 2 +- .../store/RedisVGroupMappingStoreManager.java | 2 +- .../store/VGroupMappingStoreManager.java | 2 +- .../RedisVGroupMappingStoreManagerTest.java | 2 +- 16 files changed, 114 insertions(+), 92 deletions(-) rename common/src/main/java/org/apache/seata/common/metadata/{namingserver => }/Instance.java (79%) rename server/src/main/java/org/apache/seata/server/instance/{ServerInstance.java => ServerInstanceFactory.java} (58%) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 041cc11c5b1..b1afbb58d16 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -16,6 +16,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6828](https://github.com/apache/incubator-seata/pull/6828)] spring boot compatible with file.conf and registry.conf - [[#7012](https://github.com/apache/incubator-seata/pull/7012)] When the number of primary keys exceeds 1000, use union to concatenate the SQL - [[#7075](https://github.com/apache/incubator-seata/pull/7075)] fast fail when channel is null +- [[#7089](https://github.com/apache/incubator-seata/pull/7089)] support instance registration to the registry center - [[#7093](https://github.com/apache/incubator-seata/pull/7093)] add a test workflow for JDK 21 ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 387f0285f13..0cf918bf87a 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -16,6 +16,7 @@ - [[#6828](https://github.com/apache/incubator-seata/pull/6828)] seata-spring-boot-starter兼容file.conf和registry.conf - [[#7012](https://github.com/apache/incubator-seata/pull/7012)] 当主键超过1000个时,使用union拼接sql,可以使用索引 - [[#7075](https://github.com/apache/incubator-seata/pull/7075)] 当channel为空时,快速失败,以便于减少不必要的等待 +- [[#7089](https://github.com/apache/incubator-seata/pull/7089)] 新增instance注册到注册中心的接口 - [[#7093](https://github.com/apache/incubator-seata/pull/7093)] 增加jdk21的工作流测试 ### security: diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java b/common/src/main/java/org/apache/seata/common/metadata/Instance.java similarity index 79% rename from common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java rename to common/src/main/java/org/apache/seata/common/metadata/Instance.java index 3159dc4a429..e588df0e98f 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/Instance.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Instance.java @@ -14,27 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.common.metadata.namingserver; +package org.apache.seata.common.metadata; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.seata.common.metadata.ClusterRole; -import org.apache.seata.common.metadata.Node; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import static org.apache.seata.common.util.CollectionUtils.mapToJsonString; - - public class Instance { private String namespace; private String clusterName; private String unit; - private Node.Endpoint control = new Node.Endpoint(); - private Node.Endpoint transaction = new Node.Endpoint(); + private Node.Endpoint control; + private Node.Endpoint transaction; private double weight = 1.0; private boolean healthy = true; private long term; @@ -169,25 +164,6 @@ public String toJsonString(ObjectMapper objectMapper) { } } - - public Map toMap() { - Map resultMap = new HashMap<>(); - - - resultMap.put("namespace", namespace); - resultMap.put("clusterName", clusterName); - resultMap.put("unit", unit); - resultMap.put("control", control.toString()); - resultMap.put("transaction", transaction.toString()); - resultMap.put("weight", String.valueOf(weight)); - resultMap.put("healthy", String.valueOf(healthy)); - resultMap.put("term", String.valueOf(term)); - resultMap.put("timestamp", String.valueOf(timestamp)); - resultMap.put("metadata", mapToJsonString(metadata)); - - return resultMap; - } - private static class SingletonHolder { private static final Instance SERVER_INSTANCE = new Instance(); } diff --git a/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java b/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java index fb9ddf559f6..55741a835f0 100644 --- a/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java +++ b/common/src/test/java/org/apache/seata/common/metadata/namingserver/InstanceTest.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seata.common.metadata.ClusterRole; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.common.metadata.Node; import org.junit.jupiter.api.Test; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java index b589396e5ab..ac4632ce753 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java @@ -34,6 +34,8 @@ import io.netty.handler.timeout.IdleStateHandler; import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.XID; +import org.apache.seata.common.metadata.Instance; +import org.apache.seata.common.metadata.Node; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.rpc.RemotingBootstrap; @@ -170,9 +172,13 @@ public void initChannel(SocketChannel ch) { try { this.serverBootstrap.bind(port).sync(); LOGGER.info("Server started, service listen port: {}", getListenPort()); - InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort()); + Instance instance = Instance.getInstance(); + // Lines 177-180 are just for compatibility with test cases + if (instance.getTransaction() == null) { + Instance.getInstance().setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "netty")); + } for (RegistryService registryService : MultiRegistryFactory.getInstances()) { - registryService.register(address); + registryService.register(Instance.getInstance()); } initialized.set(true); } catch (SocketException se) { @@ -189,9 +195,8 @@ public void shutdown() { LOGGER.info("Shutting server down, the listen port: {}", XID.getPort()); } if (initialized.get()) { - InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort()); for (RegistryService registryService : MultiRegistryFactory.getInstances()) { - registryService.unregister(address); + registryService.unregister(Instance.getInstance()); registryService.close(); } //wait a few seconds for server transport diff --git a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java index efc997b20bf..93a3cfd1b84 100644 --- a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryService.java @@ -16,6 +16,7 @@ */ package org.apache.seata.discovery.registry; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.config.ConfigurationFactory; @@ -61,16 +62,42 @@ public interface RegistryService { * @param address the address * @throws Exception the exception */ + @Deprecated void register(InetSocketAddress address) throws Exception; + /** + * Register. + * + * @param instance the address + * @throws Exception the exception + */ + default void register(Instance instance) throws Exception { + InetSocketAddress inetSocketAddress = + new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort()); + register(inetSocketAddress); + } + /** * Unregister. * * @param address the address * @throws Exception the exception */ + @Deprecated void unregister(InetSocketAddress address) throws Exception; + /** + * Unregister. + * + * @param instance the instance + * @throws Exception the exception + */ + default void unregister(Instance instance) throws Exception { + InetSocketAddress inetSocketAddress = + new InetSocketAddress(instance.getTransaction().getHost(), instance.getTransaction().getPort()); + unregister(inetSocketAddress); + } + /** * Subscribe. * diff --git a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java index fa53bd45f19..88386253fa6 100644 --- a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java +++ b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java @@ -47,8 +47,7 @@ import org.apache.http.entity.ContentType; import org.apache.http.protocol.HTTP; import org.apache.http.util.EntityUtils; -import org.apache.seata.common.metadata.Node; -import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.common.metadata.namingserver.MetaResponse; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.CollectionUtils; @@ -148,9 +147,11 @@ static NamingserverRegistryServiceImpl getInstance() { @Override public void register(InetSocketAddress address) throws Exception { - NetUtil.validAddress(address); - Instance instance = Instance.getInstance(); - instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty")); + register(Instance.getInstance()); + } + + @Override + public void register(Instance instance) throws Exception { instance.setTimestamp(System.currentTimeMillis()); doRegister(instance, getNamingAddrs()); } @@ -198,11 +199,15 @@ public boolean doHealthCheck(String url) { } } + + + @Override + public void unregister(InetSocketAddress inetSocketAddress) { + unregister(Instance.getInstance()); + } + @Override - public void unregister(InetSocketAddress address) { - NetUtil.validAddress(address); - Instance instance = Instance.getInstance(); - instance.setTransaction(new Node.Endpoint(address.getAddress().getHostAddress(), address.getPort(), "netty")); + public void unregister(Instance instance) { for (String urlSuffix : getNamingAddrs()) { String url = HTTP_PREFIX + urlSuffix + "/naming/v1/unregister?"; String unit = instance.getUnit(); diff --git a/mock-server/src/main/java/org/apache/seata/mockserver/MockServer.java b/mock-server/src/main/java/org/apache/seata/mockserver/MockServer.java index fb40440d02c..b04f1c7b6ed 100644 --- a/mock-server/src/main/java/org/apache/seata/mockserver/MockServer.java +++ b/mock-server/src/main/java/org/apache/seata/mockserver/MockServer.java @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit; import org.apache.seata.common.XID; +import org.apache.seata.common.metadata.Instance; +import org.apache.seata.common.metadata.Node; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.NumberUtils; @@ -78,6 +80,7 @@ public static void start(int port) { XID.setIpAddress(NetUtil.getLocalIp()); XID.setPort(port); // init snowflake for transactionId, branchId + Instance.getInstance().setTransaction(new Node.Endpoint(XID.getIpAddress(),XID.getPort(),"netty")); UUIDGenerator.init(1L); MockCoordinator coordinator = MockCoordinator.getInstance(); diff --git a/server/src/main/java/org/apache/seata/server/Server.java b/server/src/main/java/org/apache/seata/server/Server.java index c699ef037b1..39648dae824 100644 --- a/server/src/main/java/org/apache/seata/server/Server.java +++ b/server/src/main/java/org/apache/seata/server/Server.java @@ -31,12 +31,11 @@ import org.apache.seata.core.rpc.netty.NettyRemotingServer; import org.apache.seata.core.rpc.netty.NettyServerConfig; import org.apache.seata.server.coordinator.DefaultCoordinator; -import org.apache.seata.server.instance.ServerInstance; +import org.apache.seata.server.instance.ServerInstanceFactory; import org.apache.seata.server.lock.LockerManagerFactory; import org.apache.seata.server.metrics.MetricsManager; import org.apache.seata.server.session.SessionHolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @@ -52,10 +51,9 @@ */ @Component("seataServer") public class Server { - private static final Logger LOGGER = LoggerFactory.getLogger(Server.class); @Resource - ServerInstance serverInstance; + ServerInstanceFactory serverInstanceFactory; /** * The entry point of application. @@ -106,7 +104,7 @@ public void start(String[] args) { coordinator.init(); nettyRemotingServer.setHandler(coordinator); - serverInstance.serverInstanceInit(); + serverInstanceFactory.serverInstanceInit(); // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); nettyRemotingServer.init(); diff --git a/server/src/main/java/org/apache/seata/server/controller/VGroupMappingController.java b/server/src/main/java/org/apache/seata/server/controller/VGroupMappingController.java index 1d55855f5d8..bf9edf97876 100644 --- a/server/src/main/java/org/apache/seata/server/controller/VGroupMappingController.java +++ b/server/src/main/java/org/apache/seata/server/controller/VGroupMappingController.java @@ -16,7 +16,7 @@ */ package org.apache.seata.server.controller; -import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.common.result.Result; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; diff --git a/server/src/main/java/org/apache/seata/server/instance/ServerInstance.java b/server/src/main/java/org/apache/seata/server/instance/ServerInstanceFactory.java similarity index 58% rename from server/src/main/java/org/apache/seata/server/instance/ServerInstance.java rename to server/src/main/java/org/apache/seata/server/instance/ServerInstanceFactory.java index eecf39b0717..78842461e73 100644 --- a/server/src/main/java/org/apache/seata/server/instance/ServerInstance.java +++ b/server/src/main/java/org/apache/seata/server/instance/ServerInstanceFactory.java @@ -16,11 +16,11 @@ */ package org.apache.seata.server.instance; +import org.apache.seata.common.XID; import org.apache.seata.common.holder.ObjectHolder; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.common.metadata.Node; -import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.common.thread.NamedThreadFactory; -import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; import org.apache.seata.server.Server; import org.apache.seata.server.ServerRunner; @@ -48,8 +48,8 @@ import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; -@Component("serverInstance") -public class ServerInstance { +@Component("serverInstanceFactory") +public class ServerInstanceFactory { @Resource private RegistryProperties registryProperties; @@ -61,53 +61,58 @@ public class ServerInstance { private static final Logger LOGGER = LoggerFactory.getLogger(Server.class); public void serverInstanceInit() { - if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) { - VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); - EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("scheduledExcuter", 1, true)); - ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); - - // load node properties - Instance instance = Instance.getInstance(); - // load namespace - String namespace = registryNamingServerProperties.getNamespace(); - instance.setNamespace(namespace); - // load cluster name - String clusterName = registryNamingServerProperties.getCluster(); - instance.setClusterName(clusterName); - - // load cluster type - String clusterType = String.valueOf(StoreConfig.getSessionMode()); - instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default"); - - // load unit name - instance.setUnit(String.valueOf(UUID.randomUUID())); - - instance.setTerm(System.currentTimeMillis()); - - // load node Endpoint - instance.setControl(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http")); - - // load metadata - for (PropertySource propertySource : environment.getPropertySources()) { - if (propertySource instanceof EnumerablePropertySource) { - EnumerablePropertySource enumerablePropertySource = (EnumerablePropertySource) propertySource; - for (String propertyName : enumerablePropertySource.getPropertyNames()) { - if (propertyName.startsWith(META_PREFIX)) { - instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName)); - } + VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); + ConfigurableEnvironment environment = + (ConfigurableEnvironment)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); + + // load node properties + Instance instance = Instance.getInstance(); + // load namespace + String namespace = registryNamingServerProperties.getNamespace(); + instance.setNamespace(namespace); + // load cluster name + String clusterName = registryNamingServerProperties.getCluster(); + instance.setClusterName(clusterName); + + // load cluster type + String clusterType = String.valueOf(StoreConfig.getSessionMode()); + instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default"); + + // load unit name + instance.setUnit(String.valueOf(UUID.randomUUID())); + + instance.setTerm(System.currentTimeMillis()); + + // load node Endpoint + instance.setControl(new Node.Endpoint(XID.getIpAddress(), + Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http")); + + // load metadata + for (PropertySource propertySource : environment.getPropertySources()) { + if (propertySource instanceof EnumerablePropertySource) { + EnumerablePropertySource enumerablePropertySource = (EnumerablePropertySource)propertySource; + for (String propertyName : enumerablePropertySource.getPropertyNames()) { + if (propertyName.startsWith(META_PREFIX)) { + instance.addMetadata(propertyName.substring(META_PREFIX.length()), + enumerablePropertySource.getProperty(propertyName)); } } } + } + instance.setTransaction(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "netty")); + if (StringUtils.equals(registryProperties.getType(), NAMING_SERVER)) { // load vgroup mapping relationship instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); - + EXECUTOR_SERVICE = + new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("heartbeat-namingserver", 1, true)); EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { try { vGroupMappingStoreManager.notifyMapping(); } catch (Exception e) { LOGGER.error("Naming server register Exception", e); } - }, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), TimeUnit.MILLISECONDS); + }, registryNamingServerProperties.getHeartbeatPeriod(), registryNamingServerProperties.getHeartbeatPeriod(), + TimeUnit.MILLISECONDS); ServerRunner.addDisposable(EXECUTOR_SERVICE::shutdown); } } diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java index d83ecf34c37..20072d8945c 100644 --- a/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java @@ -19,7 +19,7 @@ import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.common.loader.LoadLevel; -import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.store.MappingDO; diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java index 84fea262d57..1749ca04cb2 100644 --- a/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java +++ b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java @@ -18,7 +18,7 @@ import org.apache.seata.common.exception.ErrorCode; import org.apache.seata.common.exception.SeataRuntimeException; -import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.common.util.IOUtil; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; diff --git a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java index 1367ad281f3..4422bdc33f8 100644 --- a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java @@ -18,7 +18,7 @@ import org.apache.seata.common.exception.RedisException; import org.apache.seata.common.loader.LoadLevel; -import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.common.util.StringUtils; import org.apache.seata.core.store.MappingDO; import org.apache.seata.server.storage.redis.JedisPooledFactory; diff --git a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java index d4ce6a5412f..f2b2b8bd4f1 100644 --- a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java +++ b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java @@ -17,7 +17,7 @@ package org.apache.seata.server.store; import org.apache.seata.common.XID; -import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.core.store.MappingDO; import org.apache.seata.discovery.registry.MultiRegistryFactory; import org.apache.seata.discovery.registry.RegistryService; diff --git a/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java index 755c10759c6..49418ae8075 100644 --- a/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java +++ b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java @@ -16,7 +16,7 @@ */ package org.apache.seata.server.storage.redis.store; -import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.metadata.Instance; import org.apache.seata.core.store.MappingDO; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach;