diff --git a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java index 1a9d8cd..1839f3a 100644 --- a/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java +++ b/dubbo/dubbo-plugins/dubbo-registry-polaris/src/main/java/com/tencent/polaris/dubbo/registry/PolarisRegistry.java @@ -19,6 +19,7 @@ import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.listener.ServiceListener; import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceChangeEvent; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.common.registry.BaseBootConfigHandler; import com.tencent.polaris.common.registry.BootConfigHandler; @@ -59,8 +60,9 @@ public class PolarisRegistry extends FailbackRegistry { private final AtomicBoolean destroyed = new AtomicBoolean(false); - private final Map serviceListeners = new ConcurrentHashMap<>(); + private final Map> dubboListeners = new ConcurrentHashMap<>(); + private final Map serviceListeners = new ConcurrentHashMap<>(); private final PolarisOperator polarisOperator; public PolarisRegistry(URL url) { @@ -128,15 +130,12 @@ public void doSubscribe(URL url, NotifyListener listener) { Instance[] instances = polarisOperator.getAvailableInstances(service, true); onInstances(url, listener, instances); LOGGER.info("[POLARIS] submit watch task for service {}", service); - serviceListeners.computeIfAbsent(listener, notifyListener -> { - ServiceListener serviceListener = event -> { - try { - Instance[] curInstances = polarisOperator.getAvailableInstances(service, true); - onInstances(url, listener, curInstances); - } catch (PolarisException e) { - LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); - } - }; + + dubboListeners.computeIfAbsent(url, s -> new ConcurrentHashSet<>()); + dubboListeners.get(url).add(listener); + + serviceListeners.computeIfAbsent(url, dubboUrl -> { + ServiceListener serviceListener = new DubboServiceListener(url, this); polarisOperator.watchService(service, serviceListener); return serviceListener; }); @@ -208,4 +207,32 @@ public void doUnsubscribe(URL url, NotifyListener listener) { public boolean isAvailable() { return true; } + + private static class DubboServiceListener implements ServiceListener { + + private final URL url; + + private final String service; + + private final PolarisRegistry registry; + + private DubboServiceListener(URL url, PolarisRegistry registry) { + this.url = url; + this.service = url.getServiceInterface(); + this.registry = registry; + } + + @Override + public void onEvent(ServiceChangeEvent serviceChangeEvent) { + try { + Set listeners = registry.dubboListeners.getOrDefault(url, Collections.emptySet()); + Instance[] curInstances = registry.polarisOperator.getAvailableInstances(service, true); + for (NotifyListener listener : listeners) { + registry.onInstances(url, listener, curInstances); + } + } catch (PolarisException e) { + LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); + } + } + } } diff --git a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java index 963b959..5995ce2 100644 --- a/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java +++ b/dubbox/dubbox-plugins/dubbox-registry-polaris/src/main/java/com/tencent/polaris/dubbox/registry/PolarisRegistry.java @@ -28,6 +28,7 @@ import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.listener.ServiceListener; import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceChangeEvent; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.common.registry.Consts; import com.tencent.polaris.common.registry.ConvertUtils; @@ -60,7 +61,9 @@ public class PolarisRegistry extends FailbackRegistry { private final AtomicBoolean destroyed = new AtomicBoolean(false); - private final Map serviceListeners = new ConcurrentHashMap<>(); + private final Map> dubboListeners = new ConcurrentHashMap<>(); + + private final Map serviceListeners = new ConcurrentHashMap<>(); private final PolarisOperator polarisOperator; @@ -125,15 +128,11 @@ public void doSubscribe(URL url, NotifyListener listener) { onInstances(url, listener, instances); LOGGER.info("[POLARIS] submit watch task for service {}", service); - serviceListeners.computeIfAbsent(listener, notifyListener -> { - ServiceListener serviceListener = event -> { - try { - Instance[] curInstances = polarisOperator.getAvailableInstances(service, true); - onInstances(url, listener, curInstances); - } catch (PolarisException e) { - LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); - } - }; + dubboListeners.computeIfAbsent(url, s -> new ConcurrentHashSet<>()); + dubboListeners.get(url).add(listener); + + serviceListeners.computeIfAbsent(url, dubboUrl -> { + ServiceListener serviceListener = new DubboServiceListener(url, this); polarisOperator.watchService(service, serviceListener); return serviceListener; }); @@ -202,4 +201,32 @@ public void doUnsubscribe(URL url, NotifyListener listener) { public boolean isAvailable() { return true; } + + private static class DubboServiceListener implements ServiceListener { + + private final URL url; + + private final String service; + + private final PolarisRegistry registry; + + private DubboServiceListener(URL url, PolarisRegistry registry) { + this.url = url; + this.service = url.getServiceInterface(); + this.registry = registry; + } + + @Override + public void onEvent(ServiceChangeEvent serviceChangeEvent) { + try { + Set listeners = registry.dubboListeners.getOrDefault(url, Collections.emptySet()); + Instance[] curInstances = registry.polarisOperator.getAvailableInstances(service, true); + for (NotifyListener listener : listeners) { + registry.onInstances(url, listener, curInstances); + } + } catch (PolarisException e) { + LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString()); + } + } + } } diff --git a/dubbox/pom.xml b/dubbox/pom.xml index e40b80b..d184e22 100644 --- a/dubbox/pom.xml +++ b/dubbox/pom.xml @@ -11,7 +11,6 @@ 4.0.0 dubbox - pom