Skip to content

Commit

Permalink
add SubscribeMap for grpcClient
Browse files Browse the repository at this point in the history
  • Loading branch information
SoarYu committed Sep 6, 2022
1 parent 478b2a2 commit 0a5545e
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 114 deletions.
9 changes: 4 additions & 5 deletions bin/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ rm -rf coredns
rm -rf nacos-coredns-plugin

# clone current codes
git clone https://github.com/SoarYu/nacos-coredns-plugin.git
git clone https://github.com/nacos-group/nacos-coredns-plugin.git
git clone https://github.com/coredns/coredns.git


# cd coredns directory
cd $GOPATH/src/coredns
git checkout -b v1.9.3 v1.9.3
go get github.com/cihub/seelog
go get github.com/nacos-group/nacos-sdk-go/v2
go get github.com/soaryu/nacos-coredns-plugin/nacos

# copy nacos plugin to coredns
cp -r ../nacos-coredns-plugin/nacos plugin/
Expand All @@ -25,8 +22,10 @@ cp -r ../nacos-coredns-plugin/conf conf

# insert nacos into plugin
sed -i '/hosts/a\\t"nacos",' core/dnsserver/zdirectives.go
sed -i '/coredns\/plugin\/hosts/a\\t_ "github.com/soaryu/nacos-coredns-plugin/nacos"' core/plugin/zplugin.go
sed -i '/coredns\/plugin\/hosts/a\\t_ "github.com/coredns/coredns/plugin/nacos"' core/plugin/zplugin.go
sed -i '/hosts:hosts/a\nacos:nacos' plugin.cfg

go mod tidy

# build
make
2 changes: 0 additions & 2 deletions conf/nacos-coredns.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
nacos {
nacos_namespaceId public
nacos_server_host console.nacos.io:8848
# nacos_server 106.52.77.111
# nacos_server_port 8848
}
forward . /etc/resolv.conf
}
24 changes: 14 additions & 10 deletions nacos/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,26 @@ func (vs *Nacos) String() string {
return string(b)
}

