From 83500dc29e1a247ad5134460eb625f1c23ce5d0a Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 27 Nov 2024 11:47:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=94=AF=E6=8C=81=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E7=AE=A1=E7=90=86=E5=91=98=E5=B8=90=E6=88=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apiserver/nacosserver/v1/config/watch.go | 20 ++++ apiserver/nacosserver/v2/config/watch.go | 8 ++ cache/api/types.go | 2 +- common/model/client.go | 2 + common/model/config_file.go | 13 +-- common/model/http.go | 13 ++- config/api.go | 4 + config/client.go | 99 ++++++++++++++++++- config/interceptor/auth/config_file.go | 11 +++ .../paramcheck/config_file_check.go | 30 ++++++ config/watcher.go | 22 +++++ 11 files changed, 213 insertions(+), 11 deletions(-) diff --git a/apiserver/nacosserver/v1/config/watch.go b/apiserver/nacosserver/v1/config/watch.go index 42365ca85..b7fee566b 100644 --- a/apiserver/nacosserver/v1/config/watch.go +++ b/apiserver/nacosserver/v1/config/watch.go @@ -29,6 +29,7 @@ import ( ) type LongPollWatchContext struct { + lock sync.RWMutex clientId string labels map[string]string once sync.Once @@ -76,6 +77,9 @@ func (c *LongPollWatchContext) ClientID() string { // ShouldNotify . func (c *LongPollWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease) bool { + c.lock.RLock() + defer c.lock.RUnlock() + key := event.FileKey() watchFile, ok := c.watchConfigFiles[key] if !ok { @@ -90,6 +94,9 @@ func (c *LongPollWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease } func (c *LongPollWatchContext) ListWatchFiles() []*config_manage.ClientConfigFileInfo { + c.lock.RLock() + defer c.lock.RUnlock() + ret := make([]*config_manage.ClientConfigFileInfo, 0, len(c.watchConfigFiles)) for _, v := range c.watchConfigFiles { ret = append(ret, v) @@ -97,14 +104,27 @@ func (c *LongPollWatchContext) ListWatchFiles() []*config_manage.ClientConfigFil return ret } +func (c *LongPollWatchContext) CurWatchVersion(k string) uint64 { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.watchConfigFiles[k].GetVersion().GetValue() +} + // AppendInterest . func (c *LongPollWatchContext) AppendInterest(item *config_manage.ClientConfigFileInfo) { + c.lock.Lock() + defer c.lock.Unlock() + key := model.BuildKeyForClientConfigFileInfo(item) c.watchConfigFiles[key] = item } // RemoveInterest . func (c *LongPollWatchContext) RemoveInterest(item *config_manage.ClientConfigFileInfo) { + c.lock.Lock() + defer c.lock.Unlock() + key := model.BuildKeyForClientConfigFileInfo(item) delete(c.watchConfigFiles, key) } diff --git a/apiserver/nacosserver/v2/config/watch.go b/apiserver/nacosserver/v2/config/watch.go index 185a1de73..187ea0582 100644 --- a/apiserver/nacosserver/v2/config/watch.go +++ b/apiserver/nacosserver/v2/config/watch.go @@ -123,6 +123,14 @@ func (c *StreamWatchContext) ListWatchFiles() []*apiconfig.ClientConfigFileInfo return c.watchConfigFiles.Values() } +func (c *StreamWatchContext) CurWatchVersion(k string) uint64 { + val, ok := c.watchConfigFiles.Load(k) + if !ok { + return 0 + } + return val.GetVersion().GetValue() +} + // AppendInterest . func (c *StreamWatchContext) AppendInterest(item *apiconfig.ClientConfigFileInfo) { key := model.BuildKeyForClientConfigFileInfo(item) diff --git a/cache/api/types.go b/cache/api/types.go index feee9f4a2..2f184cf7a 100644 --- a/cache/api/types.go +++ b/cache/api/types.go @@ -318,7 +318,7 @@ type ( QueryInstances(filter, metaFilter map[string]string, offset, limit uint32) (uint32, []*model.Instance, error) // DiscoverServiceInstances 服务发现获取实例 DiscoverServiceInstances(serviceID string, onlyHealthy bool) []*model.Instance - // RemoveService + // RemoveService RemoveService(serviceID string) } ) diff --git a/common/model/client.go b/common/model/client.go index dc0d6fd30..3d85fd2f3 100644 --- a/common/model/client.go +++ b/common/model/client.go @@ -141,4 +141,6 @@ const ( ClientLabel_Version = "CLIENT_VERSION" // ClientLabel_Language 客户端语言 ClientLabel_Language = "CLIENT_LANGUAGE" + // ClientLabel_Host 客户端主机名 + ClientLabel_Host = "CLIENT_HOST" ) diff --git a/common/model/config_file.go b/common/model/config_file.go index 32ae2577c..0db62a0a2 100644 --- a/common/model/config_file.go +++ b/common/model/config_file.go @@ -565,17 +565,18 @@ type Subscriber struct { // ConfigSubscribers 以文件视角的监听数据 type ConfigSubscribers struct { // key - key ConfigFileKey + Key ConfigFileKey // VersionClients 版本对应的客户端 - VersionClients []*struct { - Versoin uint64 `json:"versoin"` - Subscribers []Subscriber `json:"subscribers"` - } `json:"clients"` + VersionClients []*VersionClient `json:"clients"` +} + +type VersionClient struct { + Versoin uint64 `json:"versoin"` + Subscribers []*Subscriber `json:"subscribers"` } // FileReleaseSubscribeInfo 文件订阅信息 type FileReleaseSubscribeInfo struct { - Id uint64 `json:"id"` Name string `json:"name"` Namespace string `json:"namespace"` Group string `json:"group"` diff --git a/common/model/http.go b/common/model/http.go index f81171fe6..1df99633b 100644 --- a/common/model/http.go +++ b/common/model/http.go @@ -17,7 +17,11 @@ package model -import "net/http" +import ( + "net/http" + + api "github.com/polarismesh/polaris/common/api/v1" +) type DebugHandlerGroup struct { Name string @@ -35,3 +39,10 @@ type CommonResponse struct { Info string `json:"info"` Data interface{} `json:"data"` } + +func NewCommonResponse(code uint32) *CommonResponse { + return &CommonResponse{ + Code: code, + Info: api.Code2Info(code), + } +} diff --git a/config/api.go b/config/api.go index 78c35a94b..2594a2eb9 100644 --- a/config/api.go +++ b/config/api.go @@ -69,6 +69,10 @@ type ConfigFileOperate interface { configFiles []*apiconfig.ConfigFile, conflictHandling string) *apiconfig.ConfigImportResponse // GetAllConfigEncryptAlgorithms 获取配置加密算法 GetAllConfigEncryptAlgorithms(ctx context.Context) *apiconfig.ConfigEncryptAlgorithmResponse + // GetClientSubscribers 获取客户端订阅者 + GetClientSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse + // GetConfigSubscribers 获取配置订阅者 + GetConfigSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse } // ConfigFileReleaseOperate 配置文件发布接口 diff --git a/config/client.go b/config/client.go index 4d1f3f3c3..99e45d08e 100644 --- a/config/client.go +++ b/config/client.go @@ -295,10 +295,103 @@ func (s *Server) PublishConfigFileFromClient(ctx context.Context, // GetConfigSubscribers 根据配置视角获取订阅者列表 func (s *Server) GetConfigSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse { - return nil + namespace := filter["namespace"] + group := filter["group"] + fileName := filter["file_name"] + + key := utils.GenFileId(namespace, group, fileName) + clientIds, _ := s.watchCenter.watchers.Load(key) + if clientIds == nil { + return model.NewCommonResponse(uint32(apimodel.Code_NotFoundResource)) + } + + versionClients := map[uint64][]*model.Subscriber{} + clientIds.Range(func(val string) { + watchCtx, ok := s.watchCenter.clients.Load(val) + if !ok { + return + } + curVer := watchCtx.CurWatchVersion(key) + if _, ok := versionClients[curVer]; !ok { + versionClients[curVer] = []*model.Subscriber{} + } + + watchCtx.ClientLabels() + + versionClients[curVer] = append(versionClients[curVer], &model.Subscriber{ + ID: watchCtx.ClientID(), + Host: watchCtx.ClientLabels()[model.ClientLabel_Host], + Version: watchCtx.ClientLabels()[model.ClientLabel_Version], + ClientType: watchCtx.ClientLabels()[model.ClientLabel_Language], + }) + }) + + rsp := model.NewCommonResponse(uint32(apimodel.Code_ExecuteSuccess)) + rsp.Data = &model.ConfigSubscribers{ + Key: model.ConfigFileKey{ + Namespace: namespace, + Group: group, + Name: fileName, + }, + VersionClients: func() []*model.VersionClient { + ret := make([]*model.VersionClient, 0, len(versionClients)) + for ver, clients := range versionClients { + ret = append(ret, &model.VersionClient{ + Versoin: ver, + Subscribers: clients, + }) + } + return ret + }(), + } + return rsp } -// GetConfigSubscribers 根据客户端视角获取订阅的配置文件列表 +// GetClientSubscribers 根据客户端视角获取订阅的配置文件列表 func (s *Server) GetClientSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse { - return nil + clientId := filter["client_id"] + watchCtx, ok := s.watchCenter.clients.Load(clientId) + if !ok { + return model.NewCommonResponse(uint32(apimodel.Code_NotFoundResource)) + } + + watchFiles := watchCtx.ListWatchFiles() + data := &model.ClientSubscriber{ + Subscriber: model.Subscriber{ + ID: watchCtx.ClientID(), + Host: watchCtx.ClientLabels()[model.ClientLabel_Host], + Version: watchCtx.ClientLabels()[model.ClientLabel_Version], + ClientType: watchCtx.ClientLabels()[model.ClientLabel_Language], + }, + Files: []model.FileReleaseSubscribeInfo{}, + } + + for _, file := range watchFiles { + key := model.BuildKeyForClientConfigFileInfo(file) + curVer := watchCtx.CurWatchVersion(key) + + ns := file.GetNamespace().GetValue() + group := file.GetGroup().GetValue() + filename := file.GetFileName().GetValue() + + data.Files = append(data.Files, model.FileReleaseSubscribeInfo{ + Name: file.GetName().GetValue(), + Namespace: ns, + Group: group, + FileName: filename, + ReleaseType: func() model.ReleaseType { + if gray := s.fileCache.GetActiveGrayRelease(ns, group, filename); gray != nil { + if gray.Version == curVer { + return model.ReleaseTypeGray + } + } + return model.ReleaseTypeFull + }(), + Version: curVer, + }) + } + + rsp := model.NewCommonResponse(uint32(apimodel.Code_ExecuteSuccess)) + rsp.Data = data + return rsp } diff --git a/config/interceptor/auth/config_file.go b/config/interceptor/auth/config_file.go index f515b8f55..e9e4245bd 100644 --- a/config/interceptor/auth/config_file.go +++ b/config/interceptor/auth/config_file.go @@ -23,6 +23,7 @@ import ( apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage" api "github.com/polarismesh/polaris/common/api/v1" + "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/common/model/auth" "github.com/polarismesh/polaris/common/utils" ) @@ -152,3 +153,13 @@ func (s *Server) GetAllConfigEncryptAlgorithms( ctx context.Context) *apiconfig.ConfigEncryptAlgorithmResponse { return s.nextServer.GetAllConfigEncryptAlgorithms(ctx) } + +// GetClientSubscribers 获取客户端订阅者 +func (s *Server) GetClientSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse { + return s.nextServer.GetClientSubscribers(ctx, filter) +} + +// GetConfigSubscribers 获取配置订阅者 +func (s *Server) GetConfigSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse { + return s.nextServer.GetConfigSubscribers(ctx, filter) +} diff --git a/config/interceptor/paramcheck/config_file_check.go b/config/interceptor/paramcheck/config_file_check.go index cf70f043b..0888a38d5 100644 --- a/config/interceptor/paramcheck/config_file_check.go +++ b/config/interceptor/paramcheck/config_file_check.go @@ -23,8 +23,10 @@ import ( apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage" apimodel "github.com/polarismesh/specification/source/go/api/v1/model" + "google.golang.org/protobuf/types/known/wrapperspb" api "github.com/polarismesh/polaris/common/api/v1" + "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/common/utils" ) @@ -115,3 +117,31 @@ func (s *Server) GetAllConfigEncryptAlgorithms( ctx context.Context) *apiconfig.ConfigEncryptAlgorithmResponse { return s.nextServer.GetAllConfigEncryptAlgorithms(ctx) } + +// GetClientSubscribers 获取客户端订阅者 +func (s *Server) GetClientSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse { + clientId := filter["client_id"] + if clientId == "" { + return model.NewCommonResponse(uint32(apimodel.Code_BadRequest)) + } + return s.nextServer.GetClientSubscribers(ctx, filter) +} + +// GetConfigSubscribers 获取配置订阅者 +func (s *Server) GetConfigSubscribers(ctx context.Context, filter map[string]string) *model.CommonResponse { + namespace := filter["namespace"] + group := filter["group"] + fileName := filter["file_name"] + + if err := CheckFileName(wrapperspb.String(fileName)); err != nil { + return model.NewCommonResponse(uint32(apimodel.Code_InvalidConfigFileName)) + } + if err := utils.CheckResourceName(wrapperspb.String(group)); err != nil { + return model.NewCommonResponse(uint32(apimodel.Code_InvalidConfigFileGroupName)) + } + if err := utils.CheckResourceName(wrapperspb.String(namespace)); err != nil { + return model.NewCommonResponse(uint32(apimodel.Code_InvalidNamespaceName)) + } + + return s.nextServer.GetConfigSubscribers(ctx, filter) +} diff --git a/config/watcher.go b/config/watcher.go index 3d8323d5f..e49bc65c2 100644 --- a/config/watcher.go +++ b/config/watcher.go @@ -71,12 +71,15 @@ type ( ShouldExpire(now time.Time) bool // ListWatchFiles 列举出当前订阅的所有配置文件 ListWatchFiles() []*apiconfig.ClientConfigFileInfo + // CurWatchVersion 获取当前订阅的配置文件的版本 + CurWatchVersion(k string) uint64 // IsOnce 是不是只能被通知一次 IsOnce() bool } ) type LongPollWatchContext struct { + lock sync.RWMutex clientId string labels map[string]string once sync.Once @@ -120,6 +123,9 @@ func (c *LongPollWatchContext) ClientID() string { } func (c *LongPollWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease) bool { + c.lock.RLock() + defer c.lock.RUnlock() + if event.ReleaseType == model.ReleaseTypeGray && !c.betaMatcher(c.ClientLabels(), event) { return false } @@ -133,6 +139,9 @@ func (c *LongPollWatchContext) ShouldNotify(event *model.SimpleConfigFileRelease } func (c *LongPollWatchContext) ListWatchFiles() []*apiconfig.ClientConfigFileInfo { + c.lock.RLock() + defer c.lock.RUnlock() + ret := make([]*apiconfig.ClientConfigFileInfo, 0, len(c.watchConfigFiles)) for _, v := range c.watchConfigFiles { ret = append(ret, v) @@ -140,14 +149,27 @@ func (c *LongPollWatchContext) ListWatchFiles() []*apiconfig.ClientConfigFileInf return ret } +func (c *LongPollWatchContext) CurWatchVersion(k string) uint64 { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.watchConfigFiles[k].GetVersion().GetValue() +} + // AppendInterest . func (c *LongPollWatchContext) AppendInterest(item *apiconfig.ClientConfigFileInfo) { + c.lock.Lock() + defer c.lock.Unlock() + key := model.BuildKeyForClientConfigFileInfo(item) c.watchConfigFiles[key] = item } // RemoveInterest . func (c *LongPollWatchContext) RemoveInterest(item *apiconfig.ClientConfigFileInfo) { + c.lock.Lock() + defer c.lock.Unlock() + key := model.BuildKeyForClientConfigFileInfo(item) delete(c.watchConfigFiles, key) }