Skip to content

Commit

Permalink
fix:优化dubbo接口级注册发现的内存使用 (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Jan 19, 2024
1 parent 16f8499 commit 1789f14
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceChangeEvent;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.common.registry.BaseBootConfigHandler;
import com.tencent.polaris.common.registry.BootConfigHandler;
Expand Down Expand Up @@ -59,8 +60,9 @@ public class PolarisRegistry extends FailbackRegistry {

private final AtomicBoolean destroyed = new AtomicBoolean(false);

private final Map<NotifyListener, ServiceListener> serviceListeners = new ConcurrentHashMap<>();
private final Map<URL, Set<NotifyListener>> dubboListeners = new ConcurrentHashMap<>();

private final Map<URL, ServiceListener> serviceListeners = new ConcurrentHashMap<>();
private final PolarisOperator polarisOperator;

public PolarisRegistry(URL url) {
Expand Down Expand Up @@ -128,15 +130,12 @@ public void doSubscribe(URL url, NotifyListener listener) {
Instance[] instances = polarisOperator.getAvailableInstances(service, true);
onInstances(url, listener, instances);
LOGGER.info("[POLARIS] submit watch task for service {}", service);
serviceListeners.computeIfAbsent(listener, notifyListener -> {
ServiceListener serviceListener = event -> {
try {
Instance[] curInstances = polarisOperator.getAvailableInstances(service, true);
onInstances(url, listener, curInstances);
} catch (PolarisException e) {
LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString());
}
};

dubboListeners.computeIfAbsent(url, s -> new ConcurrentHashSet<>());
dubboListeners.get(url).add(listener);

serviceListeners.computeIfAbsent(url, dubboUrl -> {
ServiceListener serviceListener = new DubboServiceListener(url, this);
polarisOperator.watchService(service, serviceListener);
return serviceListener;
});
Expand Down Expand Up @@ -208,4 +207,32 @@ public void doUnsubscribe(URL url, NotifyListener listener) {
public boolean isAvailable() {
return true;
}

private static class DubboServiceListener implements ServiceListener {

private final URL url;

private final String service;

private final PolarisRegistry registry;

private DubboServiceListener(URL url, PolarisRegistry registry) {
this.url = url;
this.service = url.getServiceInterface();
this.registry = registry;
}

@Override
public void onEvent(ServiceChangeEvent serviceChangeEvent) {
try {
Set<NotifyListener> listeners = registry.dubboListeners.getOrDefault(url, Collections.emptySet());
Instance[] curInstances = registry.polarisOperator.getAvailableInstances(service, true);
for (NotifyListener listener : listeners) {
registry.onInstances(url, listener, curInstances);
}
} catch (PolarisException e) {
LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceChangeEvent;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.common.registry.Consts;
import com.tencent.polaris.common.registry.ConvertUtils;
Expand Down Expand Up @@ -60,7 +61,9 @@ public class PolarisRegistry extends FailbackRegistry {

private final AtomicBoolean destroyed = new AtomicBoolean(false);

private final Map<NotifyListener, ServiceListener> serviceListeners = new ConcurrentHashMap<>();
private final Map<URL, Set<NotifyListener>> dubboListeners = new ConcurrentHashMap<>();

private final Map<URL, ServiceListener> serviceListeners = new ConcurrentHashMap<>();

private final PolarisOperator polarisOperator;

Expand Down Expand Up @@ -125,15 +128,11 @@ public void doSubscribe(URL url, NotifyListener listener) {
onInstances(url, listener, instances);
LOGGER.info("[POLARIS] submit watch task for service {}", service);

serviceListeners.computeIfAbsent(listener, notifyListener -> {
ServiceListener serviceListener = event -> {
try {
Instance[] curInstances = polarisOperator.getAvailableInstances(service, true);
onInstances(url, listener, curInstances);
} catch (PolarisException e) {
LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString());
}
};
dubboListeners.computeIfAbsent(url, s -> new ConcurrentHashSet<>());
dubboListeners.get(url).add(listener);

serviceListeners.computeIfAbsent(url, dubboUrl -> {
ServiceListener serviceListener = new DubboServiceListener(url, this);
polarisOperator.watchService(service, serviceListener);
return serviceListener;
});
Expand Down Expand Up @@ -202,4 +201,32 @@ public void doUnsubscribe(URL url, NotifyListener listener) {
public boolean isAvailable() {
return true;
}

private static class DubboServiceListener implements ServiceListener {

private final URL url;

private final String service;

private final PolarisRegistry registry;

private DubboServiceListener(URL url, PolarisRegistry registry) {
this.url = url;
this.service = url.getServiceInterface();
this.registry = registry;
}

@Override
public void onEvent(ServiceChangeEvent serviceChangeEvent) {
try {
Set<NotifyListener> listeners = registry.dubboListeners.getOrDefault(url, Collections.emptySet());
Instance[] curInstances = registry.polarisOperator.getAvailableInstances(service, true);
for (NotifyListener listener : listeners) {
registry.onInstances(url, listener, curInstances);
}
} catch (PolarisException e) {
LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString());
}
}
}
}
1 change: 0 additions & 1 deletion dubbox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>dubbox</artifactId>

<packaging>pom</packaging>

<properties>
Expand Down

0 comments on commit 1789f14

Please sign in to comment.