diff --git a/common/src/main/java/org/apache/seata/common/metadata/Node.java b/common/src/main/java/org/apache/seata/common/metadata/Node.java index e200e79824c..f8168c07e1e 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Node.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Node.java @@ -24,11 +24,16 @@ public class Node { Map metadata = new HashMap<>(); private Endpoint control; + private Endpoint transaction; + private Endpoint internal; + private String group; private ClusterRole role = ClusterRole.MEMBER; + private String version; + public Node() {} public Endpoint createEndpoint(String host, int port, String protocol) { @@ -75,6 +80,22 @@ public void setTransaction(Endpoint transaction) { this.transaction = transaction; } + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public Endpoint getInternal() { + return internal; + } + + public void setInternal(Endpoint internal) { + this.internal = internal; + } + public static class Endpoint { private String host; diff --git a/dependencies/pom.xml b/dependencies/pom.xml index b317f642ecb..0f67224ce32 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -41,7 +41,7 @@ 1.8.3 1.12.17 2.6.10 - 5.5.3 + 5.6.5 1.2.83 1.5.9 1.2.1 @@ -73,8 +73,11 @@ 1.10.12 1.7.1 1.3.14 + 4.1.86.Final + 2.0 4.1.94.Final 4.0.3 + 1.6.7 3.16.3 1.27.1 @@ -188,6 +191,11 @@ hessian ${sofa.hessian.version} + + com.alipay.sofa + bolt + ${sofa.bolt.version} + com.alibaba fastjson diff --git a/server/pom.xml b/server/pom.xml index 6dbd763ba9d..fd7f6de18d4 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -268,6 +268,16 @@ com.alipay.sofa jraft-core + + + com.alipay.sofa + bolt + + + + + com.alipay.sofa + bolt org.codehaus.janino diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServer.java b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServer.java index 9baa9a19991..37d18976144 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServer.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServer.java @@ -98,6 +98,10 @@ public RaftStateMachine getRaftStateMachine() { return raftStateMachine; } + public PeerId getServerId() { + return serverId; + } + @Override public void close() { destroy(); diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServerManager.java b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServerManager.java index 58aad7ec79e..b891bea8484 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServerManager.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftServerManager.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.CliService; import com.alipay.sofa.jraft.RaftServiceFactory; import com.alipay.sofa.jraft.conf.Configuration; @@ -39,9 +40,12 @@ import org.apache.seata.common.XID; import org.apache.seata.common.util.StringUtils; import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.serializer.SerializerType; import org.apache.seata.discovery.registry.FileRegistryServiceImpl; import org.apache.seata.discovery.registry.MultiRegistryFactory; import org.apache.seata.discovery.registry.RegistryService; +import org.apache.seata.server.cluster.raft.processor.PutNodeInfoRequestProcessor; +import org.apache.seata.server.cluster.raft.serializer.JacksonBoltSerializer; import org.apache.seata.server.store.StoreConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,8 +152,12 @@ public static void start() { } LOGGER.info("started seata server raft cluster, group: {} ", group); }); - if (rpcServer != null && !rpcServer.init(null)) { - throw new RuntimeException("start raft node fail!"); + if (rpcServer != null) { + rpcServer.registerProcessor(new PutNodeInfoRequestProcessor()); + SerializerManager.addSerializer(SerializerType.JACKSON.getCode(), new JacksonBoltSerializer()); + if (!rpcServer.init(null)) { + throw new RuntimeException("start raft node fail!"); + } } } @@ -222,9 +230,11 @@ public static Set groups() { private static class SingletonHandler { private static final CliService CLI_SERVICE = RaftServiceFactory.createAndInitCliService(new CliOptions()); private static final CliClientService CLI_CLIENT_SERVICE = new CliClientServiceImpl(); + static { CLI_CLIENT_SERVICE.init(new CliOptions()); } + } } diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftStateMachine.java b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftStateMachine.java index 4bd0bf77a41..23738aadf73 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/RaftStateMachine.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/RaftStateMachine.java @@ -18,13 +18,19 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import com.alipay.sofa.jraft.rpc.InvokeContext; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Iterator; import com.alipay.sofa.jraft.RouteTable; @@ -32,14 +38,19 @@ import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.core.StateMachineAdapter; import com.alipay.sofa.jraft.entity.LeaderChangeContext; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import org.apache.seata.common.XID; import org.apache.seata.common.holder.ObjectHolder; import org.apache.seata.common.metadata.ClusterRole; import org.apache.seata.common.metadata.Node; +import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.serializer.SerializerType; import org.apache.seata.server.cluster.raft.context.SeataClusterContext; +import org.apache.seata.server.cluster.raft.processor.request.PutNodeMetadataRequest; import org.apache.seata.server.cluster.raft.snapshot.metadata.LeaderMetadataSnapshotFile; import org.apache.seata.server.cluster.raft.snapshot.session.SessionSnapshotFile; import org.apache.seata.server.cluster.raft.snapshot.StoreSnapshotFile; @@ -68,7 +79,6 @@ import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT; -import static org.apache.seata.common.DefaultValues.SERVICE_OFFSET_SPRING_BOOT; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_BRANCH_SESSION; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_GLOBAL_SESSION; import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.REFRESH_CLUSTER_METADATA; @@ -93,7 +103,9 @@ public class RaftStateMachine extends StateMachineAdapter { private static final Map> EXECUTES = new HashMap<>(); - private volatile RaftClusterMetadata raftClusterMetadata; + private volatile RaftClusterMetadata raftClusterMetadata = new RaftClusterMetadata(); + + private Lock lock = new ReentrantLock(); /** * Leader term @@ -105,6 +117,8 @@ public class RaftStateMachine extends StateMachineAdapter { */ private final AtomicLong currentTerm = new AtomicLong(-1); + private final AtomicBoolean init = new AtomicBoolean(false); + public boolean isLeader() { return this.leaderTerm.get() > 0; } @@ -198,7 +212,6 @@ public void onLeaderStart(final long term) { this.leaderTerm.set(term); LOGGER.info("groupId: {}, onLeaderStart: term={}.", group, term); this.currentTerm.set(term); - SeataClusterContext.bindGroup(group); syncMetadata(); if (!leader && RaftServerManager.isRaftMode()) { CompletableFuture.runAsync(() -> { @@ -231,21 +244,48 @@ public void onStopFollowing(final LeaderChangeContext ctx) { public void onStartFollowing(final LeaderChangeContext ctx) { LOGGER.info("groupId: {}, onStartFollowing: {}.", group, ctx); this.currentTerm.set(ctx.getTerm()); + CompletableFuture.runAsync(() -> syncCurrentNodeInfo(ctx.getLeaderId())); } @Override public void onConfigurationCommitted(Configuration conf) { LOGGER.info("groupId: {}, onConfigurationCommitted: {}.", group, conf); - syncMetadata(); RouteTable.getInstance().updateConfiguration(group, conf); + if (isLeader()) { + lock.lock(); + try { + List newFollowers = conf.getPeers(); + Set newLearners = conf.getLearners(); + List currentFollowers = raftClusterMetadata.getFollowers(); + if (CollectionUtils.isNotEmpty(newFollowers)) { + raftClusterMetadata.setFollowers(currentFollowers.stream() + .filter(node -> contains(node, newFollowers)).collect(Collectors.toList())); + } + if (CollectionUtils.isNotEmpty(newLearners)) { + raftClusterMetadata.setLearner(raftClusterMetadata.getLearner().stream() + .filter(node -> contains(node, newLearners)).collect(Collectors.toList())); + } + syncMetadata(); + } finally { + lock.unlock(); + } + } + } + + private boolean contains(Node node, Collection list) { + if (node.getInternal() == null) { + return true; + } + PeerId nodePeer = new PeerId(node.getInternal().getHost(), node.getInternal().getPort()); + return list.contains(nodePeer); } - - private void syncMetadata() { + + public void syncMetadata() { if (isLeader()) { SeataClusterContext.bindGroup(group); try { RaftClusterMetadataMsg raftClusterMetadataMsg = - new RaftClusterMetadataMsg(createNewRaftClusterMetadata()); + new RaftClusterMetadataMsg(changeOrInitRaftClusterMetadata()); RaftTaskUtil.createTask(status -> refreshClusterMetadata(raftClusterMetadataMsg), raftClusterMetadataMsg, null); } catch (Exception e) { @@ -287,39 +327,98 @@ public void setRaftLeaderMetadata(RaftClusterMetadata raftClusterMetadata) { this.raftClusterMetadata = raftClusterMetadata; } - public RaftClusterMetadata createNewRaftClusterMetadata() { - RaftClusterMetadata metadata = new RaftClusterMetadata(this.currentTerm.get()); - Node leader = metadata.createNode(XID.getIpAddress(), XID.getPort(), - Integer.parseInt(((Environment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT)) - .getProperty("server.port", String.valueOf(8088))), - group, Collections.emptyMap()); - leader.setRole(ClusterRole.LEADER); - metadata.setLeader(leader); - Configuration configuration = RouteTable.getInstance().getConfiguration(this.group); - List learners = configuration.getLearners().stream().map(learner -> { - int nettyPort = learner.getPort() - SERVICE_OFFSET_SPRING_BOOT; - Node learnerNode = metadata.createNode(learner.getIp(), nettyPort, nettyPort - SERVICE_OFFSET_SPRING_BOOT, - this.group, Collections.emptyMap()); - learnerNode.setRole(ClusterRole.LEARNER); - return learnerNode; - }).collect(Collectors.toList()); - metadata.setLearner(learners); - List followers = configuration.getPeers().stream().map(follower -> { - int nettyPort = follower.getPort() - SERVICE_OFFSET_SPRING_BOOT; - Node followerNode = metadata.createNode(follower.getIp(), nettyPort, nettyPort - SERVICE_OFFSET_SPRING_BOOT, - this.group, Collections.emptyMap()); - followerNode.setRole(ClusterRole.FOLLOWER); - return followerNode; - }).collect(Collectors.toList()); - metadata.setFollowers(followers); - return metadata; + public RaftClusterMetadata changeOrInitRaftClusterMetadata() { + raftClusterMetadata.setTerm(this.currentTerm.get()); + Node leaderNode = raftClusterMetadata.getLeader(); + RaftServer raftServer = RaftServerManager.getRaftServer(group); + PeerId cureentPeerId = raftServer.getServerId(); + // After the re-election, the leader information may be different from the latest leader, and you need to replace the leader information + if (leaderNode == null || (leaderNode.getInternal() != null + && !cureentPeerId.equals(new PeerId(leaderNode.getInternal().getHost(), leaderNode.getInternal().getPort())))) { + Node leader = + raftClusterMetadata.createNode(XID.getIpAddress(), XID.getPort(), raftServer.getServerId().getPort(), + Integer.parseInt( + ((Environment)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT)) + .getProperty("server.port", String.valueOf(7091))), + group, Collections.emptyMap()); + leader.setRole(ClusterRole.LEADER); + raftClusterMetadata.setLeader(leader); + } + return raftClusterMetadata; } public void refreshClusterMetadata(RaftBaseMsg syncMsg) { + // Directly receive messages from the leader and update the cluster metadata raftClusterMetadata = ((RaftClusterMetadataMsg)syncMsg).getRaftClusterMetadata(); ((ApplicationEventPublisher)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)) .publishEvent(new ClusterChangeEvent(this, group, raftClusterMetadata.getTerm(), this.isLeader())); LOGGER.info("groupId: {}, refresh cluster metadata: {}", group, raftClusterMetadata); } + private void syncCurrentNodeInfo(PeerId leaderPeerId) { + if (init.compareAndSet(false, true)) { + try { + RaftServer raftServer = RaftServerManager.getRaftServer(group); + PeerId cureentPeerId = raftServer.getServerId(); + Node node = raftClusterMetadata.createNode(XID.getIpAddress(), XID.getPort(), cureentPeerId.getPort(), + Integer.parseInt( + ((Environment)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT)) + .getProperty("server.port", String.valueOf(7091))), + group, Collections.emptyMap()); + InvokeContext invokeContext = new InvokeContext(); + PutNodeMetadataRequest putNodeInfoRequest = new PutNodeMetadataRequest(node); + Configuration configuration = RouteTable.getInstance().getConfiguration(group); + node.setRole( + configuration.getPeers().contains(cureentPeerId) ? ClusterRole.FOLLOWER : ClusterRole.LEARNER); + invokeContext.put(com.alipay.remoting.InvokeContext.BOLT_CUSTOM_SERIALIZER, + SerializerType.JACKSON.getCode()); + CliClientServiceImpl cliClientService = + (CliClientServiceImpl)RaftServerManager.getCliClientServiceInstance(); + // The previous leader may be an old snapshot or log playback, which is not accurate, and you + // need to get the leader again + cliClientService.getRpcClient().invokeAsync(leaderPeerId.getEndpoint(), putNodeInfoRequest, + invokeContext, (result, err) -> { + if (err == null) { + LOGGER.info("sync node info to leader: {}, result: {}", leaderPeerId, result); + } else { + LOGGER.error("sync node info to leader: {}, error: {}", leaderPeerId, err.getMessage(), + err); + } + }, 30000); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + } + + public void changeNodeMetadata(Node node) { + lock.lock(); + try { + List list = node.getRole() == ClusterRole.FOLLOWER ? raftClusterMetadata.getFollowers() + : raftClusterMetadata.getLearner(); + // If the node currently exists, modify it + for (Node follower : list) { + Node.Endpoint endpoint = follower.getInternal(); + if (endpoint != null) { + // change old follower node metadata + if (endpoint.getHost().equals(node.getInternal().getHost()) + && endpoint.getPort() == node.getInternal().getPort()) { + follower.setTransaction(node.getTransaction()); + follower.setControl(node.getControl()); + follower.setGroup(group); + follower.setMetadata(node.getMetadata()); + follower.setVersion(node.getVersion()); + follower.setRole(node.getRole()); + return; + } + } + } + // add new node node metadata + list.add(node); + syncMetadata(); + } finally { + lock.unlock(); + } + } + } diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/processor/PutNodeInfoRequestProcessor.java b/server/src/main/java/org/apache/seata/server/cluster/raft/processor/PutNodeInfoRequestProcessor.java new file mode 100644 index 00000000000..b99666831e3 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/processor/PutNodeInfoRequestProcessor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.cluster.raft.processor; + +import com.alipay.sofa.jraft.rpc.RpcContext; +import com.alipay.sofa.jraft.rpc.RpcProcessor; +import org.apache.seata.common.metadata.Node; +import org.apache.seata.server.cluster.raft.RaftServer; +import org.apache.seata.server.cluster.raft.RaftServerManager; +import org.apache.seata.server.cluster.raft.RaftStateMachine; +import org.apache.seata.server.cluster.raft.processor.request.PutNodeMetadataRequest; +import org.apache.seata.server.cluster.raft.processor.response.PutNodeMetadataResponse; + +public class PutNodeInfoRequestProcessor implements RpcProcessor { + + public PutNodeInfoRequestProcessor() { + super(); + } + + @Override + public void handleRequest(RpcContext rpcCtx, PutNodeMetadataRequest request) { + Node node = request.getNode(); + String group = node.getGroup(); + if (RaftServerManager.isLeader(group)) { + RaftServer raftServer = RaftServerManager.getRaftServer(group); + RaftStateMachine raftStateMachine = raftServer.getRaftStateMachine(); + raftStateMachine.changeNodeMetadata(node); + rpcCtx.sendResponse(new PutNodeMetadataResponse(true)); + } + } + + @Override + public String interest() { + return PutNodeMetadataRequest.class.getName(); + } + +} diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/processor/request/PutNodeMetadataRequest.java b/server/src/main/java/org/apache/seata/server/cluster/raft/processor/request/PutNodeMetadataRequest.java new file mode 100644 index 00000000000..028487133ef --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/processor/request/PutNodeMetadataRequest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.cluster.raft.processor.request; + +import java.io.Serializable; +import org.apache.seata.common.metadata.Node; + +public class PutNodeMetadataRequest implements Serializable { + + private Node node; + + public PutNodeMetadataRequest() {} + + public PutNodeMetadataRequest(Node node) { + this.node = node; + } + + public Node getNode() { + return node; + } + + public void setNode(Node node) { + this.node = node; + } +} diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/processor/response/PutNodeMetadataResponse.java b/server/src/main/java/org/apache/seata/server/cluster/raft/processor/response/PutNodeMetadataResponse.java new file mode 100644 index 00000000000..08494abf667 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/processor/response/PutNodeMetadataResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.cluster.raft.processor.response; + +import java.io.Serializable; + +public class PutNodeMetadataResponse implements Serializable { + + private boolean success; + + public PutNodeMetadataResponse(boolean success) { + this.success = success; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + @Override + public String toString() { + return "PutNodeMetadataResponse{" + "success=" + success + '}'; + } + +} diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/serializer/JacksonBoltSerializer.java b/server/src/main/java/org/apache/seata/server/cluster/raft/serializer/JacksonBoltSerializer.java new file mode 100644 index 00000000000..740f9ff81ad --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/serializer/JacksonBoltSerializer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.seata.server.cluster.raft.serializer; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.Serializer; +import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.core.serializer.SerializerType; + +public class JacksonBoltSerializer implements Serializer { + + private final org.apache.seata.core.serializer.Serializer seataSerializer = + EnhancedServiceLoader.load(org.apache.seata.core.serializer.Serializer.class, + SerializerType.getByCode(SerializerType.JACKSON.getCode()).name()); + + @Override + public byte[] serialize(Object obj) throws CodecException { + try { + return seataSerializer.serialize(obj); + } catch (Exception e) { + throw new CodecException("Failed to serialize data", e); + } + } + + @Override + public T deserialize(byte[] data, String classOfT) throws CodecException { + try { + return seataSerializer.deserialize(data); + } catch (Exception e) { + throw new CodecException("Failed to deserialize data", e); + } + } + +} diff --git a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java index 6e2b9955adf..2e3e217a96c 100644 --- a/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java +++ b/server/src/main/java/org/apache/seata/server/cluster/raft/sync/msg/dto/RaftClusterMetadata.java @@ -17,12 +17,14 @@ package org.apache.seata.server.cluster.raft.sync.msg.dto; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.seata.common.metadata.Node; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.Version; /** */ @@ -32,9 +34,9 @@ public class RaftClusterMetadata implements Serializable { private Node leader; - private List followers; + private List followers = new ArrayList<>(); - private List learner; + private List learner = new ArrayList<>(); private long term; @@ -45,11 +47,14 @@ public RaftClusterMetadata(long term) { this.term = term; } - public Node createNode(String host, int txPort, int controlPort, String group, Map metadata) { + public Node createNode(String host, int txPort, int internalPort, int controlPort, String group, + Map metadata) { Node node = new Node(); node.setTransaction(node.createEndpoint(host, txPort, "seata")); node.setControl(node.createEndpoint(host, controlPort, "http")); node.setGroup(group); + node.setVersion(Version.getCurrent()); + node.setInternal(node.createEndpoint(host, internalPort, "raft")); Optional.ofNullable(metadata).ifPresent(node::setMetadata); return node; }