Skip to content

Commit

Permalink
1.3.2nacos<--->2.2.3nacos双向同步时,暂停其中的一个task,重启jvm会抛出空指针异常 nacos-group…
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoquanidea committed Nov 3, 2023
1 parent a1d683a commit a2157be
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
*/
package com.alibaba.nacossync.event.listener;

import javax.annotation.PostConstruct;

import com.alibaba.nacossync.constant.MetricsStatisticsType;
import com.alibaba.nacossync.monitor.MetricsManager;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.alibaba.nacossync.cache.SkyWalkerCacheServices;
import com.alibaba.nacossync.constant.MetricsStatisticsType;
import com.alibaba.nacossync.event.DeleteTaskEvent;
import com.alibaba.nacossync.event.SyncTaskEvent;
import com.alibaba.nacossync.extension.SyncManagerService;
import com.alibaba.nacossync.monitor.MetricsManager;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
* @author NacosSync
Expand Down Expand Up @@ -62,6 +60,7 @@ public void listenerSyncTaskEvent(SyncTaskEvent syncTaskEvent) {

try {
long start = System.currentTimeMillis();
// 如果该任务同步完成,则在finishedTaskMap中添加key:operationId,value:finishedTask
if (syncManagerService.sync(syncTaskEvent.getTaskDO(), null)) {
skyWalkerCacheServices.addFinishedTask(syncTaskEvent.getTaskDO());
metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, System.currentTimeMillis() - start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import com.alibaba.nacossync.pojo.model.ClusterDO;
import com.alibaba.nacossync.pojo.model.TaskDO;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.util.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* @author paderlol
Expand All @@ -56,6 +56,7 @@ NamingService createServer(String clusterId, Supplier<String> serverAddressSuppl
throws Exception {
String newClusterId;
if (clusterId.contains(":")) {
// clusterId的格式为 sourceClusterId:destClusterId:index,split[1]即destClusterId
String[] split = clusterId.split(":");
newClusterId = split[1];
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public class NacosSyncToNacosServiceImpl implements SyncService, InitializingBea

@Autowired
private NacosServerHolder nacosServerHolder;


// 同步开始之前,会将同步task缓存至此,key:operationId
private ConcurrentHashMap<String, TaskDO> allSyncTaskMap = new ConcurrentHashMap<>();

@Autowired
Expand Down Expand Up @@ -126,7 +127,7 @@ public boolean delete(TaskDO taskDO) {
try {
NamingService sourceNamingService = nacosServerHolder.getSourceNamingService(taskDO.getTaskId(),
taskDO.getSourceClusterId());

if ("ALL".equals(taskDO.getServiceName())) {
String operationId = taskUpdateProcessor.getTaskIdAndOperationIdMap(taskDO.getTaskId());
if (!StringUtils.isEmpty(operationId)) {
Expand All @@ -142,15 +143,18 @@ public boolean delete(TaskDO taskDO) {
skyWalkerCacheServices.removeFinishedTask(operationKey);
allSyncTaskMap.remove(operationKey);
NamingService destNamingService = popNamingService(taskDO);
sourceNamingService.unsubscribe(serviceName, getGroupNameOrDefault(taskDO.getGroupName()),
listenerMap.remove(taskDO.getTaskId() + serviceName));

List<Instance> sourceInstances = sourceNamingService.getAllInstances(serviceName,
getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), false);
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
destNamingService.deregisterInstance(serviceName,
getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort());
// bugfix:task status为DELETE时,项目重启后不会再对该任务执行sync,因此dest namingService不会被记录到本类的serviceClient中,popNamingService可能返回null;这里先判空,避免NPE
if (destNamingService != null) {
sourceNamingService.unsubscribe(serviceName, getGroupNameOrDefault(taskDO.getGroupName()),
listenerMap.remove(taskDO.getTaskId() + serviceName));

List<Instance> sourceInstances = sourceNamingService.getAllInstances(serviceName,
getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), false);
for (Instance instance : sourceInstances) {
if (needSync(instance.getMetadata())) {
destNamingService.deregisterInstance(serviceName,
getGroupNameOrDefault(taskDO.getGroupName()), instance.getIp(), instance.getPort());
}
}
}
}
Expand Down Expand Up @@ -192,8 +196,11 @@ public boolean sync(TaskDO taskDO, Integer index) {
log.info("线程 {} 开始同步 {} ", Thread.currentThread().getId(), System.currentTimeMillis());
String operationId = taskDO.getOperationId();
try {
// 创建namingService并缓存至serviceMap中,key:taskId+sourceClusterId
NamingService sourceNamingService = nacosServerHolder.getSourceNamingService(taskDO.getTaskId(),
taskDO.getSourceClusterId());
// 创建dest集群 namingService,并缓存至nacosServerHolder#globalNameService属性中,key:destClusterId;(这个缓存只缓存nacos的namingService)
// 在AbstractServerHolderImpl#serviceMap中也缓存一份,key:sourceClusterId:destClusterId:index;(这个缓存缓存所有的service,例如eureka、nacos等)
NamingService destNamingService = getDestNamingService(taskDO, index);
allSyncTaskMap.put(operationId, taskDO);
//防止暂停同步任务后,重新同步/或删除任务以后新建任务不会再接收到新的事件导致不能同步,所以每次订阅事件之前,先全量同步一次任务
Expand Down Expand Up @@ -252,11 +259,13 @@ private NamingService getDestNamingService(TaskDO taskDO, Integer index) {

private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingService,
NamingService destNamingService) throws NacosException {
// 如果该任务还在执行中,则放弃本次同步
if (syncTaskTap.putIfAbsent(taskId, 1) != null) {
log.info("任务Id:{}上一个同步任务尚未结束", taskId);
return;
}
//记录目标集群的Client
// 将destNamingService缓存至该类的serviceClient中,key:taskId:serviceName,value:namingService;好处是通过taskDO对象(id+serviceName)可以直接获取dest namingService
recordNamingService(taskDO, destNamingService);
try {

Expand Down Expand Up @@ -288,6 +297,7 @@ private void handlerPersistenceInstance(TaskDO taskDO, NamingService destNamingS
needRegisterInstance.add(instance);
}
}
// 获取目标集群中该服务的全部实例
List<Instance> destAllInstances = destNamingService.getAllInstances(taskDO.getServiceName(),
getGroupNameOrDefault(taskDO.getGroupName()), new ArrayList<>(), true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void run() {

taskDOS.forEach(taskDO -> {

// 通过task operationId来判断该任务是否已完成,有值即已完成,直接跳过
if ((null != skyWalkerCacheServices.getFinishedTask(taskDO))) {

return;
Expand Down

0 comments on commit a2157be

Please sign in to comment.