diff --git a/cmd/erda-server/bootstrap.yaml b/cmd/erda-server/bootstrap.yaml
index 43649ff8e81..69733a5a4b4 100644
--- a/cmd/erda-server/bootstrap.yaml
+++ b/cmd/erda-server/bootstrap.yaml
@@ -371,9 +371,13 @@ erda.dop.rule.action.dingtalkworknotice: { }
 #erda.core.org: {}
 ########### cmp
-cmp: { }
+cmp:
+  # steve cache entry time to live, default 10 min
+  cache_ttl: ${CMP_STEVE_CACHE_TTL:10m}
+  # steve cache max size (number of cached steve servers), default 64
+  cache_size: ${CMP_STEVE_CACHE_SIZE:64}
 grpc-client@erda.core.monitor.metric:
-  addr: "${MONITOR_GRPC_ADDR:monitor:7080}"
+  addr: "${MONITOR_GRPC_ADDR:monitor:7080}"
   block: false
 erda.core.monitor.metric-client: { }
 grpc-client@erda.core.monitor.alert:
diff --git a/internal/apps/cmp/endpoints/endpoints.go b/internal/apps/cmp/endpoints/endpoints.go
index ce4ba32b6f5..a2d39e66390 100644
--- a/internal/apps/cmp/endpoints/endpoints.go
+++ b/internal/apps/cmp/endpoints/endpoints.go
@@ -17,6 +17,7 @@ package endpoints
 import (
     "context"
     "net/http"
+    "time"
 
     clusterpb "github.com/erda-project/erda-proto-go/core/clustermanager/cluster/pb"
     cronpb "github.com/erda-project/erda-proto-go/core/pipeline/cron/pb"
@@ -62,10 +63,16 @@ type Endpoints struct {
     ClusterSvc  clusterpb.ClusterServiceServer
     PipelineSvc pipelinepb.PipelineServiceServer
 
-    reportTable *resource.ReportTable
-    CronService cronpb.CronServiceServer
-    org         org.Interface
-    registry    registry.Interface
+    reportTable      *resource.ReportTable
+    CronService      cronpb.CronServiceServer
+    org              org.Interface
+    registry         registry.Interface
+    SteveCacheConfig *steveCacheConfig
+}
+
+type steveCacheConfig struct {
+    TTL  time.Duration
+    Size int
 }
 
 type Option func(*Endpoints)
@@ -88,7 +95,7 @@ func New(ctx context.Context, db *dbclient.DBClient, js jsonstore.JsonStore, cac
     e.metrics = ctx.Value("metrics").(*metrics.Metric)
     e.Resource = ctx.Value("resource").(*resource.Resource)
     e.CachedJS = cachedJS
-    e.SteveAggregator = steve.NewAggregator(ctx, e.bdl, e.ClusterSvc)
+    e.SteveAggregator = steve.NewAggregator(ctx, e.bdl, e.ClusterSvc, e.SteveCacheConfig.TTL, e.SteveCacheConfig.Size)
     e.registry = registry.New(e.ClusterSvc)
     return e
 }
@@ -97,6 +104,15 @@ func (e *Endpoints) GetCluster() *clusters.Clusters {
     return e.clusters
 }
 
+func WithSteveCacheConfig(ttl time.Duration, size int) Option {
+    return func(e *Endpoints) {
+        e.SteveCacheConfig = &steveCacheConfig{
+            TTL:  ttl,
+            Size: size,
+        }
+    }
+}
+
 // WithBundle With bundle
 func WithBundle(bdl *bundle.Bundle) Option {
     return func(e *Endpoints) {
diff --git a/internal/apps/cmp/initialize.go b/internal/apps/cmp/initialize.go
index b6be35ea071..ea7b943ca93 100644
--- a/internal/apps/cmp/initialize.go
+++ b/internal/apps/cmp/initialize.go
@@ -218,6 +218,7 @@ func (p *provider) initEndpoints(ctx context.Context, db *dbclient.DBClient, js,
         endpoints.WithClusterServiceServer(p.ClusterSvc),
         endpoints.WithOrg(p.Org),
         endpoints.WithPipelineSvc(p.PipelineSvc),
+        endpoints.WithSteveCacheConfig(p.Cfg.SteveCacheTTL, p.Cfg.SteveCacheSize),
     )
 
     // Sync org resource task status
diff --git a/internal/apps/cmp/provider.go b/internal/apps/cmp/provider.go
index 20e5cc58218..f827220a99e 100644
--- a/internal/apps/cmp/provider.go
+++ b/internal/apps/cmp/provider.go
@@ -67,6 +67,12 @@ type provider struct {
     Tran            i18n.Translator `translator:"common"`
     SteveAggregator *steve.Aggregator
     Org             org.Interface
+    Cfg             *config
+}
+
+type config struct {
+    SteveCacheTTL  time.Duration `file:"cache_ttl" default:"10m"`
+    SteveCacheSize int           `file:"cache_size" default:"5000"`
 }
 
 // Run Run the provider
diff --git a/internal/apps/cmp/steve/aggregator.go b/internal/apps/cmp/steve/aggregator.go
index 7e7a8a6ae66..9cfa0a99a0a 100644
--- a/internal/apps/cmp/steve/aggregator.go
+++ b/internal/apps/cmp/steve/aggregator.go
@@ -18,9 +18,9 @@ import (
     "context"
     "fmt"
     "net/http"
-    "sync"
     "time"
 
+    "github.com/bluele/gcache"
     "github.com/pkg/errors"
     "github.com/rancher/apiserver/pkg/types"
     "github.com/rancher/steve/pkg/attributes"
@@ -38,6 +38,8 @@ import (
     "github.com/erda-project/erda/bundle"
     apierrors2 "github.com/erda-project/erda/bundle/apierrors"
     "github.com/erda-project/erda/internal/apps/cmp/steve/predefined"
+    "github.com/erda-project/erda/pkg/common/apis"
+    "github.com/erda-project/erda/pkg/discover"
     "github.com/erda-project/erda/pkg/http/httputil"
     "github.com/erda-project/erda/pkg/k8sclient"
     "github.com/erda-project/erda/pkg/k8sclient/config"
@@ -52,50 +54,69 @@ type group struct {
 
 type Aggregator struct {
     Ctx        context.Context
     Bdl        *bundle.Bundle
-    servers    sync.Map
     clusterSvc clusterpb.ClusterServiceServer
+    server     gcache.Cache
 }
 
 // NewAggregator new an aggregator with steve servers for all current clusters
-func NewAggregator(ctx context.Context, bdl *bundle.Bundle, clusterSvc clusterpb.ClusterServiceServer) *Aggregator {
+func NewAggregator(ctx context.Context, bdl *bundle.Bundle, clusterSvc clusterpb.ClusterServiceServer, ttl time.Duration, size int) *Aggregator {
     a := &Aggregator{
         Ctx:        ctx,
         Bdl:        bdl,
         clusterSvc: clusterSvc,
     }
+
+    a.server = gcache.New(size).Expiration(ttl).LoaderFunc(a.loadFunc).LRU().Build()
     a.init()
     go a.watchClusters(ctx)
     return a
 }
 
+func (a *Aggregator) loadFunc(key any) (any, error) {
+    ctx := apis.WithInternalClientContext(a.Ctx, discover.SvcCMP)
+    clusterName, ok := key.(string)
+    if !ok {
+        return nil, errors.Errorf("key [%v] can't be converted to string", key)
+    }
+    cluster, err := a.clusterSvc.GetCluster(ctx, &clusterpb.GetClusterRequest{IdOrName: clusterName})
+    if err != nil {
+        return nil, err
+    }
+    clusterInfo := cluster.Data
+    g := &group{ready: false}
+    go a.prepareSteveServer(clusterInfo)
+    return g, nil
+}
+
 func (a *Aggregator) GetAllClusters() []string {
     var clustersNames []string
-    a.servers.Range(func(key, _ interface{}) bool {
+    servers := a.server.GetALL(false)
+    for key := range servers {
         if clusterName, ok := key.(string); ok {
             clustersNames = append(clustersNames, clusterName)
         }
-        return true
-    })
+    }
     return clustersNames
 }
 
 // ListClusters list ready and unready clusters in steveAggregator
 func (a *Aggregator) ListClusters() (ready, unready []string) {
-    a.servers.Range(func(key, value interface{}) bool {
-        g := value.(*group)
+    servers := a.server.GetALL(false)
+    for key, item := range servers {
+        g := item.(*group)
         if g.ready {
             ready = append(ready, key.(string))
         } else {
             unready = append(unready, key.(string))
         }
-        return true
-    })
+    }
     return
 }
 
 func (a *Aggregator) IsServerReady(clusterName string) bool {
-    s, ok := a.servers.Load(clusterName)
-    if !ok {
+    s, err := a.server.Get(clusterName)
+    if err != nil {
+        logrus.Errorf("failed to get steve server for cluster %s: %v", clusterName, err)
         return false
     }
     g := s.(*group)
@@ -104,12 +125,13 @@ func (a *Aggregator) IsServerReady(clusterName string) bool {
 
 // HasAccess set schemas for apiOp and check access for user in apiOp
 func (a *Aggregator) HasAccess(clusterName string, apiOp *types.APIRequest, verb string) (bool, error) {
-    g, ok := a.servers.Load(clusterName)
-    if !ok {
-        return false, errors.Errorf("steve server not found for cluster %s", clusterName)
+    item, err := a.server.Get(clusterName)
+    if err != nil {
logrus.Errorf("fail to get server by clusterName , %s", err) + return false, errors.Errorf(" can't found steve server for cluster %s", clusterName) } - server := g.(*group).server + server := item.(*group).server if err := server.SetSchemas(apiOp); err != nil { return false, err } @@ -152,27 +174,27 @@ func (a *Aggregator) watchClusters(ctx context.Context) { continue } exists[cluster.Name] = struct{}{} - if _, ok := a.servers.Load(cluster.Name); ok { + if g, err := a.server.Get(cluster.Name); err == nil && g != nil { continue } a.Add(cluster) } var readyCluster []string - checkDeleted := func(key interface{}, value interface{}) (res bool) { - res = true - g, _ := value.(*group) + + cacheClusters := a.server.GetALL(false) + for name, cluster := range cacheClusters { + g, _ := cluster.(*group) if g.ready { - readyCluster = append(readyCluster, key.(string)) + readyCluster = append(readyCluster, name.(string)) } - if _, ok := exists[key.(string)]; ok { + if _, ok := exists[name.(string)]; ok { return } - a.Delete(key.(string)) - return + a.Delete(name.(string)) } - a.servers.Range(checkDeleted) + logrus.Infof("Clusters with ready steve server: %v", readyCluster) } } @@ -212,46 +234,58 @@ func (a *Aggregator) Add(clusterInfo *clusterpb.ClusterInfo) { if clusterInfo.Type != "k8s" && clusterInfo.Type != "edas" { return } - - if _, ok := a.servers.Load(clusterInfo.Name); ok { + if a.server.Has(clusterInfo.Name) { logrus.Infof("cluster %s is already existed, skip adding cluster", clusterInfo.Name) return } g := &group{ready: false} - a.servers.Store(clusterInfo.Name, g) - go func() { - logrus.Infof("creating predefined resource for cluster %s", clusterInfo.Name) - if err := a.createPredefinedResource(clusterInfo.Name); err != nil { - logrus.Infof("failed to create predefined resource for cluster %s, %v. Skip starting steve server", - clusterInfo.Name, err) - a.servers.Delete(clusterInfo.Name) - return - } - logrus.Infof("starting steve server for cluster %s", clusterInfo.Name) - var err error - server, cancel, err := a.createSteve(clusterInfo) - defer func() { - if err != nil { - if cancel != nil { - cancel() - } - a.servers.Delete(clusterInfo.Name) - } - }() - if err != nil { - logrus.Errorf("failed to create steve server for cluster %s, %v", clusterInfo.Name, err) - return - } + err := a.server.Set(clusterInfo.Name, g) + if err != nil { + logrus.Infof("set cluster %s error : %s, skip adding cluster", clusterInfo.Name, err.Error()) + return + } + go a.prepareSteveServer(clusterInfo) +} - g := &group{ - ready: true, - server: server, - cancel: cancel, +// prepareSteveServer creates steve server for a cluster. +func (a *Aggregator) prepareSteveServer(clusterInfo *clusterpb.ClusterInfo) { + if clusterInfo == nil { + return + } + logrus.Infof("creating predefined resource for cluster %s", clusterInfo.Name) + if err := a.createPredefinedResource(clusterInfo.Name); err != nil { + logrus.Infof("failed to create predefined resource for cluster %s, %v. 
Skip starting steve server", + clusterInfo.Name, err) + return + } + logrus.Infof("starting steve server for cluster %s", clusterInfo.Name) + var err error + server, cancel, err := a.createSteve(clusterInfo) + defer func() { + if err != nil { + if cancel != nil { + cancel() + } + a.server.Remove(clusterInfo.Name) } - a.servers.Store(clusterInfo.Name, g) - logrus.Infof("steve server for cluster %s started", clusterInfo.Name) }() + if err != nil { + logrus.Errorf("failed to create steve server for cluster %s, %v", clusterInfo.Name, err) + return + } + + g := &group{ + ready: true, + server: server, + cancel: cancel, + } + err = a.server.Set(clusterInfo.Name, g) + if err != nil { + logrus.Infof("set cluster %s error : %s, skip adding cluster", clusterInfo.Name, err.Error()) + return + } + logrus.Infof("steve server for cluster %s started", clusterInfo.Name) } func (a *Aggregator) createPredefinedResource(clusterName string) error { @@ -325,17 +359,21 @@ func (a *Aggregator) insureSystemNamespace(client *k8sclient.K8sClient) error { // Delete closes a steve server for k8s cluster with clusterName and delete it from aggregator func (a *Aggregator) Delete(clusterName string) { - g, ok := a.servers.Load(clusterName) - if !ok { + if !a.server.Has(clusterName) { logrus.Infof("steve server for cluster %s not existed, skip", clusterName) return } + g, err := a.server.Get(clusterName) + if err != nil { + logrus.Infof("can not get steve server for cluster %s, skip", clusterName) + return + } group, _ := g.(*group) if group.ready { group.cancel() } - a.servers.Delete(clusterName) + a.server.Remove(clusterName) logrus.Infof("steve server for cluster %s stopped", clusterName) } @@ -350,8 +388,8 @@ func (a *Aggregator) ServeHTTP(rw http.ResponseWriter, req *http.Request) { return } - s, ok := a.servers.Load(clusterName) - if !ok { + s, err := a.server.Get(clusterName) + if s == nil || err != nil { ctx := transport.WithHeader(a.Ctx, metadata.New(map[string]string{httputil.InternalHeader: "true"})) resp, err := a.clusterSvc.GetCluster(ctx, &clusterpb.GetClusterRequest{IdOrName: clusterName}) if err != nil { @@ -371,7 +409,9 @@ func (a *Aggregator) ServeHTTP(rw http.ResponseWriter, req *http.Request) { logrus.Infof("steve for cluster %s not exist, starting a new server", clusterInfo.Name) a.Add(clusterInfo) - if s, ok = a.servers.Load(clusterInfo.Name); !ok { + + if s, err = a.server.Get(clusterInfo.Name); err != nil { + logrus.Errorf("load cluster server error: %v", err) rw.WriteHeader(http.StatusInternalServerError) rw.Write(apistructs.NewSteveError(apistructs.ServerError, "Internal server error").JSON()) return @@ -389,8 +429,8 @@ func (a *Aggregator) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } func (a *Aggregator) Serve(clusterName string, apiOp *types.APIRequest) error { - s, ok := a.servers.Load(clusterName) - if !ok { + s, err := a.server.Get(clusterName) + if err != nil { ctx := transport.WithHeader(a.Ctx, metadata.New(map[string]string{httputil.InternalHeader: "true"})) resp, err := a.clusterSvc.GetCluster(ctx, &clusterpb.GetClusterRequest{IdOrName: clusterName}) if err != nil { @@ -404,7 +444,8 @@ func (a *Aggregator) Serve(clusterName string, apiOp *types.APIRequest) error { logrus.Infof("steve for cluster %s not exist, starting a new server", clusterInfo.Name) a.Add(clusterInfo) - if s, ok = a.servers.Load(clusterInfo.Name); !ok { + if s, err = a.server.Get(clusterInfo.Name); err != nil { + logrus.Errorf("load cluster server error: %v", err) return 
             return apierrors2.ErrInvoke.InternalError(errors.Errorf("failed to start steve server for cluster %s", clusterInfo.Name))
         }
     }
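Reviewer note: the patch replaces the aggregator's sync.Map with github.com/bluele/gcache, so the cache semantics now drive when steve servers are (re)created. Below is a minimal, self-contained sketch (not part of the patch) of the gcache behavior the new Aggregator relies on: the LoaderFunc fills the cache on a miss, Expiration drops entries after the configured TTL (cache_ttl), and LRU caps the number of entries at the configured size (cache_size). The key strings, TTL, and counter are made up for illustration.

```go
package main

import (
	"fmt"
	"time"

	"github.com/bluele/gcache"
)

func main() {
	loads := 0
	cache := gcache.New(2). // at most 2 cached entries (like cache_size)
		LRU().                              // least-recently-used eviction
		Expiration(100 * time.Millisecond). // per-entry TTL (like cache_ttl)
		LoaderFunc(func(key interface{}) (interface{}, error) {
			// Called on a cache miss, mirroring Aggregator.loadFunc.
			loads++
			return fmt.Sprintf("steve-server-for-%v", key), nil
		}).
		Build()

	v, _ := cache.Get("cluster-a") // miss: loader runs once
	fmt.Println(v, loads)          // steve-server-for-cluster-a 1

	cache.Get("cluster-a") // hit: loader is not called again
	fmt.Println(loads)     // 1

	time.Sleep(150 * time.Millisecond)
	cache.Get("cluster-a") // entry expired: loader runs again
	fmt.Println(loads)     // 2
}
```

Because a loader is attached, a Get on an unknown or expired cluster transparently resolves the cluster and kicks off prepareSteveServer in the background, which is what lets ServeHTTP and Serve fall back to the cache instead of the old sync.Map lookups.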