Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nacos registry support subscribe multi category #2783

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 11 additions & 44 deletions registry/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type callback func(services []model.Instance, err error)

type nacosListener struct {
namingClient *nacosClient.NacosNamingClient
listenURL *common.URL
serviceName string
regURL *common.URL
events *gxchan.UnboundedChan
instanceMap map[string]model.Instance
Expand All @@ -61,31 +61,17 @@ type nacosListener struct {
subscribeParam *vo.SubscribeParam
}

// NewNacosListener creates a data listener for nacos
func NewNacosListener(url, regURL *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
// NewNacosListenerWithServiceName creates a data listener for nacos
func NewNacosListenerWithServiceName(serviceName string, regURL *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenURL: url,
serviceName: serviceName,
regURL: regURL,
events: gxchan.NewUnboundedChan(32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
}
err := listener.startListen()
return listener, err
}

// NewNacosListener creates a data listener for nacos
func NewNacosListenerWithServiceName(serviceName string, url, regURL *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenURL: url,
regURL: regURL,
events: gxchan.NewUnboundedChan(32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
}
err := listener.startListenWithServiceName(serviceName)
err := listener.listenService(serviceName)
return listener, err
}

Expand Down Expand Up @@ -188,37 +174,18 @@ func getSubscribeName(url *common.URL) string {
return buffer.String()
}

func (nl *nacosListener) startListen() error {
func (nl *nacosListener) listenService(serviceName string) error {
if nl.namingClient == nil {
return perrors.New("nacos naming namingClient stopped")
}
nl.subscribeParam = createSubscribeParam(nl.listenURL, nl.regURL, nl.Callback)
nl.subscribeParam = createSubscribeParam(serviceName, nl.regURL, nl.Callback)
if nl.subscribeParam == nil {
return perrors.New("create nacos subscribeParam failed")
}
go func() {
err := nl.namingClient.Client().Subscribe(nl.subscribeParam)
if err == nil {
listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl)
}
}()
return nil
}

func (nl *nacosListener) startListenWithServiceName(serviceName string) error {
if nl.namingClient == nil {
return perrors.New("nacos naming namingClient stopped")
err := nl.namingClient.Client().Subscribe(nl.subscribeParam)
if err == nil {
listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl)
}
nl.subscribeParam = createSubscribeParamWithServiceName(serviceName, nl.regURL, nl.Callback)
if nl.subscribeParam == nil {
return perrors.New("create nacos subscribeParam failed")
}
go func() {
err := nl.namingClient.Client().Subscribe(nl.subscribeParam)
if err == nil {
listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl)
}
}()
return nil
}

Expand All @@ -235,7 +202,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-nl.done:
logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenURL)
logger.Warnf("nacos listener is close!service name:%+v", nl.serviceName)
return nil, perrors.New("listener stopped")

case val := <-nl.events.Out():
Expand Down
60 changes: 22 additions & 38 deletions registry/nacos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package nacos
import (
"bytes"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -158,10 +159,6 @@ func (nr *nacosRegistry) UnRegister(url *common.URL) error {
return nil
}

func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
return NewNacosListener(conf, nr.URL, nr.namingClient)
}

// Subscribe returns nil if subscribing registry successfully. If not returns an error.
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
// TODO
Expand All @@ -178,7 +175,7 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
return perrors.New("nacosRegistry is not available.")
}

services, err := nr.getAllSubscribeServiceNames()
services, err := nr.getAllSubscribeServiceNames(url)
if err != nil {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
Expand All @@ -190,7 +187,7 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
}

for _, service := range services {
listener, err := nr.subscribeToService(url, service)
listener, err := NewNacosListenerWithServiceName(service, nr.URL, nr.namingClient)
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
if err != nil {
logger.Warnf("Failed to subscribe to service '%s': %v", service, err)
Expand All @@ -208,7 +205,7 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
return perrors.New("nacosRegistry is not available.")
}

listener, err := nr.subscribe(url)
listener, err := NewNacosListenerWithServiceName(getSubscribeName(url), nr.URL, nr.namingClient)
metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil))
if err != nil {
if !nr.IsAvailable() {
Expand All @@ -225,25 +222,33 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
}

// getAllServices retrieves the list of all services from the registry
func (nr *nacosRegistry) getAllSubscribeServiceNames() ([]string, error) {
func (nr *nacosRegistry) getAllSubscribeServiceNames(url *common.URL) ([]string, error) {
services, err := nr.namingClient.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
GroupName: nr.GetParam(constant.RegistryGroupKey, defaultGroup),
PageNo: 1,
PageSize: 10,
PageSize: math.MaxInt32,
})
subScribeServiceNames := []string{}
var subScribeServiceNames []string
categories := strings.Split(url.GetParam(constant.CategoryKey, constant.DefaultCategory), constant.CommaSeparator)
for _, dom := range services.Doms {
if strings.HasPrefix(dom, "providers:") {
subScribeServiceNames = append(subScribeServiceNames, dom)
if strings.Contains(dom, constant.NacosServiceNameSeparator) {
realCategory := strings.Split(dom, constant.NacosServiceNameSeparator)[0]
if contains(categories, realCategory) {
subScribeServiceNames = append(subScribeServiceNames, dom)
}
}
}

return subScribeServiceNames, err
}

// subscribeToService subscribes to a specific service in the registry
func (nr *nacosRegistry) subscribeToService(url *common.URL, service string) (listener registry.Listener, err error) {
return NewNacosListenerWithServiceName(service, url, nr.URL, nr.namingClient)
func contains(slice []string, element string) bool {
for _, item := range slice {
if item == element {
return true
}
}
return false
}

// handleServiceEvents receives service events from the listener and notifies the notifyListener
Expand All @@ -262,7 +267,7 @@ func (nr *nacosRegistry) handleServiceEvents(listener registry.Listener, notifyL

// UnSubscribe :
func (nr *nacosRegistry) UnSubscribe(url *common.URL, _ registry.NotifyListener) error {
param := createSubscribeParam(url, nr.URL, nil)
param := createSubscribeParam(getSubscribeName(url), nr.URL, nil)
if param == nil {
return nil
}
Expand Down Expand Up @@ -294,28 +299,7 @@ func (nr *nacosRegistry) LoadSubscribeInstances(url *common.URL, notify registry
return nil
}

func createSubscribeParam(url, regUrl *common.URL, cb callback) *vo.SubscribeParam {
serviceName := getSubscribeName(url)
groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
if cb == nil {
v, ok := listenerCache.Load(serviceName + groupName)
if !ok {
return nil
}
listener, ok := v.(*nacosListener)
if !ok {
return nil
}
cb = listener.Callback
}
return &vo.SubscribeParam{
ServiceName: serviceName,
SubscribeCallback: cb,
GroupName: groupName,
}
}

func createSubscribeParamWithServiceName(serviceName string, regUrl *common.URL, cb callback) *vo.SubscribeParam {
func createSubscribeParam(serviceName string, regUrl *common.URL, cb callback) *vo.SubscribeParam {
groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
if cb == nil {
v, ok := listenerCache.Load(serviceName + groupName)
Expand Down
Loading