func (vs *Nacos) managed(dom, clientIP string) bool {
if _, ok := DNSDomains[dom]; ok {
func (vs *Nacos) managed(service, clientIP string) bool {
if _, ok := DNSDomains[service]; ok {
return false
}

defer AllDoms.DLock.RUnlock()

AllDoms.DLock.RLock()
_, ok1 := AllDoms.Data[dom]

_, inCache := vs.NacosClientImpl.GetDomainCache().Get(dom)

// service had already subscribed but not in cache
if ok1 && !inCache {
AllDoms.Data[dom] = false
GrpcClient.Subscribe(dom)
_, ok1 := AllDoms.Data[service]

_, inCache := vs.NacosClientImpl.GetDomainCache().Get(service)

/*
ok1 means service is alive in server
根据dns请求订阅服务:
1.服务首次请求, 缓存中没有数据
2.插件初始化时在缓存文件中缓存了该服务数据, 但未订阅
*/
if ok1 && (!inCache || !GrpcClient.SubscribeMap[service]) {
vs.NacosClientImpl.getServiceNow(service, &vs.NacosClientImpl.serviceMap, clientIP)
}

return ok1 || inCache
Expand Down
114 changes: 50 additions & 64 deletions nacos/nacos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func init() {
}

type NacosClient struct {
domainMap ConcurrentMap
udpServer UDPServer
serviceMap ConcurrentMap
udpServer UDPServer
//serverManager ServerManager
//serverPort int
}
Expand Down Expand Up @@ -131,10 +131,10 @@ func initLog() {
}
}

func (nacosClient *NacosClient) asyncGetAllDomNAmes() {
func (nacosClient *NacosClient) asyncGetAllServiceNames() {
for {
time.Sleep(time.Duration(AllDoms.CacheSeconds) * time.Second)
nacosClient.getAllDomNames()
nacosClient.getAllServiceNames()
}
}

Expand All @@ -146,7 +146,7 @@ func (nacosClient *NacosClient) GetUdpServer() (us UDPServer) {
return nacosClient.udpServer
}

func (nacosClient *NacosClient) getAllDomNames() {
func (nacosClient *NacosClient) getAllServiceNames() {

services := GrpcClient.GetAllServicesInfo()
if services == nil {
Expand All @@ -156,20 +156,16 @@ func (nacosClient *NacosClient) getAllDomNames() {

AllDoms.DLock.Lock()
if AllDoms.Data == nil {
var allDoms map[string]bool
// subscribe services return from server
allDoms := make(map[string]bool)
// record all serviceNames return from server
for _, service := range services {
NacosClientLogger.Info("subscirbe service:", service)
GrpcClient.Subscribe(service)
allDoms[service] = true
}
AllDoms.Data = allDoms
AllDoms.CacheSeconds = 30 //刷新间隔30s
AllDoms.CacheSeconds = 10 //刷新间隔
} else {
for _, service := range services {
if !AllDoms.Data[service] {
NacosClientLogger.Info("subscirbe service:", service)
GrpcClient.Subscribe(service)
AllDoms.Data[service] = true
}
}
Expand All @@ -181,10 +177,10 @@ func (nacosClient *NacosClient) getAllDomNames() {
// nacosClient.serverManager.SetServers(servers)
//}

func (vc *NacosClient) Registered(dom string) bool {
func (vc *NacosClient) Registered(service string) bool {
defer AllDoms.DLock.RUnlock()
AllDoms.DLock.RLock()
_, ok1 := AllDoms.Data[dom]
_, ok1 := AllDoms.Data[service]

return ok1
}
Expand All @@ -211,28 +207,28 @@ func (vc *NacosClient) loadCache() {
continue
}

vc.domainMap.Set(f.Name(), service)
vc.serviceMap.Set(f.Name(), service)
}

NacosClientLogger.Info("finish loading cache, total: " + strconv.Itoa(len(files)))
}

func ProcessDomainString(s string) (Domain, error) {
var domain Domain
err1 := json.Unmarshal([]byte(s), &domain)
func ProcessDomainString(s string) (model.Service, error) {
var service model.Service
err1 := json.Unmarshal([]byte(s), &service)

if err1 != nil {
NacosClientLogger.Error("failed to unmarshal json string: "+s, err1)
return Domain{}, err1
return model.Service{}, err1
}

if len(domain.Instances) == 0 {
NacosClientLogger.Warn("get empty ip list, ignore it, dom: " + domain.Name)
return domain, NacosClientError{"empty ip list"}
if len(service.Hosts) == 0 {
NacosClientLogger.Warn("get empty ip list, ignore it, service: " + service.Name)
return service, NacosClientError{"empty ip list"}
}

NacosClientLogger.Info("domain "+domain.Name+" is updated, current ips: ", domain.Instances)
return domain, nil
NacosClientLogger.Info("domain "+service.Name+" is updated, current ips: ", service.Hosts)
return service, nil
}

func NewNacosClient(namespaceId string, serverHosts []string) *NacosClient {
Expand All @@ -255,11 +251,11 @@ func NewNacosClient(namespaceId string, serverHosts []string) *NacosClient {
AllDoms = AllDomsMap{}
AllDoms.Data = make(map[string]bool)
AllDoms.DLock = sync.RWMutex{}
AllDoms.CacheSeconds = 30
AllDoms.CacheSeconds = 10

vc.getAllDomNames()
vc.getAllServiceNames()

go vc.asyncGetAllDomNAmes()
go vc.asyncGetAllServiceNames()

//go vc.asyncUpdateDomain()

Expand All @@ -268,11 +264,11 @@ func NewNacosClient(namespaceId string, serverHosts []string) *NacosClient {
}

func (vc *NacosClient) GetDomainCache() ConcurrentMap {
return vc.domainMap
return vc.serviceMap
}

func (vc *NacosClient) GetDomain(name string) (*Domain, error) {
item, _ := vc.domainMap.Get(name)
item, _ := vc.serviceMap.Get(name)

if item == nil {
domain := Domain{}
Expand All @@ -281,7 +277,7 @@ func (vc *NacosClient) GetDomain(name string) (*Domain, error) {
domain.Name = ss[0]
domain.CacheMillis = DefaultCacheMillis
domain.LastRefMillis = CurrentMillis()
vc.domainMap.Set(name, domain)
vc.serviceMap.Set(name, domain)
item = domain
return nil, NacosClientError{"domain not found: " + name}
}
Expand All @@ -293,18 +289,18 @@ func (vc *NacosClient) GetDomain(name string) (*Domain, error) {

func (vc *NacosClient) asyncUpdateDomain() {
for {
for k, v := range vc.domainMap.Items() {
dom := v.(model.Service)
for k, v := range vc.serviceMap.Items() {
service := v.(model.Service)
ss := strings.Split(k, SEPERATOR)

domName := ss[0]
serviceName := ss[0]
var clientIP string
if len(ss) > 1 && ss[1] != "" {
clientIP = ss[1]
}

if uint64(CurrentMillis())-dom.LastRefTime > dom.CacheMillis && vc.Registered(domName) {
vc.getDomNow(domName, &vc.domainMap, clientIP)
if uint64(CurrentMillis())-service.LastRefTime > service.CacheMillis && vc.Registered(serviceName) {
vc.getServiceNow(serviceName, &vc.serviceMap, clientIP)
}
}
time.Sleep(1 * time.Second)
Expand All @@ -316,54 +312,44 @@ func GetCacheKey(dom, clientIP string) string {
return dom + SEPERATOR + clientIP
}

func (vc *NacosClient) getDomNow(serviceName string, cache *ConcurrentMap, clientIP string) model.Service {
func (vc *NacosClient) getServiceNow(serviceName string, cache *ConcurrentMap, clientIP string) model.Service {
service := GrpcClient.GetService(serviceName)

oldDomain, ok := cache.Get(serviceName)
if !ok || ok && !reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {
if !ok {
NacosClientLogger.Info("dom not found in cache " + serviceName)
oldDomain = model.Service{}
}
NacosClientLogger.Info("dom "+serviceName+" updated: ", service)
}
cache.Set(serviceName, service)

//subscribe service
if !AllDoms.Data[serviceName] {
NacosClientLogger.Info("dom "+serviceName+" updated: ", service)

if !GrpcClient.SubscribeMap[serviceName] {
GrpcClient.Subscribe(serviceName)
AllDoms.Data[serviceName] = true
}

return service
}

func (vc *NacosClient) SrvInstance(domainName, clientIP string) *model.Instance {
item, hasDom := vc.domainMap.Get(domainName)
var dom model.Service
if !hasDom {
dom = vc.getDomNow(domainName, &vc.domainMap, clientIP)
vc.domainMap.Set(domainName, dom)
func (vc *NacosClient) SrvInstance(serviceName, clientIP string) *model.Instance {
item, hasService := vc.serviceMap.Get(serviceName)
var service model.Service
if !hasService {
service = vc.getServiceNow(serviceName, &vc.serviceMap, clientIP)
vc.serviceMap.Set(serviceName, service)
} else {
dom = item.(model.Service)
service = item.(model.Service)
}

//select healthy instances
var hosts []model.Instance
for _, host := range dom.Hosts {
for _, host := range service.Hosts {
if host.Healthy && host.Enable && host.Weight > 0 {
for i := 0; i < int(math.Ceil(host.Weight)); i++ {
hosts = append(hosts, host)
}
hosts = append(hosts, host)
}
}

if len(hosts) == 0 {
NacosClientLogger.Warn("no hosts for " + domainName)
NacosClientLogger.Warn("no healthy instances for " + serviceName)
return nil
}

i, indexOk := indexMap.Get(domainName)
i, indexOk := indexMap.Get(serviceName)
var index int

if !indexOk {
Expand All @@ -376,19 +362,19 @@ func (vc *NacosClient) SrvInstance(domainName, clientIP string) *model.Instance
}
}

indexMap.Set(domainName, index)
indexMap.Set(serviceName, index)

return &hosts[index]
}

func (vc *NacosClient) SrvInstances(domainName, clientIP string) []model.Instance {
cacheKey := GetCacheKey(domainName, clientIP)
item, hasDom := vc.domainMap.Get(cacheKey)
item, hasDom := vc.serviceMap.Get(cacheKey)
var dom model.Service

if !hasDom {
dom = vc.getDomNow(domainName, &vc.domainMap, clientIP)
vc.domainMap.Set(cacheKey, dom)
dom = vc.getServiceNow(domainName, &vc.serviceMap, clientIP)
vc.serviceMap.Set(cacheKey, dom)
} else {
dom = item.(model.Service)
}
Expand Down
19 changes: 8 additions & 11 deletions nacos/nacos_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func NewNacosClientTEST() *NacosClient {
return &vc
}

func TestNacosClient_getAllDomNames(t *testing.T) {
func TestNacosClient_getAllServiceNames(t *testing.T) {
GrpcClient = grpcClientTest
nacosClientTest.getAllDomNames()
nacosClientTest.getAllServiceNames()

AllDoms.DLock.Lock()

Expand All @@ -76,15 +76,12 @@ func TestNacosClient_getAllDomNames(t *testing.T) {

func TestNacosClient_getDomNow(t *testing.T) {
GrpcClient = grpcClientTest
nacosClientTest.getAllDomNames()

domainMapTest := NewConcurrentMap()
for k, _ := range AllDoms.Data {
s, ok := nacosClientTest.GetDomainCache().Get(k)
nacosClientTest.getAllServiceNames()
for serviceName, _ := range AllDoms.Data {
nacosClientTest.getServiceNow(serviceName, &nacosClientTest.serviceMap, "0.0.0.0")
s, ok := nacosClientTest.GetDomainCache().Get(serviceName)
assert.True(t, ok)
ncService := s.(model.Service)
service := nacosClientTest.getDomNow(k, &domainMapTest, "0.0.0.0")
assert.True(t, service.Name == ncService.Name)
return
service := s.(model.Service)
assert.True(t, serviceName == service.Name)
}
}
Loading

0 comments on commit 0a5545e

Please sign in to comment.