From d799fb38baf953952bd1b4ec611bbd5769635bc4 Mon Sep 17 00:00:00 2001 From: baoyinghai Date: Mon, 18 Dec 2023 10:05:33 +0800 Subject: [PATCH] add a control switch and fix watch bug Co-authored-by: zhangyongxi Co-authored-by: wuyingjun Co-authored-by: zhouhao Signed-off-by: baoyinghai --- cmd/apiserver/app/options/options.go | 15 +++--- .../app/options/options.go | 12 +++-- cmd/clustersynchro-manager/app/synchro.go | 18 +++++++ pkg/apiserver/apiserver.go | 10 ++-- pkg/generated/openapi/zz_generated.openapi.go | 6 +++ .../internalstorage/resource_storage.go | 41 ++++++++++---- .../internalstorage/resource_storage_test.go | 8 +-- pkg/storage/internalstorage/storage.go | 22 +++++--- .../informer/resourceversion_storage.go | 19 ++++++- .../clustersynchro/resource_synchro.go | 53 ++++++++++--------- .../components/multi_cluster_event_pool.go | 45 ---------------- .../components/multi_cluster_watcher.go | 16 ++---- pkg/watcher/middleware/publisher.go | 1 + pkg/watcher/middleware/rabbitmq/register.go | 4 +- pkg/watcher/middleware/subscriber.go | 1 + pkg/watcher/options/options.go | 12 +++-- pkg/watcher/register.go | 8 +-- .../api/clusterpedia/v1beta1/types.go | 3 ++ .../v1beta1/zz_generated.conversion.go | 9 ++++ vendor/modules.txt | 3 ++ 20 files changed, 177 insertions(+), 129 deletions(-) diff --git a/cmd/apiserver/app/options/options.go b/cmd/apiserver/app/options/options.go index 03aa90df2..79a1bf4a4 100644 --- a/cmd/apiserver/app/options/options.go +++ b/cmd/apiserver/app/options/options.go @@ -26,6 +26,7 @@ import ( storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options" "github.com/clusterpedia-io/clusterpedia/pkg/watcher" watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components" + "github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware" watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options" ) @@ -46,7 +47,7 @@ type ClusterPediaServerOptions struct { Storage *storageoptions.StorageOptions - Subscriber *watchoptions.MiddlerwareOptions + Subscriber *watchoptions.MiddlewareOptions } func NewServerOptions() *ClusterPediaServerOptions { @@ -129,11 +130,13 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) { StorageFactory: storage, } - err = watcher.NewSubscriber(o.Subscriber) - watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize) - - if err != nil { - return nil, err + middleware.SubscriberEnabled = o.Subscriber.Enabled + if middleware.SubscriberEnabled { + err = watcher.NewSubscriber(o.Subscriber) + if err != nil { + return nil, err + } + watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize) } return config, nil diff --git a/cmd/clustersynchro-manager/app/options/options.go b/cmd/clustersynchro-manager/app/options/options.go index ae7d9c9f2..586dd532f 100644 --- a/cmd/clustersynchro-manager/app/options/options.go +++ b/cmd/clustersynchro-manager/app/options/options.go @@ -28,6 +28,7 @@ import ( storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options" "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro" "github.com/clusterpedia-io/clusterpedia/pkg/watcher" + "github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware" watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options" ) @@ -50,7 +51,7 @@ type Options struct { WorkerNumber int // WorkerNumber is the number of worker goroutines PageSizeForResourceSync int64 ShardingName string - Publisher *watchoptions.MiddlerwareOptions + Publisher *watchoptions.MiddlewareOptions } func NewClusterSynchroManagerOptions() (*Options, error) { @@ -137,9 +138,12 @@ func (o *Options) Config() (*config.Config, error) { return nil, err } - err = watcher.NewPulisher(o.Publisher) - if err != nil { - return nil, err + middleware.PublisherEnabled = o.Publisher.Enabled + if middleware.PublisherEnabled { + err = watcher.NewPulisher(o.Publisher) + if err != nil { + return nil, err + } } kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig) diff --git a/cmd/clustersynchro-manager/app/synchro.go b/cmd/clustersynchro-manager/app/synchro.go index a66305725..15662cfb3 100644 --- a/cmd/clustersynchro-manager/app/synchro.go +++ b/cmd/clustersynchro-manager/app/synchro.go @@ -26,6 +26,8 @@ import ( "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager" clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" "github.com/clusterpedia-io/clusterpedia/pkg/version/verflag" + "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components" + "github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware" ) func init() { @@ -98,6 +100,12 @@ func Run(ctx context.Context, c *config.Config) error { } if !c.LeaderElection.LeaderElect { + if middleware.PublisherEnabled { + err := middleware.GlobalPublisher.InitPublisher(ctx) + if err != nil { + return err + } + } synchromanager.Run(c.WorkerNumber, ctx.Done()) return nil } @@ -138,6 +146,12 @@ func Run(ctx context.Context, c *config.Config) error { defer close(done) stopCh := ctx.Done() + if middleware.PublisherEnabled { + err := middleware.GlobalPublisher.InitPublisher(ctx) + if err != nil { + return + } + } synchromanager.Run(c.WorkerNumber, stopCh) }, OnStoppedLeading: func() { @@ -145,6 +159,10 @@ func Run(ctx context.Context, c *config.Config) error { if done != nil { <-done } + if middleware.PublisherEnabled { + middleware.GlobalPublisher.StopPublisher() + components.EC.CloseChannels() + } }, }, }) diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 03db90c27..adf72f888 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -110,10 +110,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) { // init event cache pool eventStop := make(chan struct{}) - watchcomponents.InitEventCachePool(eventStop) - err := middleware.GlobalSubscriber.InitSubscriber(eventStop) - if err != nil { - return nil, err + if middleware.SubscriberEnabled { + watchcomponents.InitEventCachePool(eventStop) + err := middleware.GlobalSubscriber.InitSubscriber(eventStop) + if err != nil { + return nil, err + } } discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig) diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 5361791e7..9afcfc08b 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -928,6 +928,12 @@ func schema_clusterpedia_io_api_clusterpedia_v1beta1_ListOptions(ref common.Refe Format: "", }, }, + "resourcePrefix": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, "orderby": { SchemaProps: spec.SchemaProps{ Type: []string{"string"}, diff --git a/pkg/storage/internalstorage/resource_storage.go b/pkg/storage/internalstorage/resource_storage.go index c29ed5f49..63df1b03c 100644 --- a/pkg/storage/internalstorage/resource_storage.go +++ b/pkg/storage/internalstorage/resource_storage.go @@ -286,19 +286,40 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s return obj, nil } +func (s *ResourceStorage) GenGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB { + condition := map[string]interface{}{ + "namespace": namespace, + "name": name, + "group": s.storageGroupResource.Group, + "version": s.storageVersion.Version, + "resource": s.storageGroupResource.Resource, + "deleted": false, + } + + if cluster != "" { + condition["cluster"] = cluster + } + return s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition) +} + func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error { - var objects [][]byte - if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil { + var resource Resource + if result := s.GenGetObjectQuery(ctx, cluster, namespace, name).First(&resource); result.Error != nil { return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error) } - obj, _, err := s.codec.Decode(objects[0], nil, into) + obj, _, err := s.codec.Decode(resource.Object, nil, into) if err != nil { return err } if obj != into { return fmt.Errorf("Failed to decode resource, into is %T", into) } + metaObj, err := meta.Accessor(obj) + if err != nil { + return err + } + metaObj.SetResourceVersion(utils.ParseInt642Str(resource.ClusterResourceVersion)) return nil } @@ -308,14 +329,13 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna result = &ResourceMetadataList{} } - var condition map[string]interface{} + condition := map[string]interface{}{ + "group": s.storageGroupResource.Group, + "version": s.storageVersion.Version, + "resource": s.storageGroupResource.Resource, + } if !isAll { - condition = map[string]interface{}{ - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "deleted": false, - } + condition["deleted"] = false } query := s.db.WithContext(ctx).Model(&Resource{}).Where(condition) @@ -330,6 +350,7 @@ func (s *ResourceStorage) genListQuery(ctx context.Context, newfunc func() runti "group": s.storageGroupResource.Group, "version": s.storageVersion.Version, "resource": s.storageGroupResource.Resource, + "deleted": false, } query := s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(condition) _, _, query, err := applyListOptionsToResourceQuery(s.db, query, opts) diff --git a/pkg/storage/internalstorage/resource_storage_test.go b/pkg/storage/internalstorage/resource_storage_test.go index fe0ae5860..5cc5b6e92 100644 --- a/pkg/storage/internalstorage/resource_storage_test.go +++ b/pkg/storage/internalstorage/resource_storage_test.go @@ -200,8 +200,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) { "", "", expected{ - `SELECT "object" FROM "resources" WHERE "cluster" = '' AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`, - "SELECT `object` FROM `resources` WHERE `cluster` = '' AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1", + `SELECT cluster_resource_version, object FROM "resources" WHERE "deleted" = false AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`, + "SELECT cluster_resource_version, object FROM `resources` WHERE `deleted` = false AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1", "", }, }, @@ -212,8 +212,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) { "ns-1", "resource-1", expected{ - `SELECT "object" FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`, - "SELECT `object` FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1", + `SELECT cluster_resource_version, object FROM "resources" WHERE "cluster" = 'cluster-1' AND "deleted" = false AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`, + "SELECT cluster_resource_version, object FROM `resources` WHERE `cluster` = 'cluster-1' AND `deleted` = false AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1", "", }, }, diff --git a/pkg/storage/internalstorage/storage.go b/pkg/storage/internalstorage/storage.go index 61df7d815..42a265b6a 100644 --- a/pkg/storage/internalstorage/storage.go +++ b/pkg/storage/internalstorage/storage.go @@ -10,6 +10,7 @@ import ( internal "github.com/clusterpedia-io/api/clusterpedia" "github.com/clusterpedia-io/clusterpedia/pkg/storage" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer" "github.com/clusterpedia-io/clusterpedia/pkg/utils" watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components" "github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware" @@ -43,8 +44,8 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi KeyFunc: utils.GetKeyFunc(gvr, config.Namespaced), } - // initEventCache is true when Apiserver starts, false when clustersynchro-manager starts - if initEventCache { + // SubscriberEnabled is true when Apiserver starts and middleware enabled + if middleware.SubscriberEnabled { var cache *watchcomponents.EventCache buffer := watchcomponents.GetMultiClusterEventPool().GetClusterBufferByGVR(gvr) cachePool := watchcomponents.GetInitEventCachePool() @@ -72,7 +73,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi resourceStorage.buffer = buffer resourceStorage.eventCache = cache - } else { + } else if middleware.PublisherEnabled { // PublisherEnabled is true when clustersynchro-manager starts and middleware enabled err := middleware.GlobalPublisher.PublishTopic(gvr, config.Codec) if err != nil { return nil, err @@ -99,8 +100,11 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) { var resources []Resource - result := f.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version"). - Where(map[string]interface{}{"cluster": cluster}). + result := f.db.WithContext(ctx).Select("group", "version", "resource", + "namespace", "name", "resource_version", "deleted", "published"). + Where(map[string]interface{}{"cluster": cluster, "deleted": false}). + //In case deleted event be losted when synchro manager do a leaderelection or reboot + Or(map[string]interface{}{"cluster": cluster, "deleted": true, "published": false}). Find(&resources) if result.Error != nil { return nil, InterpretDBError(cluster, result.Error) @@ -119,7 +123,13 @@ func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string if resource.Namespace != "" { key = resource.Namespace + "/" + resource.Name } - versions[key] = resource.ResourceVersion + versions[key] = informer.StorageElement{ + Version: resource.ResourceVersion, + Deleted: resource.Deleted, + Published: resource.Published, + Name: resource.Name, + Namespace: resource.Namespace, + } } return resourceversions, nil } diff --git a/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go b/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go index f1610c13c..839f5ace3 100644 --- a/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go +++ b/pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go @@ -33,12 +33,21 @@ func (c *ResourceVersionStorage) Add(obj interface{}) error { if err != nil { return cache.KeyError{Obj: obj, Err: err} } + + c.cacheStorage.Delete(key) + accessor, err := meta.Accessor(obj) if err != nil { return err } - c.cacheStorage.Add(key, accessor.GetResourceVersion()) + c.cacheStorage.Add(key, StorageElement{ + Version: accessor.GetResourceVersion(), + Deleted: false, + Published: true, + Name: accessor.GetName(), + Namespace: accessor.GetNamespace(), + }) return nil } @@ -52,7 +61,13 @@ func (c *ResourceVersionStorage) Update(obj interface{}) error { return err } - c.cacheStorage.Update(key, accessor.GetResourceVersion()) + c.cacheStorage.Update(key, StorageElement{ + Version: accessor.GetResourceVersion(), + Deleted: false, + Published: true, + Name: accessor.GetName(), + Namespace: accessor.GetNamespace(), + }) return nil } diff --git a/pkg/synchromanager/clustersynchro/resource_synchro.go b/pkg/synchromanager/clustersynchro/resource_synchro.go index 7ef455d5e..48ece7324 100644 --- a/pkg/synchromanager/clustersynchro/resource_synchro.go +++ b/pkg/synchromanager/clustersynchro/resource_synchro.go @@ -27,6 +27,7 @@ import ( "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features" "github.com/clusterpedia-io/clusterpedia/pkg/utils" clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" + "github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware" ) type ResourceSynchroConfig struct { @@ -347,14 +348,26 @@ func (synchro *ResourceSynchro) OnDelete(obj interface{}) { if !synchro.isRunnableForStorage.Load() { return } + + if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { + if obj, ok = d.Obj.(*unstructured.Unstructured); !ok { + namespace, name, err := cache.SplitMetaNamespaceKey(d.Key) + if err != nil { + return + } + obj = &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}} + } + } + if o, ok := obj.(*unstructured.Unstructured); ok { synchro.pruneObject(o) } - obj, err := synchro.storage.ConvertDeletedObject(obj) - if err != nil { - return - } + // full obj is needed in watch feature + //obj, err := synchro.storage.ConvertDeletedObject(obj) + //if err != nil { + // return + //} _ = synchro.queue.Delete(obj) } @@ -389,22 +402,6 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) { if !ok { if _, ok = event.Object.(cache.DeletedFinalStateUnknown); !ok { return - } else { - dfs := event.Object.(cache.DeletedFinalStateUnknown) - var se informer.StorageElement - if se, ok = dfs.Obj.(informer.StorageElement); !ok { - return - } - var err error - obj, err = synchro.storage.GetObj(synchro.ctx, synchro.cluster, se.Namespace, se.Name) - if err != nil { - return - } - metaObj, err := meta.Accessor(obj) - if err == nil { - klog.Warning("DeletedFinalStateUnknown, name: ", metaObj.GetName(), ", time: ", metaObj.GetDeletionTimestamp(), - ", kind: ", obj.GetObjectKind().GroupVersionKind().Kind, ", cluster: ", synchro.cluster) - } } } key, _ := cache.MetaNamespaceKeyFunc(obj) @@ -437,9 +434,11 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) { Published: true, } synchro.rvsLock.Unlock() - err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster) - if err != nil { - return + if middleware.PublisherEnabled { + err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster) + if err != nil { + return + } } } } else { @@ -447,9 +446,11 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) { synchro.rvsLock.Lock() delete(synchro.rvs, key) synchro.rvsLock.Unlock() - err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster) - if err != nil { - return + if middleware.PublisherEnabled { + err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster) + if err != nil { + return + } } } } diff --git a/pkg/watcher/components/multi_cluster_event_pool.go b/pkg/watcher/components/multi_cluster_event_pool.go index 098a39e1f..3a5f248ee 100644 --- a/pkg/watcher/components/multi_cluster_event_pool.go +++ b/pkg/watcher/components/multi_cluster_event_pool.go @@ -16,7 +16,6 @@ var ( type MultiClusterEventPool struct { clusterbuffer map[schema.GroupVersionResource]*MultiClusterBuffer - //todo //use atomic.value instead of lock lock sync.Mutex } @@ -42,22 +41,6 @@ func (p *MultiClusterEventPool) GetClusterBufferByGVR(gvr schema.GroupVersionRes } } -/*func (p *MultiClusterEventPool) RemoveCluster(cluster string) { - p.lock.Lock() - defer p.lock.Unlock() - for _, multiClusterBuffer := range p.clusterbuffer { - multiClusterBuffer.removeCluster(cluster) - } -} - -func (p *MultiClusterEventPool) RemoveClusterWithGvr(cluster string, gvr schema.GroupVersionResource) { - p.lock.Lock() - defer p.lock.Unlock() - if multiClusterBuffer, ok := p.clusterbuffer[gvr]; ok { - multiClusterBuffer.removeCluster(cluster) - } -}*/ - type MultiClusterBuffer struct { gvr schema.GroupVersionResource watcherbuffer []*MultiClusterWatcher @@ -119,31 +102,3 @@ func (b *MultiClusterBuffer) ProcessCompleteEvent(event *watch.Event) error { } return nil } - -/*func (b *MultiClusterBuffer) UpdateObjectResourceVersion(obj runtime.Object, clusterName string) (*watchcache.ClusterResourceVersion, error) { - metaobj, err := meta.Accessor(obj) - if err != nil { - return nil, err - } - // clusterpedia will retry send event when storage db failed. in this case, rv has already been encoded - if isCrv(metaobj.GetResourceVersion()) { - return watchcache.NewClusterResourceVersionFromString(metaobj.GetResourceVersion()) - } else { - return b.resourceVersionSynchro.UpdateClusterResourceVersion(obj, clusterName) - } -} - -func isCrv(s string) bool { - _, err := strconv.ParseFloat(s, 64) - return err != nil -}*/ - -/*func (b *MultiClusterBuffer) UpdateObjectResourceVersion(obj runtime.Object, clusterName string) ([]byte, error) { - return b.resourceVersionSynchro.UpdateClusterResourceVersionWithBytes(obj, clusterName) -} - -func (b *MultiClusterBuffer) removeCluster(cluster string) { - b.lock.Lock() - defer b.lock.Unlock() - b.resourceVersionSynchro.RemoveCluster(cluster) -}*/ diff --git a/pkg/watcher/components/multi_cluster_watcher.go b/pkg/watcher/components/multi_cluster_watcher.go index c2be20422..6ca3fab72 100644 --- a/pkg/watcher/components/multi_cluster_watcher.go +++ b/pkg/watcher/components/multi_cluster_watcher.go @@ -18,7 +18,7 @@ import ( type FilterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool -// CacheWatcaher implements watch.Interface +// MultiClusterWatcher implements watch.Interface type MultiClusterWatcher struct { input chan *watch.Event //output @@ -78,7 +78,7 @@ func (w *MultiClusterWatcher) NonblockingAdd(event *watch.Event) bool { } } -// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) +// Add Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) func (w *MultiClusterWatcher) Add(event *watch.Event, timer *time.Timer) bool { // Try to send the event immediately, without blocking. if w.NonblockingAdd(event) { @@ -145,11 +145,11 @@ func internalToUnstructured(internal runtime.Object, gvk schema.GroupVersionKind if err != nil { return nil, err } - unstructured, err := externalToUnstructured(into) + toUnstructured, err := externalToUnstructured(into) if err != nil { return nil, err } - return unstructured, nil + return toUnstructured, nil } func externalToUnstructured(obj interface{}) (*unstructured.Unstructured, error) { @@ -160,18 +160,12 @@ func externalToUnstructured(obj interface{}) (*unstructured.Unstructured, error) return &unstructured.Unstructured{Object: uncastObj}, nil } -// apiserver\pkg\storage\cacher\cacher.go:1339 这里区别比较大 估计会有bug +// apiserver\pkg\storage\cacher\cacher.go:1339 func (w *MultiClusterWatcher) convertToWatchEvent(event *watch.Event) *watch.Event { if event.Type == watch.Error || w.filter == nil { return event } - // defer func() { - // if err := recover(); err != nil { - // klog.Error(err) - // } - // }() - unstructuredData, err := internalToUnstructured(event.Object, w.gvk) if err != nil { klog.Error(err) diff --git a/pkg/watcher/middleware/publisher.go b/pkg/watcher/middleware/publisher.go index bf57ddd8b..1be4f4f99 100644 --- a/pkg/watcher/middleware/publisher.go +++ b/pkg/watcher/middleware/publisher.go @@ -11,6 +11,7 @@ import ( ) var GlobalPublisher Publisher +var PublisherEnabled bool = false type Publisher interface { InitPublisher(ctx context.Context) error diff --git a/pkg/watcher/middleware/rabbitmq/register.go b/pkg/watcher/middleware/rabbitmq/register.go index bbf63a65f..325e99509 100644 --- a/pkg/watcher/middleware/rabbitmq/register.go +++ b/pkg/watcher/middleware/rabbitmq/register.go @@ -15,7 +15,7 @@ const ( SubscribeerName = "rabbitmq" ) -func NewPulisher(mo *options.MiddlerwareOptions) (middleware.Publisher, error) { +func NewPulisher(mo *options.MiddlewareOptions) (middleware.Publisher, error) { if mo.MaxConnections <= 0 { mo.MaxConnections = 3 } @@ -34,7 +34,7 @@ func NewPulisher(mo *options.MiddlerwareOptions) (middleware.Publisher, error) { return publisher, nil } -func NewSubscriber(mo *options.MiddlerwareOptions) (middleware.Subscriber, error) { +func NewSubscriber(mo *options.MiddlewareOptions) (middleware.Subscriber, error) { if mo.MaxConnections <= 0 { mo.MaxConnections = 3 } diff --git a/pkg/watcher/middleware/subscriber.go b/pkg/watcher/middleware/subscriber.go index c81369d84..001d87475 100644 --- a/pkg/watcher/middleware/subscriber.go +++ b/pkg/watcher/middleware/subscriber.go @@ -7,6 +7,7 @@ import ( ) var GlobalSubscriber Subscriber +var SubscriberEnabled bool = false type Subscriber interface { InitSubscriber(stopCh <-chan struct{}) error diff --git a/pkg/watcher/options/options.go b/pkg/watcher/options/options.go index 22c44beb3..2efd25c9a 100644 --- a/pkg/watcher/options/options.go +++ b/pkg/watcher/options/options.go @@ -6,7 +6,8 @@ import ( "github.com/spf13/pflag" ) -type MiddlerwareOptions struct { +type MiddlewareOptions struct { + Enabled bool // middleware enabled Name string ServerIp string ServerPort int @@ -20,11 +21,11 @@ type MiddlerwareOptions struct { CacheSize int } -func NewMiddlerwareOptions() *MiddlerwareOptions { - return &MiddlerwareOptions{Name: "apiserver", CacheSize: 100} +func NewMiddlerwareOptions() *MiddlewareOptions { + return &MiddlewareOptions{Enabled: false, Name: "rabbitmq", CacheSize: 100} } -func (o *MiddlerwareOptions) Validate() []error { +func (o *MiddlewareOptions) Validate() []error { if o == nil { return nil } @@ -45,7 +46,8 @@ func (o *MiddlerwareOptions) Validate() []error { return errors } -func (o *MiddlerwareOptions) AddFlags(fs *pflag.FlagSet) { +func (o *MiddlewareOptions) AddFlags(fs *pflag.FlagSet) { + fs.BoolVar(&o.Enabled, "middleware-enabled", o.Enabled, "middlerware enabled") fs.StringVar(&o.Name, "middleware-name", o.Name, "middlerware name") fs.StringVar(&o.ServerIp, "middleware-serverIp", o.ServerIp, "middlerware server Ip") fs.IntVar(&o.ServerPort, "middleware-serverPort", o.ServerPort, "middlerware server port") diff --git a/pkg/watcher/register.go b/pkg/watcher/register.go index 99877ade1..a80b90819 100644 --- a/pkg/watcher/register.go +++ b/pkg/watcher/register.go @@ -8,8 +8,8 @@ import ( "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options" ) -type NewPublisherFunc func(mo *options.MiddlerwareOptions) (middleware.Publisher, error) -type NewSubscriberFunc func(mo *options.MiddlerwareOptions) (middleware.Subscriber, error) +type NewPublisherFunc func(mo *options.MiddlewareOptions) (middleware.Publisher, error) +type NewSubscriberFunc func(mo *options.MiddlewareOptions) (middleware.Subscriber, error) var publisherFuncs = make(map[string]NewPublisherFunc) var subscriberFuncs = make(map[string]NewSubscriberFunc) @@ -33,7 +33,7 @@ func RegisterSubscriberFunc(name string, f NewSubscriberFunc) { subscriberFuncs[name] = f } -func NewPulisher(mo *options.MiddlerwareOptions) error { +func NewPulisher(mo *options.MiddlewareOptions) error { provider, ok := publisherFuncs[mo.Name] if !ok { return fmt.Errorf("publisher %s is unregistered", mo.Name) @@ -48,7 +48,7 @@ func NewPulisher(mo *options.MiddlerwareOptions) error { return nil } -func NewSubscriber(mo *options.MiddlerwareOptions) error { +func NewSubscriber(mo *options.MiddlewareOptions) error { provider, ok := subscriberFuncs[mo.Name] if !ok { return fmt.Errorf("publisher %s is unregistered", mo.Name) diff --git a/staging/src/github.com/clusterpedia-io/api/clusterpedia/v1beta1/types.go b/staging/src/github.com/clusterpedia-io/api/clusterpedia/v1beta1/types.go index 25a827d17..6b9013708 100644 --- a/staging/src/github.com/clusterpedia-io/api/clusterpedia/v1beta1/types.go +++ b/staging/src/github.com/clusterpedia-io/api/clusterpedia/v1beta1/types.go @@ -23,6 +23,9 @@ type ListOptions struct { // +optional Namespaces string `json:"namespaces,omitempty"` + // +optional + ResourcePrefix string `json:"resourcePrefix,omitempty"` + // +optional OrderBy string `json:"orderby,omitempty"` diff --git a/staging/src/github.com/clusterpedia-io/api/clusterpedia/v1beta1/zz_generated.conversion.go b/staging/src/github.com/clusterpedia-io/api/clusterpedia/v1beta1/zz_generated.conversion.go index e0c81f294..83b5ac500 100644 --- a/staging/src/github.com/clusterpedia-io/api/clusterpedia/v1beta1/zz_generated.conversion.go +++ b/staging/src/github.com/clusterpedia-io/api/clusterpedia/v1beta1/zz_generated.conversion.go @@ -196,6 +196,7 @@ func autoConvert_v1beta1_ListOptions_To_clusterpedia_ListOptions(in *ListOptions // WARNING: in.Names requires manual conversion: inconvertible types (string vs []string) // WARNING: in.ClusterNames requires manual conversion: inconvertible types (string vs []string) // WARNING: in.Namespaces requires manual conversion: inconvertible types (string vs []string) + out.ResourcePrefix = in.ResourcePrefix // WARNING: in.OrderBy requires manual conversion: inconvertible types (string vs []github.com/clusterpedia-io/api/clusterpedia.OrderBy) out.OwnerUID = in.OwnerUID out.OwnerName = in.OwnerName @@ -222,6 +223,7 @@ func autoConvert_clusterpedia_ListOptions_To_v1beta1_ListOptions(in *clusterpedi if err := runtime.Convert_Slice_string_To_string(&in.Namespaces, &out.Namespaces, s); err != nil { return err } + out.ResourcePrefix = in.ResourcePrefix // WARNING: in.OrderBy requires manual conversion: inconvertible types ([]github.com/clusterpedia-io/api/clusterpedia.OrderBy vs string) out.OwnerName = in.OwnerName out.OwnerUID = in.OwnerUID @@ -262,6 +264,13 @@ func autoConvert_url_Values_To_v1beta1_ListOptions(in *url.Values, out *ListOpti } else { out.Namespaces = "" } + if values, ok := map[string][]string(*in)["resourcePrefix"]; ok && len(values) > 0 { + if err := runtime.Convert_Slice_string_To_string(&values, &out.ResourcePrefix, s); err != nil { + return err + } + } else { + out.ResourcePrefix = "" + } if values, ok := map[string][]string(*in)["orderby"]; ok && len(values) > 0 { if err := runtime.Convert_Slice_string_To_string(&values, &out.OrderBy, s); err != nil { return err diff --git a/vendor/modules.txt b/vendor/modules.txt index 19e106f58..6b8748316 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -337,6 +337,9 @@ github.com/spf13/pflag # github.com/stoewer/go-strcase v1.2.0 ## explicit; go 1.11 github.com/stoewer/go-strcase +# github.com/streadway/amqp v1.1.0 +## explicit; go 1.10 +github.com/streadway/amqp # github.com/stretchr/testify v1.8.3 ## explicit; go 1.20 github.com/stretchr/testify/assert